Posted to commits@storm.apache.org by sr...@apache.org on 2015/10/25 17:11:08 UTC
[1/3] storm git commit: STORM-1073: Refactor AbstractHdfsBolt
Repository: storm
Updated Branches:
refs/heads/master 3bc375e73 -> b9badc298
STORM-1073: Refactor AbstractHdfsBolt
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f2857eb3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f2857eb3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f2857eb3
Branch: refs/heads/master
Commit: f2857eb381961dd57819d741788d47d07412c74d
Parents: ae4b553
Author: Aaron Dossett <aa...@target.com>
Authored: Tue Oct 20 15:19:22 2015 -0500
Committer: Aaron Dossett <aa...@target.com>
Committed: Tue Oct 20 15:19:22 2015 -0500
----------------------------------------------------------------------
.../storm/hdfs/bolt/AbstractHdfsBolt.java | 121 +++++++++++++++++++
.../org/apache/storm/hdfs/bolt/HdfsBolt.java | 114 +----------------
.../storm/hdfs/bolt/SequenceFileBolt.java | 42 +++----
3 files changed, 142 insertions(+), 135 deletions(-)
----------------------------------------------------------------------
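Besides pulling the common execute() path up into AbstractHdfsBolt, the commit moves tick-tuple handling into the base class: when no interval has been set explicitly, it defaults to half of topology.message.timeout.secs and is advertised to Storm via getComponentConfiguration(). A standalone sketch of that defaulting rule, using a hypothetical 30-second timeout (not the committed code, which follows below):

import backtype.storm.Config;

import java.util.HashMap;
import java.util.Map;

public class TickIntervalDefault {

    // Mirrors the defaulting rule the refactored AbstractHdfsBolt applies in prepare():
    // an explicitly configured interval wins, otherwise use half the topology message timeout.
    static int defaultTickInterval(Map<String, Object> stormConf, int explicitInterval) {
        if (explicitInterval == 0 && stormConf.containsKey(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)) {
            int timeout = ((Number) stormConf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS)).intValue();
            return timeout / 2;
        }
        return explicitInterval;
    }

    public static void main(String[] args) {
        Map<String, Object> conf = new HashMap<>();
        conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 30); // hypothetical timeout
        System.out.println(defaultTickInterval(conf, 0));   // 15 -> half the timeout
        System.out.println(defaultTickInterval(conf, 10));  // 10 -> explicit setting wins
    }
}
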
http://git-wip-us.apache.org/repos/asf/storm/blob/f2857eb3/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
index f260598..0bfccf4 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/AbstractHdfsBolt.java
@@ -17,10 +17,14 @@
*/
package org.apache.storm.hdfs.bolt;
+import backtype.storm.Config;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.TupleUtils;
+import backtype.storm.utils.Utils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -35,12 +39,15 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.Map;
+import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
public abstract class AbstractHdfsBolt extends BaseRichBolt {
private static final Logger LOG = LoggerFactory.getLogger(AbstractHdfsBolt.class);
+ private static final Integer DEFAULT_RETRY_COUNT = 3;
protected ArrayList<RotationAction> rotationActions = new ArrayList<RotationAction>();
private Path currentFile;
@@ -54,6 +61,10 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
protected String configKey;
protected transient Object writeLock;
protected transient Timer rotationTimer; // only used for TimedRotationPolicy
+ private List<Tuple> tupleBatch = new LinkedList<>();
+ protected long offset = 0;
+ protected Integer fileRetryCount = DEFAULT_RETRY_COUNT;
+ protected Integer tickTupleInterval = 0;
protected transient Configuration hdfsConfig;
@@ -99,6 +110,13 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
}
}
+ // If interval is non-zero then it has already been explicitly set and we should not default it
+ if (conf.containsKey("topology.message.timeout.secs") && tickTupleInterval == 0)
+ {
+ Integer topologyTimeout = Utils.getInt(conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS));
+ tickTupleInterval = (int)(Math.floor(topologyTimeout / 2));
+ LOG.debug("Setting tick tuple interval to [{}] based on topology timeout", tickTupleInterval);
+ }
try{
HdfsSecurityUtil.login(conf, hdfsConfig);
@@ -127,9 +145,112 @@ public abstract class AbstractHdfsBolt extends BaseRichBolt {
}
@Override
+ public final void execute(Tuple tuple) {
+
+ synchronized (this.writeLock) {
+ boolean forceSync = false;
+ if (TupleUtils.isTick(tuple)) {
+ LOG.debug("TICK! forcing a file system flush");
+ forceSync = true;
+ } else {
+ try {
+ writeTuple(tuple);
+ tupleBatch.add(tuple);
+ } catch (IOException e) {
+ //If the write failed, try to sync anything already written
+ LOG.info("Tuple failed to write, forcing a flush of existing data.");
+ this.collector.reportError(e);
+ forceSync = true;
+ this.collector.fail(tuple);
+ }
+ }
+
+ if (this.syncPolicy.mark(tuple, this.offset) || (forceSync && tupleBatch.size() > 0)) {
+ int attempts = 0;
+ boolean success = false;
+ IOException lastException = null;
+ // Make every attempt to sync the data we have. If it can't be done then kill the bolt with
+ // a runtime exception. The filesystem is presumably in a very bad state.
+ while (success == false && attempts < fileRetryCount) {
+ attempts += 1;
+ try {
+ syncTuples();
+ LOG.debug("Data synced to filesystem. Ack'ing [{}] tuples", tupleBatch.size());
+ for (Tuple t : tupleBatch) {
+ this.collector.ack(t);
+ }
+ tupleBatch.clear();
+ syncPolicy.reset();
+ success = true;
+ } catch (IOException e) {
+ LOG.warn("Data could not be synced to filesystem on attempt [{}]", attempts);
+ this.collector.reportError(e);
+ lastException = e;
+ }
+ }
+
+ // If unsuccessful, fail the pending tuples
+ if (success == false) {
+ LOG.warn("Data could not be synced to filesystem, failing this batch of tuples");
+ for (Tuple t : tupleBatch) {
+ this.collector.fail(t);
+ }
+ tupleBatch.clear();
+
+ throw new RuntimeException("Sync failed [" + attempts + "] times.", lastException);
+ }
+ }
+
+ if(this.rotationPolicy.mark(tuple, this.offset)) {
+ try {
+ rotateOutputFile();
+ this.rotationPolicy.reset();
+ this.offset = 0;
+ } catch (IOException e) {
+ this.collector.reportError(e);
+ LOG.warn("File could not be rotated");
+ //At this point there is nothing to do. In all likelihood any filesystem operations will fail.
+ //The next tuple will almost certainly fail to write and/or sync, which will force a rotation. That
+ //will give rotateAndReset() a chance to work which includes creating a fresh file handle.
+ }
+ }
+ }
+ }
+
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+ Map<String, Object> conf = super.getComponentConfiguration();
+ if (conf == null)
+ conf = new Config();
+
+ if (tickTupleInterval > 0) {
+ LOG.info("Enabling tick tuple with interval [{}]", tickTupleInterval);
+ conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickTupleInterval);
+ }
+
+ return conf;
+ }
+
+ @Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
}
+ /**
+ * writes a tuple to the underlying filesystem but makes no guarantees about syncing data
+ * @param tuple
+ * @throws IOException
+ */
+ abstract void writeTuple(Tuple tuple) throws IOException;
+
+ /**
+ * Make the best effort to sync written data to the underlying file system. Concrete classes should very clearly
+ * state the file state that sync guarantees. For example, HdfsBolt can make a much stronger guarantee than
+ * SequenceFileBolt.
+ *
+ * @throws IOException
+ */
+ abstract void syncTuples() throws IOException;
+
abstract void closeOutputFile() throws IOException;
abstract Path createOutputFile() throws IOException;
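
With execute() now final in the base class, a concrete bolt only supplies the hooks above plus file management; the base class owns batching, the sync retry loop, acking/failing, and rotation. A minimal hypothetical subclass for illustration — the name RawBytesHdfsBolt, the use of tuple.getBinary(0), and the builder shown are assumptions modelled on HdfsBolt, not part of this commit (it sits in the same package because the hook methods are package-private):

// Hypothetical example only; modelled on HdfsBolt but not part of this commit.
package org.apache.storm.hdfs.bolt;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.IOException;
import java.net.URI;
import java.util.Map;

public class RawBytesHdfsBolt extends AbstractHdfsBolt {

    private transient FSDataOutputStream out;

    public RawBytesHdfsBolt withFsUrl(String fsUrl) {
        this.fsUrl = fsUrl;
        return this;
    }

    @Override
    public void doPrepare(Map conf, TopologyContext context, OutputCollector collector) throws IOException {
        this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
    }

    @Override
    void writeTuple(Tuple tuple) throws IOException {
        // Write one tuple and advance the offset; the base class batches the tuple
        // and decides when to sync, ack, fail or rotate.
        byte[] bytes = tuple.getBinary(0);
        out.write(bytes);
        this.offset += bytes.length;
    }

    @Override
    void syncTuples() throws IOException {
        // Best-effort flush; on success the base class acks the batched tuples.
        this.out.hsync();
    }

    @Override
    void closeOutputFile() throws IOException {
        this.out.close();
    }

    @Override
    Path createOutputFile() throws IOException {
        Path path = new Path(this.fileNameFormat.getPath(),
                this.fileNameFormat.getName(this.rotation, System.currentTimeMillis()));
        this.out = this.fs.create(path);
        return path;
    }
}
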
http://git-wip-us.apache.org/repos/asf/storm/blob/f2857eb3/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
index 557afd6..101aa57 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/HdfsBolt.java
@@ -17,11 +17,9 @@
*/
package org.apache.storm.hdfs.bolt;
-import backtype.storm.Config;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.TupleUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -39,19 +37,12 @@ import java.io.IOException;
import java.net.URI;
import java.util.EnumSet;
import java.util.Map;
-import java.util.List;
-import java.util.LinkedList;
public class HdfsBolt extends AbstractHdfsBolt{
private static final Logger LOG = LoggerFactory.getLogger(HdfsBolt.class);
- private static final Integer DEFAULT_RETRY_COUNT = 3;
private transient FSDataOutputStream out;
private RecordFormat format;
- private long offset = 0;
- private List<Tuple> tupleBatch = new LinkedList<>();
- Integer tickTupleInterval = 0;
- Integer fileRetryCount = DEFAULT_RETRY_COUNT;
public HdfsBolt withFsUrl(String fsUrl){
this.fsUrl = fsUrl;
@@ -102,122 +93,23 @@ public class HdfsBolt extends AbstractHdfsBolt{
public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
LOG.info("Preparing HDFS Bolt...");
this.fs = FileSystem.get(URI.create(this.fsUrl), hdfsConfig);
-
- // If interval is non-zero then it has already been explicitly set and we should not default it
- if (conf.containsKey("topology.message.timeout.secs") && tickTupleInterval == 0)
- {
- Integer topologyTimeout = Integer.parseInt(conf.get("topology.message.timeout.secs").toString());
- tickTupleInterval = (int)(Math.floor(topologyTimeout / 2));
- LOG.debug("Setting tick tuple interval to [" + tickTupleInterval + "] based on topology timeout");
- }
}
@Override
- public void execute(Tuple tuple) {
-
- synchronized (this.writeLock) {
- boolean forceSync = false;
- if (TupleUtils.isTick(tuple)) {
- LOG.debug("TICK! forcing a file system flush");
- forceSync = true;
- }
- else {
- try {
- writeAndAddTuple(tuple);
- } catch (IOException e) {
- //If the write failed, try to sync anything already written
- LOG.info("Tuple failed to write, forcing a flush of existing data.");
- this.collector.reportError(e);
- forceSync = true;
- this.collector.fail(tuple);
- }
- }
-
- if (this.syncPolicy.mark(tuple, this.offset) || (forceSync && tupleBatch.size() > 0)) {
- int attempts = 0;
- boolean success = false;
- IOException lastException = null;
- // Make every attempt to sync the data we have. If it can't be done then kill the bolt with
- // a runtime exception. The filesystem is presumably in a very bad state.
- while (success == false && attempts < fileRetryCount)
- {
- attempts += 1;
- try {
- syncAndAckTuples();
- success = true;
- } catch (IOException e) {
- LOG.warn("Data could not be synced to filesystem on attempt [" + attempts + "]");
- this.collector.reportError(e);
- lastException = e;
- }
- }
-
- // If unsuccesful fail the pending tuples
- if (success == false)
- {
- LOG.warn("Data could not be synced to filesystem, failing this batch of tuples");
- for (Tuple t : tupleBatch)
- this.collector.fail(t);
- tupleBatch.clear();
-
- throw new RuntimeException("Sync failed [" + attempts + "] times.", lastException);
- }
- }
-
- }
-
- if(this.rotationPolicy.mark(tuple, this.offset)) {
- try {
- rotateAndReset();
- } catch (IOException e) {
- this.collector.reportError(e);
- LOG.warn("File could not be rotated");
- //At this point there is nothing to do. In all likelihood any filesystem operations will fail.
- //The next tuple will almost certainly fail to write and/or sync, which force a rotation. That
- //will give rotateAndReset() a chance to work which includes creating a fresh file handle.
- }
- }
- }
-
- private void rotateAndReset() throws IOException {
- rotateOutputFile(); // synchronized
- this.offset = 0;
- this.rotationPolicy.reset();
- }
-
- private void syncAndAckTuples() throws IOException {
+ void syncTuples() throws IOException {
LOG.debug("Attempting to sync all data to filesystem");
if (this.out instanceof HdfsDataOutputStream) {
((HdfsDataOutputStream) this.out).hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH));
} else {
this.out.hsync();
}
- this.syncPolicy.reset();
- LOG.debug("Data synced to filesystem. Ack'ing [" + tupleBatch.size() +"] tuples");
- for (Tuple t : tupleBatch)
- this.collector.ack(t);
- tupleBatch.clear();
}
- private void writeAndAddTuple(Tuple tuple) throws IOException {
+ @Override
+ void writeTuple(Tuple tuple) throws IOException {
byte[] bytes = this.format.format(tuple);
out.write(bytes);
this.offset += bytes.length;
- tupleBatch.add(tuple);
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- Map<String, Object> conf = super.getComponentConfiguration();
- if (conf == null)
- conf = new Config();
-
- if (tickTupleInterval > 0) {
- LOG.info("Enabling tick tuple with interval [" + tickTupleInterval + "]");
- conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, tickTupleInterval);
- }
-
- return conf;
}
@Override
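
End-user wiring of HdfsBolt is unchanged by the refactor; batching, the sync retry loop and tick-tuple flushing now come from AbstractHdfsBolt. A typical topology sketch — only withFsUrl appears in this diff, the remaining builder calls and the spout id are the usual storm-hdfs ones and are assumed here:

import backtype.storm.topology.TopologyBuilder;
import org.apache.storm.hdfs.bolt.HdfsBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DelimitedRecordFormat;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.FileSizeRotationPolicy.Units;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;

public class HdfsBoltWiring {

    static HdfsBolt buildBolt() {
        // Standard storm-hdfs configuration; sync every 1000 tuples, rotate at 5 MB.
        return new HdfsBolt()
                .withFsUrl("hdfs://localhost:8020")
                .withFileNameFormat(new DefaultFileNameFormat().withPath("/storm/"))
                .withRecordFormat(new DelimitedRecordFormat().withFieldDelimiter("|"))
                .withSyncPolicy(new CountSyncPolicy(1000))
                .withRotationPolicy(new FileSizeRotationPolicy(5.0f, Units.MB));
    }

    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        // Hypothetical upstream spout id; the bolt acks or fails whole batches
        // after the sync-with-retry loop now living in AbstractHdfsBolt.
        builder.setBolt("hdfs-bolt", buildBolt(), 2).shuffleGrouping("spout");
    }
}
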
http://git-wip-us.apache.org/repos/asf/storm/blob/f2857eb3/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
index baf4df0..fcd0d29 100644
--- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
+++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/bolt/SequenceFileBolt.java
@@ -89,11 +89,21 @@ public class SequenceFileBolt extends AbstractHdfsBolt {
return this;
}
+ public SequenceFileBolt withTickTupleIntervalSeconds(int interval) {
+ this.tickTupleInterval = interval;
+ return this;
+ }
+
public SequenceFileBolt addRotationAction(RotationAction action){
this.rotationActions.add(action);
return this;
}
+ public SequenceFileBolt withRetryCount(int fileRetryCount) {
+ this.fileRetryCount = fileRetryCount;
+ return this;
+ }
+
@Override
public void doPrepare(Map conf, TopologyContext topologyContext, OutputCollector collector) throws IOException {
LOG.info("Preparing Sequence File Bolt...");
@@ -104,29 +114,15 @@ public class SequenceFileBolt extends AbstractHdfsBolt {
}
@Override
- public void execute(Tuple tuple) {
- try {
- long offset;
- synchronized (this.writeLock) {
- this.writer.append(this.format.key(tuple), this.format.value(tuple));
- offset = this.writer.getLength();
-
- if (this.syncPolicy.mark(tuple, offset)) {
- this.writer.hsync();
- this.syncPolicy.reset();
- }
- }
-
- this.collector.ack(tuple);
- if (this.rotationPolicy.mark(tuple, offset)) {
- rotateOutputFile(); // synchronized
- this.rotationPolicy.reset();
- }
- } catch (IOException e) {
- this.collector.reportError(e);
- this.collector.fail(tuple);
- }
+ void syncTuples() throws IOException {
+ LOG.debug("Attempting to sync all data to filesystem");
+ this.writer.hsync();
+ }
+ @Override
+ void writeTuple(Tuple tuple) throws IOException {
+ this.writer.append(this.format.key(tuple), this.format.value(tuple));
+ this.offset = this.writer.getLength();
}
Path createOutputFile() throws IOException {
@@ -144,6 +140,4 @@ public class SequenceFileBolt extends AbstractHdfsBolt {
void closeOutputFile() throws IOException {
this.writer.close();
}
-
-
}
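
The two new builder methods above let a SequenceFileBolt set the tick-tuple flush interval and the sync retry count explicitly. A brief wiring sketch — withTickTupleIntervalSeconds and withRetryCount come from this commit, while the remaining builders (sequence format, file name format, policies) are the standard storm-hdfs ones and are assumed here:

import org.apache.storm.hdfs.bolt.SequenceFileBolt;
import org.apache.storm.hdfs.bolt.format.DefaultFileNameFormat;
import org.apache.storm.hdfs.bolt.format.DefaultSequenceFormat;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy;
import org.apache.storm.hdfs.bolt.rotation.TimedRotationPolicy.TimeUnit;
import org.apache.storm.hdfs.bolt.sync.CountSyncPolicy;

public class SequenceFileBoltWiring {

    static SequenceFileBolt buildBolt() {
        return new SequenceFileBolt()
                .withFsUrl("hdfs://localhost:8020")
                .withFileNameFormat(new DefaultFileNameFormat().withPath("/storm/seq/"))
                .withSequenceFormat(new DefaultSequenceFormat("key", "value"))
                .withSyncPolicy(new CountSyncPolicy(1000))
                .withRotationPolicy(new TimedRotationPolicy(30.0f, TimeUnit.MINUTES))
                // New in this commit: explicit tick-tuple flush interval and sync retry count.
                .withTickTupleIntervalSeconds(10)
                .withRetryCount(5);
    }
}

If neither method is called, the tick interval defaults to half the topology message timeout and the retry count to 3, as shown in the AbstractHdfsBolt diff above.
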
[2/3] storm git commit: Merge branch 'HdfsBolt' of https://github.com/dossett/storm into STORM-1073
Posted by sr...@apache.org.
Merge branch 'HdfsBolt' of https://github.com/dossett/storm into STORM-1073
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e5ad9565
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e5ad9565
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e5ad9565
Branch: refs/heads/master
Commit: e5ad95655e2aac88647c147591d8d11349089da7
Parents: 3bc375e f2857eb
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Sun Oct 25 08:58:44 2015 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Sun Oct 25 08:58:44 2015 -0700
----------------------------------------------------------------------
.../storm/hdfs/bolt/AbstractHdfsBolt.java | 121 +++++++++++++++++++
.../org/apache/storm/hdfs/bolt/HdfsBolt.java | 114 +----------------
.../storm/hdfs/bolt/SequenceFileBolt.java | 42 +++----
3 files changed, 142 insertions(+), 135 deletions(-)
----------------------------------------------------------------------
[3/3] storm git commit: Added STORM-1073 to CHANGELOG.
Posted by sr...@apache.org.
Added STORM-1073 to CHANGELOG.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b9badc29
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b9badc29
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b9badc29
Branch: refs/heads/master
Commit: b9badc29862a4f1744d8994a7d39cd10a938a612
Parents: e5ad956
Author: Sriharsha Chintalapani <ha...@hortonworks.com>
Authored: Sun Oct 25 09:01:35 2015 -0700
Committer: Sriharsha Chintalapani <ha...@hortonworks.com>
Committed: Sun Oct 25 09:01:35 2015 -0700
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/b9badc29/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f069844..189897d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.11.0
+ * STORM-1073: Refactor AbstractHdfsBolt
* STORM-1128: Make metrics fast
* STORM-1122: Fix the format issue in Utils.java
* STORM-1111: Fix Validation for lots of different configs