Posted to commits@storm.apache.org by sr...@apache.org on 2015/10/25 17:11:08 UTC

[1/3] storm git commit: STORM-1073: Refactor AbstractHdfsBolt

Repository: storm
Updated Branches:
  refs/heads/master 3bc375e73 -> b9badc298


STORM-1073: Refactor AbstractHdfsBolt


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f2857eb3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f2857eb3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f2857eb3

Branch: refs/heads/master
Commit: f2857eb381961dd57819d741788d47d07412c74d
Parents: ae4b553
Author: Aaron Dossett <aa...@target.com>
Authored: Tue Oct 20 15:19:22 2015 -0500
Committer: Aaron Dossett <aa...@target.com>
Committed: Tue Oct 20 15:19:22 2015 -0500

----------------------------------------------------------------------
 .../storm/hdfs/bolt/AbstractHdfsBolt.java       | 121 +++++++++++++++++++
 .../org/apache/storm/hdfs/bolt/HdfsBolt.java    | 114 +----------------
 .../storm/hdfs/bolt/SequenceFileBolt.java       |  42 +++----
 3 files changed, 142 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/f2857eb3/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
index f260598..0bfccf4 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
@@ -17,10 +17,14 @@
  */
 package org.apache.storm.hdfs.bolt;
 
+import backtype.storm.Config;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.TupleUtils;
+import backtype.storm.utils.Utils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -35,12 +39,15 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.Map;
+import java.util.List;
 import java.util.Timer;
 import java.util.TimerTask;
 
 public abstract class AbstractHdfsBolt extends BaseRichBolt {
     private static final Logger LOG = LoggerFactory.getLogger(AbstractHdfsBolt.class);
+    private static final Integer DEFAULT_RETRY_COUNT = 3;
 
     protected ArrayList<RotationAction> rotationActions = new ArrayList<RotationAction>();
     private Path currentFile;
@@ -54,6 +61,10 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
     protected String configKey;
     protected transient Object writeLock;
     protected transient Timer rotationTimer; // only used for TimedRotationPolicy
+    private List<Tuple> tupleBatch = new LinkedList<>();
+    protected long offset = 0;
+    protected Integer fileRetryCount = DEFAULT_RETRY_COUNT;
+    protected Integer tickTupleInterval = 0;
 
     protected transient Configuration hdfsConfig;
 
@@ -99,6 +110,13 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
             }
         }
 
+        // If interval is non-zero then it has already been explicitly set and we should not default it
+        if (conf.containsKey("topology.message.timeout.secs") && tickTupleInterval == 0)
+        {
+            Integer topologyTimeout = Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
+            tickTupleInterval = (int)(Math.floor(topologyTimeout / 2));
+            LOG.debug("Setting tick tuple interval to [{}] based on topology timeout", tickTupleInterval);
+        }
 
         try{
             HdfsSecurityUtil.login(conf, hdfsConfig);
@@ -127,9 +145,112 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
     }
 
     @Override
+    public final void execute(Tuple tuple) {
+
+        synchronized (this.writeLock) {
+            boolean forceSync = false;
+            if (TupleUtils.isTick(tuple)) {
+                LOG.debug("TICK! forcing a file system flush");
+                forceSync = true;
+            } else {
+                try {
+                    writeTuple(tuple);
+                    tupleBatch.add(tuple);
+                } catch (IOException e) {
+                    //If the write failed, try to sync anything already written
+                    LOG.info("Tuple failed to write, forcing a flush of existing data.");
+                    this.collector.reportError(e);
+                    forceSync = true;
+                    this.collector.fail(tuple);
+                }
+            }
+
+            if (this.syncPolicy.mark(tuple, this.offset) || (forceSync && tupleBatch.size() > 0)) {
+                int attempts = 0;
+                boolean success = false;
+                IOException lastException = null;
+                // Make every attempt to sync the data we have.  If it can't be done then kill the bolt with
+                // a runtime exception.  The filesystem is presumably in a very bad state.
+                while (success == false && attempts < fileRetryCount) {
+                    attempts += 1;
+                    try {
+                        syncTuples();
+                        LOG.debug("Data synced to filesystem. Ack'ing [{}] tuples", tupleBatch.size());
+                        for (Tuple t : tupleBatch) {
+                            this.collector.ack(t);
+                        }
+                        tupleBatch.clear();
+                        syncPolicy.reset();
+                        success = true;
+                    } catch (IOException e) {
+                        LOG.warn("Data could not be synced to filesystem on attempt [{}]", attempts);
+                        this.collector.reportError(e);
+                        lastException = e;
+                    }
+                }
+
+                // If unsuccessful, fail the pending tuples
+                if (success == false) {
+                    LOG.warn("Data could not be synced to filesystem, failing this batch of tuples");
+                    for (Tuple t : tupleBatch) {
+                        this.collector.fail(t);
+                    }
+                    tupleBatch.clear();
+
+                    throw new RuntimeException("Sync failed [" + attempts + "] times.", lastException);
+                }
+            }
+
+            if(this.rotationPolicy.mark(tuple, this.offset)) {
+                try {
+                    rotateOutputFile();
+                    this.rotationPolicy.reset();
+                    this.offset = 0;
+                } catch (IOException e) {
+                    this.collector.reportError(e);
+                    LOG.warn("File could not be rotated");
+                    //At this point there is nothing to do.  In all likelihood any filesystem operations will fail.
+                    //The next tuple will almost certainly fail to write and/or sync, which will force a rotation.  That
+                    //will give rotateOutputFile() a chance to work, which includes creating a fresh file handle.
+                }
+            }
+        }
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        Map<String, Object> conf = super.getComponentConfiguration();
+        if (conf == null)
+            conf = new Config();
+
+        if (tickTupleInterval > 0) {
+            LOG.info("Enabling tick tuple with interval [{}]", tickTupleInterval);
+            conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickTupleInterval);
+        }
+
+        return conf;
+    }
+
+    @Override
     public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
     }
 
+    /**
+     * Writes a tuple to the underlying filesystem but makes no guarantees about syncing data.
+     * @param tuple
+     * @throws IOException
+     */
+    abstract void writeTuple(Tuple tuple) throws IOException;
+
+    /**
+     * Make the best effort to sync written data to the underlying file system.  Concrete classes should very clearly
+     * state the file state that sync guarantees.  For example, HdfsBolt can make a much stronger guarantee than
+     * SequenceFileBolt.
+     *
+     * @throws IOException
+     */
+    abstract void syncTuples() throws IOException;
+
     abstract void closeOutputFile() throws IOException;
 
     abstract Path createOutputFile() throws IOException;
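
The refactor above turns AbstractHdfsBolt into a template: execute() now owns batching, tick-tuple
handling, sync retries, acking/failing, and rotation, while subclasses supply only the I/O
primitives. A minimal sketch of a custom subclass under the new contract (the class name and
line-per-tuple format are hypothetical; it assumes the protected members fs, fsUrl, hdfsConfig,
fileNameFormat, rotation, and offset that AbstractHdfsBolt exposes to its subclasses):

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.tuple.Tuple;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import java.io.IOException;
    import java.net.URI;
    import java.util.Map;

    // Hypothetical subclass, for illustration only: writes one line of text per tuple.
    public class LineTextBolt extends AbstractHdfsBolt {
        private transient FSDataOutputStream out;

        @Override
        public void doPrepare(Map conf, TopologyContext context, OutputCollector collector) throws IOException {
            this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
        }

        @Override
        void writeTuple(Tuple tuple) throws IOException {
            // Advance this.offset so the sync/rotation policies in execute() see the file grow.
            byte[] bytes = (tuple.getString(0) + "\n").getBytes();
            out.write(bytes);
            this.offset += bytes.length;
        }

        @Override
        void syncTuples() throws IOException {
            // Best effort only; execute() retries up to fileRetryCount times before failing the batch.
            out.hsync();
        }

        @Override
        void closeOutputFile() throws IOException {
            out.close();
        }

        @Override
        Path createOutputFile() throws IOException {
            Path path = new Path(this.fileNameFormat.getPath(),
                    this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
            this.out = this.fs.create(path);
            return path;
        }
    }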

http://git-wip-us.apache.org/repos/asf/storm/blob/f2857eb3/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
index 557afd6..101aa57 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
@@ -17,11 +17,9 @@
  */
 package org.apache.storm.hdfs.bolt;
 
-import backtype.storm.Config;
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.TupleUtils;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -39,19 +37,12 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.EnumSet;
 import java.util.Map;
-import java.util.List;
-import java.util.LinkedList;
 
 public class HdfsBolt extends AbstractHdfsBolt{
     private static final Logger LOG = LoggerFactory.getLogger(HdfsBolt.class);
-    private static final Integer DEFAULT_RETRY_COUNT = 3;
 
     private transient FSDataOutputStream out;
     private RecordFormat format;
-    private long offset = 0;
-    private List<Tuple> tupleBatch = new LinkedList<>();
-    Integer tickTupleInterval = 0;
-    Integer fileRetryCount = DEFAULT_RETRY_COUNT;
 
     public HdfsBolt withFsUrl(String fsUrl){
         this.fsUrl = fsUrl;
@@ -102,122 +93,23 @@ public class HdfsBolt extends AbstractHdfsBolt{
     public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
         LOG.info("Preparing HDFS Bolt...");
         this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
-
-        // If interval is non-zero then it has already been explicitly set and we should not default it
-        if (conf.containsKey("topology.message.timeout.secs") && tickTupleInterval == 0)
-        {
-            Integer topologyTimeout = Integer.parseInt(conf.get("topology.message.timeout.secs").toString());
-            tickTupleInterval = (int)(Math.floor(topologyTimeout / 2));
-            LOG.debug("Setting tick tuple interval to [" + tickTupleInterval + "] based on topology timeout");
-        }
     }
 
     @Override
-    public void execute(Tuple tuple) {
-
-        synchronized (this.writeLock) {
-            boolean forceSync = false;
-            if (TupleUtils.isTick(tuple)) {
-                LOG.debug("TICK! forcing a file system flush");
-                forceSync = true;
-            }
-            else {
-                try {
-                    writeAndAddTuple(tuple);
-                } catch (IOException e) {
-                    //If the write failed, try to sync anything already written
-                    LOG.info("Tuple failed to write, forcing a flush of existing data.");
-                    this.collector.reportError(e);
-                    forceSync = true;
-                    this.collector.fail(tuple);
-                }
-            }
-
-            if (this.syncPolicy.mark(tuple, this.offset) || (forceSync && tupleBatch.size() > 0)) {
-                int attempts = 0;
-                boolean success = false;
-                IOException lastException = null;
-                // Make every attempt to sync the data we have.  If it can't be done then kill the bolt with
-                // a runtime exception.  The filesystem is presumably in a very bad state.
-                while (success == false && attempts < fileRetryCount)
-                {
-                    attempts += 1;
-                    try {
-                        syncAndAckTuples();
-                        success = true;
-                    } catch (IOException e) {
-                        LOG.warn("Data could not be synced to filesystem on attempt [" + attempts + "]");
-                        this.collector.reportError(e);
-                        lastException = e;
-                    }
-                }
-
-                // If unsuccesful fail the pending tuples
-                if (success == false)
-                {
-                    LOG.warn("Data could not be synced to filesystem, failing this batch of tuples");
-                    for (Tuple t : tupleBatch)
-                        this.collector.fail(t);
-                    tupleBatch.clear();
-
-                    throw new RuntimeException("Sync failed [" + attempts + "] times.", lastException);
-                }
-            }
-
-        }
-
-        if(this.rotationPolicy.mark(tuple, this.offset)) {
-            try {
-                rotateAndReset();
-            } catch (IOException e) {
-                this.collector.reportError(e);
-                LOG.warn("File could not be rotated");
-                //At this point there is nothing to do.  In all likelihood any filesystem operations will fail.
-                //The next tuple will almost certainly fail to write and/or sync, which force a rotation.  That
-                //will give rotateAndReset() a chance to work which includes creating a fresh file handle.
-            }
-        }
-    }
-
-    private void rotateAndReset() throws IOException {
-        rotateOutputFile(); // synchronized
-        this.offset = 0;
-        this.rotationPolicy.reset();
-    }
-
-    private void syncAndAckTuples() throws IOException {
+    void syncTuples() throws IOException {
         LOG.debug("Attempting to sync all data to filesystem");
         if (this.out instanceof HdfsDataOutputStream) {
             ((HdfsDataOutputStream) this.out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
         } else {
             this.out.hsync();
         }
-        this.syncPolicy.reset();
-        LOG.debug("Data synced to filesystem. Ack'ing [" + tupleBatch.size() +"] tuples");
-        for (Tuple t : tupleBatch)
-            this.collector.ack(t);
-        tupleBatch.clear();
     }
 
-    private void writeAndAddTuple(Tuple tuple) throws IOException {
+    @Override
+    void writeTuple(Tuple tuple) throws IOException {
         byte[] bytes = this.format.format(tuple);
         out.write(bytes);
         this.offset += bytes.length;
-        tupleBatch.add(tuple);
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        Map<String, Object> conf = super.getComponentConfiguration();
-        if (conf == null)
-            conf = new Config();
-
-        if (tickTupleInterval > 0) {
-            LOG.info("Enabling tick tuple with interval [" + tickTupleInterval + "]");
-            conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickTupleInterval);
-        }
-
-        return conf;
     }
 
     @Override
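
The tick-tuple defaulting that moved from HdfsBolt into AbstractHdfsBolt.prepare() halves the
topology message timeout unless an interval was set explicitly. A worked example of the
arithmetic, assuming Storm's default message timeout of 30 seconds:

    // Illustrative values only: topology.message.timeout.secs = 30 (Storm's default)
    int topologyTimeout = 30;
    int tickTupleInterval = (int) Math.floor(topologyTimeout / 2);   // 30 / 2 = 15
    // getComponentConfiguration() then publishes topology.tick.tuple.freq.secs = 15,
    // so the bolt receives a tick (and force-flushes any buffered batch) twice per
    // timeout window, acking the tuples in tupleBatch before they can time out.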

http://git-wip-us.apache.org/repos/asf/storm/blob/f2857eb3/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
index baf4df0..fcd0d29 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
@@ -89,11 +89,21 @@ public class SequenceFileBolt extends AbstractHdfsBolt {
         return this;
     }
 
+    public SequenceFileBolt withTickTupleIntervalSeconds(int interval) {
+        this.tickTupleInterval = interval;
+        return this;
+    }
+
     public SequenceFileBolt addRotationAction(RotationAction action){
         this.rotationActions.add(action);
         return this;
     }
 
+    public SequenceFileBolt withRetryCount(int fileRetryCount) {
+        this.fileRetryCount = fileRetryCount;
+        return this;
+    }
+
     @Override
     public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
         LOG.info("Preparing Sequence File Bolt...");
@@ -104,29 +114,15 @@ public class SequenceFileBolt extends AbstractHdfsBolt {
     }
 
     @Override
-    public void execute(Tuple tuple) {
-        try {
-            long offset;
-            synchronized (this.writeLock) {
-                this.writer.append(this.format.key(tuple), this.format.value(tuple));
-                offset = this.writer.getLength();
-
-                if (this.syncPolicy.mark(tuple, offset)) {
-                    this.writer.hsync();
-                    this.syncPolicy.reset();
-                }
-            }
-
-            this.collector.ack(tuple);
-            if (this.rotationPolicy.mark(tuple, offset)) {
-                rotateOutputFile(); // synchronized
-                this.rotationPolicy.reset();
-            }
-        } catch (IOException e) {
-            this.collector.reportError(e);
-            this.collector.fail(tuple);
-        }
+    void syncTuples() throws IOException {
+        LOG.debug("Attempting to sync all data to filesystem");
+        this.writer.hsync();
+    }
 
+    @Override
+    void writeTuple(Tuple tuple) throws IOException {
+        this.writer.append(this.format.key(tuple), this.format.value(tuple));
+        this.offset = this.writer.getLength();
     }
 
     Path createOutputFile() throws IOException {
@@ -144,6 +140,4 @@ public class SequenceFileBolt extends AbstractHdfsBolt {
     void closeOutputFile() throws IOException {
         this.writer.close();
     }
-
-
 }
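
With the two new builder methods above, the sync retry count and tick-tuple interval become
tunable per bolt. A usage sketch, assuming the standard SequenceFileBolt builders documented
in the storm-hdfs README (withFsUrl, withFileNameFormat, withSequenceFormat, withRotationPolicy,
withSyncPolicy); the URL and paths are placeholders:

    import org.apache.storm.hdfs.bolt.SequenceFileBolt;
    import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
    import org.apache.storm.hdfs.bolt.format.DefaultSequenceFormat;
    import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
    import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
    import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;

    SequenceFileBolt bolt = new SequenceFileBolt()
            .withFsUrl("hdfs://localhost:8020")
            .withFileNameFormat(new DefaultFileNameFormat().withPath("/tmp/storm/"))
            .withSequenceFormat(new DefaultSequenceFormat("key", "value"))
            .withRotationPolicy(new FileSizeRotationPolicy(5.0f, Units.MB))
            .withSyncPolicy(new CountSyncPolicy(1000))
            .withRetryCount(5)                  // new: retry failed syncs 5 times (default is 3)
            .withTickTupleIntervalSeconds(10);  // new: flush every 10s, overriding the half-timeout default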


[2/3] storm git commit: Merge branch 'HdfsBolt' of https://github.com/dossett/storm into STORM-1073

Posted by sr...@apache.org.
Merge branch 'HdfsBolt' of https://github.com/dossett/storm into STORM-1073


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e5ad9565
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e5ad9565
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e5ad9565

Branch: refs/heads/master
Commit: e5ad95655e2aac88647c147591d8d11349089da7
Parents: 3bc375e f2857eb
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Sun Oct 25 08:58:44 2015 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Sun Oct 25 08:58:44 2015 -0700

----------------------------------------------------------------------
 .../storm/hdfs/bolt/AbstractHdfsBolt.java       | 121 +++++++++++++++++++
 .../org/apache/storm/hdfs/bolt/HdfsBolt.java    | 114 +----------------
 .../storm/hdfs/bolt/SequenceFileBolt.java       |  42 +++----
 3 files changed, 142 insertions(+), 135 deletions(-)
----------------------------------------------------------------------



[3/3] storm git commit: Added STORM-1073 to CHANGELOG.

Posted by sr...@apache.org.
Added STORM-1073 to CHANGELOG.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b9badc29
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b9badc29
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b9badc29

Branch: refs/heads/master
Commit: b9badc29862a4f1744d8994a7d39cd10a938a612
Parents: e5ad956
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Sun Oct 25 09:01:35 2015 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Sun Oct 25 09:01:35 2015 -0700

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b9badc29/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f069844..189897d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.11.0
+ * STORM-1073: Refactor AbstractHdfsBolt
  * STORM-1128: Make metrics fast
  * STORM-1122: Fix the format issue in Utils.java
  * STORM-1111: Fix Validation for lots of different configs