You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/12/01 23:05:07 UTC
[29/51] [partial] storm git commit: Update JStorm to latest release
2.1.0
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestEventLogSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestEventLogSpout.java b/jstorm-core/src/main/java/backtype/storm/testing/TestEventLogSpout.java
index 1570aeb..aaf6875 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/TestEventLogSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/TestEventLogSpout.java
@@ -36,103 +36,103 @@ import backtype.storm.tuple.Values;
public class TestEventLogSpout extends BaseRichSpout {
public static Logger LOG = LoggerFactory.getLogger(TestEventLogSpout.class);
-
+
private static final Map<String, Integer> acked = new HashMap<String, Integer>();
private static final Map<String, Integer> failed = new HashMap<String, Integer>();
-
+
private String uid;
private long totalCount;
-
+
SpoutOutputCollector _collector;
private long eventId = 0;
private long myCount;
private int source;
-
+
public static int getNumAcked(String stormId) {
- synchronized(acked) {
+ synchronized (acked) {
return get(acked, stormId, 0);
}
}
public static int getNumFailed(String stormId) {
- synchronized(failed) {
+ synchronized (failed) {
return get(failed, stormId, 0);
}
}
-
+
public TestEventLogSpout(long totalCount) {
this.uid = UUID.randomUUID().toString();
-
- synchronized(acked) {
+
+ synchronized (acked) {
acked.put(uid, 0);
}
- synchronized(failed) {
+ synchronized (failed) {
failed.put(uid, 0);
}
-
+
this.totalCount = totalCount;
}
-
+
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
this.source = context.getThisTaskId();
long taskCount = context.getComponentTasks(context.getThisComponentId()).size();
myCount = totalCount / taskCount;
}
-
+
public void close() {
-
+
}
-
+
public void cleanup() {
- synchronized(acked) {
+ synchronized (acked) {
acked.remove(uid);
- }
- synchronized(failed) {
+ }
+ synchronized (failed) {
failed.remove(uid);
}
}
-
+
public boolean completed() {
-
+
int ackedAmt;
int failedAmt;
-
- synchronized(acked) {
+
+ synchronized (acked) {
ackedAmt = acked.get(uid);
}
- synchronized(failed) {
+ synchronized (failed) {
failedAmt = failed.get(uid);
}
int totalEmitted = ackedAmt + failedAmt;
-
+
if (totalEmitted >= totalCount) {
return true;
}
return false;
}
-
+
public void nextTuple() {
- if (eventId < myCount) {
+ if (eventId < myCount) {
eventId++;
_collector.emit(new Values(source, eventId), eventId);
- }
+ }
}
-
+
public void ack(Object msgId) {
- synchronized(acked) {
+ synchronized (acked) {
int curr = get(acked, uid, 0);
- acked.put(uid, curr+1);
+ acked.put(uid, curr + 1);
}
}
public void fail(Object msgId) {
- synchronized(failed) {
+ synchronized (failed) {
int curr = get(failed, uid, 0);
- failed.put(uid, curr+1);
+ failed.put(uid, curr + 1);
}
}
-
+
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("source", "eventId"));
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestEventOrderCheckBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestEventOrderCheckBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/TestEventOrderCheckBolt.java
index 1f80362..8286d0b 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/TestEventOrderCheckBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/TestEventOrderCheckBolt.java
@@ -36,7 +36,7 @@ import backtype.storm.tuple.Values;
public class TestEventOrderCheckBolt extends BaseRichBolt {
public static Logger LOG = LoggerFactory.getLogger(TestEventOrderCheckBolt.class);
-
+
private int _count;
OutputCollector _collector;
Map<Integer, Long> recentEventId = new HashMap<Integer, Long>();
@@ -52,8 +52,9 @@ public class TestEventOrderCheckBolt extends BaseRichBolt {
Long recentEvent = recentEventId.get(sourceId);
if (null != recentEvent && eventId <= recentEvent) {
- String error = "Error: event id is not in strict order! event source Id: "
- + sourceId + ", last event Id: " + recentEvent + ", current event Id: " + eventId;
+ String error =
+ "Error: event id is not in strict order! event source Id: " + sourceId + ", last event Id: " + recentEvent + ", current event Id: "
+ + eventId;
_collector.emit(input, new Values(error));
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestGlobalCount.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestGlobalCount.java b/jstorm-core/src/main/java/backtype/storm/testing/TestGlobalCount.java
index 5ef464a..45f48e4 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/TestGlobalCount.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/TestGlobalCount.java
@@ -28,7 +28,6 @@ import backtype.storm.tuple.Values;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class TestGlobalCount extends BaseRichBolt {
public static Logger LOG = LoggerFactory.getLogger(TestWordCounter.class);
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestJob.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestJob.java b/jstorm-core/src/main/java/backtype/storm/testing/TestJob.java
index d41c36a..099a8db 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/TestJob.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/TestJob.java
@@ -20,10 +20,7 @@ package backtype.storm.testing;
import backtype.storm.ILocalCluster;
/**
- * This is the core interface for the storm java testing, usually
- * we put our java unit testing logic in the run method. A sample
- * code will be:
- * <code>
+ * This is the core interface for the storm java testing, usually we put our java unit testing logic in the run method. A sample code will be: <code>
* Testing.withSimulatedTimeLocalCluster(new TestJob() {
* public void run(Cluster cluster) {
* // your testing logic here.
@@ -31,11 +28,10 @@ import backtype.storm.ILocalCluster;
* });
*/
public interface TestJob {
- /**
- * run the testing logic with the cluster.
- *
- * @param cluster the cluster which created by <code>Testing.withSimulatedTimeLocalCluster</code>
- * and <code>Testing.withTrackedCluster</code>.
- */
+ /**
+ * run the testing logic with the cluster.
+ *
+ * @param cluster the cluster which created by <code>Testing.withSimulatedTimeLocalCluster</code> and <code>Testing.withTrackedCluster</code>.
+ */
public void run(ILocalCluster cluster) throws Exception;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestPlannerBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestPlannerBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/TestPlannerBolt.java
index 0d30b26..769f1cf 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/TestPlannerBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/TestPlannerBolt.java
@@ -25,16 +25,15 @@ import java.util.Map;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.base.BaseRichBolt;
-
public class TestPlannerBolt extends BaseRichBolt {
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
}
-
+
public void execute(Tuple input) {
}
-
+
public Fields getOutputFields() {
return new Fields("field1", "field2");
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestPlannerSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestPlannerSpout.java b/jstorm-core/src/main/java/backtype/storm/testing/TestPlannerSpout.java
index f4c27c0..bcacd4d 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/TestPlannerSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/TestPlannerSpout.java
@@ -27,11 +27,10 @@ import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
import java.util.HashMap;
-
public class TestPlannerSpout extends BaseRichSpout {
boolean _isDistributed;
Fields _outFields;
-
+
public TestPlannerSpout(Fields outFields, boolean isDistributed) {
_isDistributed = isDistributed;
_outFields = outFields;
@@ -40,34 +39,33 @@ public class TestPlannerSpout extends BaseRichSpout {
public TestPlannerSpout(boolean isDistributed) {
this(new Fields("field1", "field2"), isDistributed);
}
-
+
public TestPlannerSpout(Fields outFields) {
this(outFields, true);
}
-
+
public Fields getOutputFields() {
return _outFields;
}
-
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-
+
}
-
+
public void close() {
-
+
}
-
+
public void nextTuple() {
Utils.sleep(100);
}
-
- public void ack(Object msgId){
-
+
+ public void ack(Object msgId) {
+
}
- public void fail(Object msgId){
-
+ public void fail(Object msgId) {
+
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
@@ -77,9 +75,9 @@ public class TestPlannerSpout extends BaseRichSpout {
@Override
public Map<String, Object> getComponentConfiguration() {
Map<String, Object> ret = new HashMap<String, Object>();
- if(!_isDistributed) {
+ if (!_isDistributed) {
ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
}
return ret;
- }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestSerObject.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestSerObject.java b/jstorm-core/src/main/java/backtype/storm/testing/TestSerObject.java
index 1c7706f..5416ef0 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/TestSerObject.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/TestSerObject.java
@@ -22,12 +22,12 @@ import java.io.Serializable;
public class TestSerObject implements Serializable {
public int f1;
public int f2;
-
+
public TestSerObject(int f1, int f2) {
this.f1 = f1;
this.f2 = f2;
}
-
+
@Override
public int hashCode() {
final int prime = 31;
@@ -36,7 +36,7 @@ public class TestSerObject implements Serializable {
result = prime * result + f2;
return result;
}
-
+
@Override
public boolean equals(Object obj) {
if (this == obj)
@@ -52,5 +52,5 @@ public class TestSerObject implements Serializable {
return false;
return true;
}
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestWordCounter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestWordCounter.java b/jstorm-core/src/main/java/backtype/storm/testing/TestWordCounter.java
index 551b054..c6b32b5 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/TestWordCounter.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/TestWordCounter.java
@@ -29,29 +29,28 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static backtype.storm.utils.Utils.tuple;
-
public class TestWordCounter extends BaseBasicBolt {
public static Logger LOG = LoggerFactory.getLogger(TestWordCounter.class);
Map<String, Integer> _counts;
-
+
public void prepare(Map stormConf, TopologyContext context) {
_counts = new HashMap<String, Integer>();
}
-
+
public void execute(Tuple input, BasicOutputCollector collector) {
String word = (String) input.getValues().get(0);
int count = 0;
- if(_counts.containsKey(word)) {
+ if (_counts.containsKey(word)) {
count = _counts.get(word);
}
count++;
_counts.put(word, count);
collector.emit(tuple(word, count));
}
-
+
public void cleanup() {
-
+
}
public void declareOutputFields(OutputFieldsDeclarer declarer) {
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TestWordSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TestWordSpout.java b/jstorm-core/src/main/java/backtype/storm/testing/TestWordSpout.java
index 745bf71..d5603a1 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/TestWordSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/TestWordSpout.java
@@ -31,7 +31,6 @@ import java.util.Random;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-
public class TestWordSpout extends BaseRichSpout {
public static Logger LOG = LoggerFactory.getLogger(TestWordSpout.class);
boolean _isDistributed;
@@ -44,43 +43,43 @@ public class TestWordSpout extends BaseRichSpout {
public TestWordSpout(boolean isDistributed) {
_isDistributed = isDistributed;
}
-
+
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
}
-
+
public void close() {
-
+
}
-
+
public void nextTuple() {
Utils.sleep(100);
- final String[] words = new String[] {"nathan", "mike", "jackson", "golda", "bertels"};
+ final String[] words = new String[] { "nathan", "mike", "jackson", "golda", "bertels" };
final Random rand = new Random();
final String word = words[rand.nextInt(words.length)];
_collector.emit(new Values(word));
}
-
+
public void ack(Object msgId) {
}
public void fail(Object msgId) {
-
+
}
-
+
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
- if(!_isDistributed) {
+ if (!_isDistributed) {
Map<String, Object> ret = new HashMap<String, Object>();
ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
return ret;
} else {
return null;
}
- }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TrackedTopology.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TrackedTopology.java b/jstorm-core/src/main/java/backtype/storm/testing/TrackedTopology.java
index f2691b7..60506b5 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/TrackedTopology.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/TrackedTopology.java
@@ -23,12 +23,12 @@ import java.util.Map;
import backtype.storm.generated.StormTopology;
import clojure.lang.Keyword;
-public class TrackedTopology extends HashMap{
- public TrackedTopology(Map map) {
- super(map);
- }
-
- public StormTopology getTopology() {
- return (StormTopology)get(Keyword.intern("topology"));
- }
+public class TrackedTopology extends HashMap {
+ public TrackedTopology(Map map) {
+ super(map);
+ }
+
+ public StormTopology getTopology() {
+ return (StormTopology) get(Keyword.intern("topology"));
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/testing/TupleCaptureBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/testing/TupleCaptureBolt.java b/jstorm-core/src/main/java/backtype/storm/testing/TupleCaptureBolt.java
index e163576..9635887 100755
--- a/jstorm-core/src/main/java/backtype/storm/testing/TupleCaptureBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/testing/TupleCaptureBolt.java
@@ -28,7 +28,6 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
-
public class TupleCaptureBolt implements IRichBolt {
public static transient Map<String, Map<String, List<FixedTuple>>> emitted_tuples = new HashMap<String, Map<String, List<FixedTuple>>>();
@@ -47,8 +46,8 @@ public class TupleCaptureBolt implements IRichBolt {
public void execute(Tuple input) {
String component = input.getSourceComponent();
Map<String, List<FixedTuple>> captured = emitted_tuples.get(_name);
- if(!captured.containsKey(component)) {
- captured.put(component, new ArrayList<FixedTuple>());
+ if (!captured.containsKey(component)) {
+ captured.put(component, new ArrayList<FixedTuple>());
}
captured.get(component).add(new FixedTuple(input.getSourceStreamId(), input.getValues()));
_collector.ack(input);
@@ -60,7 +59,7 @@ public class TupleCaptureBolt implements IRichBolt {
public void cleanup() {
}
-
+
public Map<String, List<FixedTuple>> getAndRemoveResults() {
return emitted_tuples.remove(_name);
}
@@ -70,7 +69,7 @@ public class TupleCaptureBolt implements IRichBolt {
emitted_tuples.get(_name).clear();
return ret;
}
-
+
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/BaseConfigurationDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/BaseConfigurationDeclarer.java b/jstorm-core/src/main/java/backtype/storm/topology/BaseConfigurationDeclarer.java
index 0c67324..a6614fc 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/BaseConfigurationDeclarer.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/BaseConfigurationDeclarer.java
@@ -36,19 +36,22 @@ public abstract class BaseConfigurationDeclarer<T extends ComponentConfiguration
@Override
public T setMaxTaskParallelism(Number val) {
- if(val!=null) val = val.intValue();
+ if (val != null)
+ val = val.intValue();
return addConfiguration(Config.TOPOLOGY_MAX_TASK_PARALLELISM, val);
}
@Override
public T setMaxSpoutPending(Number val) {
- if(val!=null) val = val.intValue();
+ if (val != null)
+ val = val.intValue();
return addConfiguration(Config.TOPOLOGY_MAX_SPOUT_PENDING, val);
}
-
+
@Override
public T setNumTasks(Number val) {
- if(val!=null) val = val.intValue();
+ if (val != null)
+ val = val.intValue();
return addConfiguration(Config.TOPOLOGY_TASKS, val);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/BasicBoltExecutor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/BasicBoltExecutor.java b/jstorm-core/src/main/java/backtype/storm/topology/BasicBoltExecutor.java
index 6c9cdc1..ea437c5 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/BasicBoltExecutor.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/BasicBoltExecutor.java
@@ -25,11 +25,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class BasicBoltExecutor implements IRichBolt {
- public static Logger LOG = LoggerFactory.getLogger(BasicBoltExecutor.class);
-
+ public static Logger LOG = LoggerFactory.getLogger(BasicBoltExecutor.class);
+
private IBasicBolt _bolt;
private transient BasicOutputCollector _collector;
-
+
public BasicBoltExecutor(IBasicBolt bolt) {
_bolt = bolt;
}
@@ -38,7 +38,6 @@ public class BasicBoltExecutor implements IRichBolt {
_bolt.declareOutputFields(declarer);
}
-
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
_bolt.prepare(stormConf, context);
_collector = new BasicOutputCollector(collector);
@@ -49,8 +48,8 @@ public class BasicBoltExecutor implements IRichBolt {
try {
_bolt.execute(input, _collector);
_collector.getOutputter().ack(input);
- } catch(FailedException e) {
- if(e instanceof ReportedFailedException) {
+ } catch (FailedException e) {
+ if (e instanceof ReportedFailedException) {
_collector.reportError(e);
}
_collector.getOutputter().fail(input);
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/BasicOutputCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/BasicOutputCollector.java b/jstorm-core/src/main/java/backtype/storm/topology/BasicOutputCollector.java
index be1c242..e48f159 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/BasicOutputCollector.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/BasicOutputCollector.java
@@ -23,7 +23,6 @@ import backtype.storm.tuple.Tuple;
import backtype.storm.utils.Utils;
import java.util.List;
-
public class BasicOutputCollector implements IBasicOutputCollector {
private OutputCollector out;
private Tuple inputTuple;
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/BoltDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/BoltDeclarer.java b/jstorm-core/src/main/java/backtype/storm/topology/BoltDeclarer.java
index 0c4b200..8fe05e2 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/BoltDeclarer.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/BoltDeclarer.java
@@ -19,8 +19,9 @@ package backtype.storm.topology;
/**
* BoltDeclarer includes grouping APIs for storm topology.
+ *
* @see <a href="https://storm.apache.org/documentation/Concepts.html">Concepts -Stream groupings-</a>
*/
public interface BoltDeclarer extends InputDeclarer<BoltDeclarer>, ComponentConfigurationDeclarer<BoltDeclarer> {
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/ComponentConfigurationDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/ComponentConfigurationDeclarer.java b/jstorm-core/src/main/java/backtype/storm/topology/ComponentConfigurationDeclarer.java
index d05dda0..49d78e5 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/ComponentConfigurationDeclarer.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/ComponentConfigurationDeclarer.java
@@ -21,9 +21,14 @@ import java.util.Map;
public interface ComponentConfigurationDeclarer<T extends ComponentConfigurationDeclarer> {
T addConfigurations(Map conf);
+
T addConfiguration(String config, Object value);
+
T setDebug(boolean debug);
+
T setMaxTaskParallelism(Number val);
+
T setMaxSpoutPending(Number val);
+
T setNumTasks(Number val);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/FailedException.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/FailedException.java b/jstorm-core/src/main/java/backtype/storm/topology/FailedException.java
index e174b5a..6c26bbf 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/FailedException.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/FailedException.java
@@ -21,11 +21,11 @@ public class FailedException extends RuntimeException {
public FailedException() {
super();
}
-
+
public FailedException(String msg) {
super(msg);
}
-
+
public FailedException(String msg, Throwable cause) {
super(msg, cause);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/IBasicBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/IBasicBolt.java b/jstorm-core/src/main/java/backtype/storm/topology/IBasicBolt.java
index 3b24f4e..81741df 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/IBasicBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/IBasicBolt.java
@@ -23,11 +23,13 @@ import java.util.Map;
public interface IBasicBolt extends IComponent {
void prepare(Map stormConf, TopologyContext context);
+
/**
* Process the input tuple and optionally emit new tuples based on the input tuple.
*
* All acking is managed for you. Throw a FailedException if you want to fail the tuple.
*/
void execute(Tuple input, BasicOutputCollector collector);
+
void cleanup();
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/IBasicOutputCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/IBasicOutputCollector.java b/jstorm-core/src/main/java/backtype/storm/topology/IBasicOutputCollector.java
index 92d60d2..85008c2 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/IBasicOutputCollector.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/IBasicOutputCollector.java
@@ -21,6 +21,8 @@ import java.util.List;
public interface IBasicOutputCollector {
List<Integer> emit(String streamId, List<Object> tuple);
+
void emitDirect(int taskId, String streamId, List<Object> tuple);
+
void reportError(Throwable t);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/IComponent.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/IComponent.java b/jstorm-core/src/main/java/backtype/storm/topology/IComponent.java
index 560c96f..1d0865d 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/IComponent.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/IComponent.java
@@ -21,23 +21,21 @@ import java.io.Serializable;
import java.util.Map;
/**
- * Common methods for all possible components in a topology. This interface is used
- * when defining topologies using the Java API.
+ * Common methods for all possible components in a topology. This interface is used when defining topologies using the Java API.
*/
public interface IComponent extends Serializable {
/**
* Declare the output schema for all the streams of this topology.
- *
+ *
* @param declarer this is used to declare output stream ids, output fields, and whether or not each output stream is a direct stream
*/
void declareOutputFields(OutputFieldsDeclarer declarer);
/**
- * Declare configuration specific to this component. Only a subset of the "topology.*" configs can
- * be overridden. The component configuration can be further overridden when constructing the
- * topology using {@link TopologyBuilder}
- *
+ * Declare configuration specific to this component. Only a subset of the "topology.*" configs can be overridden. The component configuration can be further
+ * overridden when constructing the topology using {@link TopologyBuilder}
+ *
*/
Map<String, Object> getComponentConfiguration();
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/IConfig.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/IConfig.java b/jstorm-core/src/main/java/backtype/storm/topology/IConfig.java
deleted file mode 100644
index 3ce9da7..0000000
--- a/jstorm-core/src/main/java/backtype/storm/topology/IConfig.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.topology;
-
-import java.util.Map;
-
-/*
- * This interface is used to notify the update of user configuration
- * for bolt and spout
- */
-public interface IConfig {
- public void updateConf(Map conf);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/IDynamicComponent.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/IDynamicComponent.java b/jstorm-core/src/main/java/backtype/storm/topology/IDynamicComponent.java
new file mode 100644
index 0000000..573ca99
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/topology/IDynamicComponent.java
@@ -0,0 +1,13 @@
+package backtype.storm.topology;
+
+import java.io.Serializable;
+import java.util.Map;
+
+/*
+ * This interface is used to notify the update of user configuration
+ * for bolt and spout
+ */
+
+public interface IDynamicComponent extends Serializable {
+ public void update(Map conf);
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/IRichBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/IRichBolt.java b/jstorm-core/src/main/java/backtype/storm/topology/IRichBolt.java
index d35244e..d44619c 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/IRichBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/IRichBolt.java
@@ -20,9 +20,8 @@ package backtype.storm.topology;
import backtype.storm.task.IBolt;
/**
- * When writing topologies using Java, {@link IRichBolt} and {@link IRichSpout} are the main interfaces
- * to use to implement components of the topology.
- *
+ * When writing topologies using Java, {@link IRichBolt} and {@link IRichSpout} are the main interfaces to use to implement components of the topology.
+ *
*/
public interface IRichBolt extends IBolt, IComponent {
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/IRichSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/IRichSpout.java b/jstorm-core/src/main/java/backtype/storm/topology/IRichSpout.java
index b088641..e1bdc02 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/IRichSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/IRichSpout.java
@@ -20,9 +20,8 @@ package backtype.storm.topology;
import backtype.storm.spout.ISpout;
/**
- * When writing topologies using Java, {@link IRichBolt} and {@link IRichSpout} are the main interfaces
- * to use to implement components of the topology.
- *
+ * When writing topologies using Java, {@link IRichBolt} and {@link IRichSpout} are the main interfaces to use to implement components of the topology.
+ *
*/
public interface IRichSpout extends ISpout, IComponent {
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/IRichStateSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/IRichStateSpout.java b/jstorm-core/src/main/java/backtype/storm/topology/IRichStateSpout.java
index edcc0ff..a22acd4 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/IRichStateSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/IRichStateSpout.java
@@ -19,7 +19,6 @@ package backtype.storm.topology;
import backtype.storm.state.IStateSpout;
-
public interface IRichStateSpout extends IStateSpout, IComponent {
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/InputDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/InputDeclarer.java b/jstorm-core/src/main/java/backtype/storm/topology/InputDeclarer.java
index 33540de..54f2702 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/InputDeclarer.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/InputDeclarer.java
@@ -22,10 +22,10 @@ import backtype.storm.generated.Grouping;
import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.tuple.Fields;
-
public interface InputDeclarer<T extends InputDeclarer> {
/**
* The stream is partitioned by the fields specified in the grouping.
+ *
* @param componentId
* @param fields
* @return
@@ -34,6 +34,7 @@ public interface InputDeclarer<T extends InputDeclarer> {
/**
* The stream is partitioned by the fields specified in the grouping.
+ *
* @param componentId
* @param streamId
* @param fields
@@ -42,16 +43,16 @@ public interface InputDeclarer<T extends InputDeclarer> {
public T fieldsGrouping(String componentId, String streamId, Fields fields);
/**
- * The entire stream goes to a single one of the bolt's tasks.
- * Specifically, it goes to the task with the lowest id.
+ * The entire stream goes to a single one of the bolt's tasks. Specifically, it goes to the task with the lowest id.
+ *
* @param componentId
* @return
*/
public T globalGrouping(String componentId);
/**
- * The entire stream goes to a single one of the bolt's tasks.
- * Specifically, it goes to the task with the lowest id.
+ * The entire stream goes to a single one of the bolt's tasks. Specifically, it goes to the task with the lowest id.
+ *
* @param componentId
* @param streamId
* @return
@@ -59,16 +60,16 @@ public interface InputDeclarer<T extends InputDeclarer> {
public T globalGrouping(String componentId, String streamId);
/**
- * Tuples are randomly distributed across the bolt's tasks in a way such that
- * each bolt is guaranteed to get an equal number of tuples.
+ * Tuples are randomly distributed across the bolt's tasks in a way such that each bolt is guaranteed to get an equal number of tuples.
+ *
* @param componentId
* @return
*/
public T shuffleGrouping(String componentId);
/**
- * Tuples are randomly distributed across the bolt's tasks in a way such that
- * each bolt is guaranteed to get an equal number of tuples.
+ * Tuples are randomly distributed across the bolt's tasks in a way such that each bolt is guaranteed to get an equal number of tuples.
+ *
* @param componentId
* @param streamId
* @return
@@ -76,29 +77,31 @@ public interface InputDeclarer<T extends InputDeclarer> {
public T shuffleGrouping(String componentId, String streamId);
/**
- * If the target bolt has one or more tasks in the same worker process,
- * tuples will be shuffled to just those in-process tasks.
- * Otherwise, this acts like a normal shuffle grouping.
+ * If the target bolt has one or more tasks in the same worker process, tuples will be shuffled to just those in-process tasks. Otherwise, this acts like a
+ * normal shuffle grouping.
+ *
* @param componentId
* @return
*/
public T localOrShuffleGrouping(String componentId);
/**
- * If the target bolt has one or more tasks in the same worker process,
- * tuples will be shuffled to just those in-process tasks.
- * Otherwise, this acts like a normal shuffle grouping.
+ * If the target bolt has one or more tasks in the same worker process, tuples will be shuffled to just those in-process tasks. Otherwise, this acts like a
+ * normal shuffle grouping.
+ *
* @param componentId
* @param streamId
* @return
*/
public T localOrShuffleGrouping(String componentId, String streamId);
-
+
public T localFirstGrouping(String componentId);
-
+
public T localFirstGrouping(String componentId, String streamId);
+
/**
* This grouping specifies that you don't care how the stream is grouped.
+ *
* @param componentId
* @return
*/
@@ -106,6 +109,7 @@ public interface InputDeclarer<T extends InputDeclarer> {
/**
* This grouping specifies that you don't care how the stream is grouped.
+ *
* @param componentId
* @param streamId
* @return
@@ -114,6 +118,7 @@ public interface InputDeclarer<T extends InputDeclarer> {
/**
* The stream is replicated across all the bolt's tasks. Use this grouping with care.
+ *
* @param componentId
* @return
*/
@@ -121,6 +126,7 @@ public interface InputDeclarer<T extends InputDeclarer> {
/**
* The stream is replicated across all the bolt's tasks. Use this grouping with care.
+ *
* @param componentId
* @param streamId
* @return
@@ -128,16 +134,16 @@ public interface InputDeclarer<T extends InputDeclarer> {
public T allGrouping(String componentId, String streamId);
/**
- * A stream grouped this way means that the producer of the tuple decides
- * which task of the consumer will receive this tuple.
+ * A stream grouped this way means that the producer of the tuple decides which task of the consumer will receive this tuple.
+ *
* @param componentId
* @return
*/
public T directGrouping(String componentId);
/**
- * A stream grouped this way means that the producer of the tuple decides
- * which task of the consumer will receive this tuple.
+ * A stream grouped this way means that the producer of the tuple decides which task of the consumer will receive this tuple.
+ *
* @param componentId
* @param streamId
* @return
@@ -145,9 +151,9 @@ public interface InputDeclarer<T extends InputDeclarer> {
public T directGrouping(String componentId, String streamId);
/**
- * Tuples are passed to two hashing functions and each target task is
- * decided based on the comparison of the state of candidate nodes.
- * @see https://melmeric.files.wordpress.com/2014/11/the-power-of-both-choices-practical-load-balancing-for-distributed-stream-processing-engines.pdf
+ * Tuples are passed to two hashing functions and each target task is decided based on the comparison of the state of candidate nodes.
+ *
+ * @see https://melmeric.files.wordpress.com/2014/11/the-power-of-both-choices-practical-load-balancing-for-distributed-stream-processing-engines.pdf
* @param componentId
* @param fields
* @return
@@ -155,9 +161,9 @@ public interface InputDeclarer<T extends InputDeclarer> {
public T partialKeyGrouping(String componentId, Fields fields);
/**
- * Tuples are passed to two hashing functions and each target task is
- * decided based on the comparison of the state of candidate nodes.
- * @see https://melmeric.files.wordpress.com/2014/11/the-power-of-both-choices-practical-load-balancing-for-distributed-stream-processing-engines.pdf
+ * Tuples are passed to two hashing functions and each target task is decided based on the comparison of the state of candidate nodes.
+ *
+ * @see https://melmeric.files.wordpress.com/2014/11/the-power-of-both-choices-practical-load-balancing-for-distributed-stream-processing-engines.pdf
* @param componentId
* @param streamId
* @param fields
@@ -167,6 +173,7 @@ public interface InputDeclarer<T extends InputDeclarer> {
/**
* A custom stream grouping by implementing the CustomStreamGrouping interface.
+ *
* @param componentId
* @param grouping
* @return
@@ -175,13 +182,14 @@ public interface InputDeclarer<T extends InputDeclarer> {
/**
* A custom stream grouping by implementing the CustomStreamGrouping interface.
+ *
* @param componentId
* @param streamId
* @param grouping
* @return
*/
public T customGrouping(String componentId, String streamId, CustomStreamGrouping grouping);
-
+
public T grouping(GlobalStreamId id, Grouping grouping);
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/OutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/OutputFieldsDeclarer.java b/jstorm-core/src/main/java/backtype/storm/topology/OutputFieldsDeclarer.java
index 2ac4794..d5ca7ca 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/OutputFieldsDeclarer.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/OutputFieldsDeclarer.java
@@ -19,14 +19,15 @@ package backtype.storm.topology;
import backtype.storm.tuple.Fields;
-
public interface OutputFieldsDeclarer {
/**
* Uses default stream id.
*/
public void declare(Fields fields);
+
public void declare(boolean direct, Fields fields);
-
+
public void declareStream(String streamId, Fields fields);
+
public void declareStream(String streamId, boolean direct, Fields fields);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/OutputFieldsGetter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/OutputFieldsGetter.java b/jstorm-core/src/main/java/backtype/storm/topology/OutputFieldsGetter.java
index 0e7fd59..1fdcf86 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/OutputFieldsGetter.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/OutputFieldsGetter.java
@@ -39,13 +39,12 @@ public class OutputFieldsGetter implements OutputFieldsDeclarer {
}
public void declareStream(String streamId, boolean direct, Fields fields) {
- if(_fields.containsKey(streamId)) {
+ if (_fields.containsKey(streamId)) {
throw new IllegalArgumentException("Fields for " + streamId + " already set");
}
_fields.put(streamId, new StreamInfo(fields.toList(), direct));
}
-
public Map<String, StreamInfo> getFieldsDeclaration() {
return _fields;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/ReportedFailedException.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/ReportedFailedException.java b/jstorm-core/src/main/java/backtype/storm/topology/ReportedFailedException.java
index 4e4ebe4..c90a545 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/ReportedFailedException.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/ReportedFailedException.java
@@ -21,11 +21,11 @@ public class ReportedFailedException extends FailedException {
public ReportedFailedException() {
super();
}
-
+
public ReportedFailedException(String msg) {
super(msg);
}
-
+
public ReportedFailedException(String msg, Throwable cause) {
super(msg, cause);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/SpoutDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/SpoutDeclarer.java b/jstorm-core/src/main/java/backtype/storm/topology/SpoutDeclarer.java
index c0d8254..9c5ec34 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/SpoutDeclarer.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/SpoutDeclarer.java
@@ -18,5 +18,5 @@
package backtype.storm.topology;
public interface SpoutDeclarer extends ComponentConfigurationDeclarer<SpoutDeclarer> {
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/TopologyBuilder.java b/jstorm-core/src/main/java/backtype/storm/topology/TopologyBuilder.java
index c04e449..2b546e3 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/TopologyBuilder.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/TopologyBuilder.java
@@ -18,108 +18,90 @@
package backtype.storm.topology;
import backtype.storm.Config;
-import backtype.storm.generated.Bolt;
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.ComponentObject;
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.generated.Grouping;
-import backtype.storm.generated.NullStruct;
-import backtype.storm.generated.SpoutSpec;
-import backtype.storm.generated.StateSpoutSpec;
-import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.*;
import backtype.storm.grouping.CustomStreamGrouping;
import backtype.storm.grouping.PartialKeyGrouping;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
+import org.json.simple.JSONValue;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
-import org.json.simple.JSONValue;
/**
- * TopologyBuilder exposes the Java API for specifying a topology for Storm
- * to execute. Topologies are Thrift structures in the end, but since the Thrift API
- * is so verbose, TopologyBuilder greatly eases the process of creating topologies.
- * The template for creating and submitting a topology looks something like:
- *
+ * TopologyBuilder exposes the Java API for specifying a topology for Storm to execute. Topologies are Thrift structures in the end, but since the Thrift API is
+ * so verbose, TopologyBuilder greatly eases the process of creating topologies. The template for creating and submitting a topology looks something like:
+ *
* <pre>
* TopologyBuilder builder = new TopologyBuilder();
- *
- * builder.setSpout("1", new TestWordSpout(true), 5);
- * builder.setSpout("2", new TestWordSpout(true), 3);
- * builder.setBolt("3", new TestWordCounter(), 3)
- * .fieldsGrouping("1", new Fields("word"))
- * .fieldsGrouping("2", new Fields("word"));
- * builder.setBolt("4", new TestGlobalCount())
- * .globalGrouping("1");
- *
+ *
+ * builder.setSpout("1", new TestWordSpout(true), 5);
+ * builder.setSpout("2", new TestWordSpout(true), 3);
+ * builder.setBolt("3", new TestWordCounter(), 3).fieldsGrouping("1", new Fields("word")).fieldsGrouping("2", new Fields("word"));
+ * builder.setBolt("4", new TestGlobalCount()).globalGrouping("1");
+ *
* Map conf = new HashMap();
* conf.put(Config.TOPOLOGY_WORKERS, 4);
*
- * StormSubmitter.submitTopology("mytopology", conf, builder.createTopology());
+ * StormSubmitter.submitTopology("mytopology", conf, builder.createTopology());
* </pre>
- *
- * Running the exact same topology in local mode (in process), and configuring it to log all tuples
- * emitted, looks like the following. Note that it lets the topology run for 10 seconds
- * before shutting down the local cluster.
- *
+ *
+ * Running the exact same topology in local mode (in process), and configuring it to log all tuples emitted, looks like the following. Note that it lets the
+ * topology run for 10 seconds before shutting down the local cluster.
+ *
* <pre>
* TopologyBuilder builder = new TopologyBuilder();
- *
- * builder.setSpout("1", new TestWordSpout(true), 5);
- * builder.setSpout("2", new TestWordSpout(true), 3);
- * builder.setBolt("3", new TestWordCounter(), 3)
- * .fieldsGrouping("1", new Fields("word"))
- * .fieldsGrouping("2", new Fields("word"));
- * builder.setBolt("4", new TestGlobalCount())
- * .globalGrouping("1");
- *
+ *
+ * builder.setSpout("1", new TestWordSpout(true), 5);
+ * builder.setSpout("2", new TestWordSpout(true), 3);
+ * builder.setBolt("3", new TestWordCounter(), 3).fieldsGrouping("1", new Fields("word")).fieldsGrouping("2", new Fields("word"));
+ * builder.setBolt("4", new TestGlobalCount()).globalGrouping("1");
+ *
* Map conf = new HashMap();
* conf.put(Config.TOPOLOGY_WORKERS, 4);
* conf.put(Config.TOPOLOGY_DEBUG, true);
- *
+ *
* LocalCluster cluster = new LocalCluster();
- * cluster.submitTopology("mytopology", conf, builder.createTopology());
+ * cluster.submitTopology("mytopology", conf, builder.createTopology());
* Utils.sleep(10000);
* cluster.shutdown();
* </pre>
- *
- * <p>The pattern for TopologyBuilder is to map component ids to components using the setSpout
- * and setBolt methods. Those methods return objects that are then used to declare
- * the inputs for that component.</p>
+ *
+ * <p>
+ * The pattern for TopologyBuilder is to map component ids to components using the setSpout and setBolt methods. Those methods return objects that are then used
+ * to declare the inputs for that component.
+ * </p>
*/
public class TopologyBuilder {
private Map<String, IRichBolt> _bolts = new HashMap<String, IRichBolt>();
private Map<String, IRichSpout> _spouts = new HashMap<String, IRichSpout>();
private Map<String, ComponentCommon> _commons = new HashMap<String, ComponentCommon>();
-// private Map<String, Map<GlobalStreamId, Grouping>> _inputs = new HashMap<String, Map<GlobalStreamId, Grouping>>();
+ // private Map<String, Map<GlobalStreamId, Grouping>> _inputs = new HashMap<String, Map<GlobalStreamId, Grouping>>();
private Map<String, StateSpoutSpec> _stateSpouts = new HashMap<String, StateSpoutSpec>();
-
-
+
public StormTopology createTopology() {
Map<String, Bolt> boltSpecs = new HashMap<String, Bolt>();
Map<String, SpoutSpec> spoutSpecs = new HashMap<String, SpoutSpec>();
- for(String boltId: _bolts.keySet()) {
+ for (String boltId : _bolts.keySet()) {
IRichBolt bolt = _bolts.get(boltId);
ComponentCommon common = getComponentCommon(boltId, bolt);
boltSpecs.put(boltId, new Bolt(ComponentObject.serialized_java(Utils.javaSerialize(bolt)), common));
}
- for(String spoutId: _spouts.keySet()) {
+ for (String spoutId : _spouts.keySet()) {
IRichSpout spout = _spouts.get(spoutId);
ComponentCommon common = getComponentCommon(spoutId, spout);
spoutSpecs.put(spoutId, new SpoutSpec(ComponentObject.serialized_java(Utils.javaSerialize(spout)), common));
-
+
}
- return new StormTopology(spoutSpecs,
- boltSpecs,
- new HashMap<String, StateSpoutSpec>());
+ return new StormTopology(spoutSpecs, boltSpecs, new HashMap<String, StateSpoutSpec>());
}
/**
* Define a new bolt in this topology with parallelism of just one thread.
- *
+ *
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param bolt the bolt
* @return use the returned object to declare the inputs to this component
@@ -130,10 +112,11 @@ public class TopologyBuilder {
/**
* Define a new bolt in this topology with the specified amount of parallelism.
- *
+ *
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param bolt the bolt
- * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around the cluster.
+ * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somewhere around
+ * the cluster.
* @return use the returned object to declare the inputs to this component
*/
public BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism_hint) {
@@ -144,11 +127,9 @@ public class TopologyBuilder {
}
/**
- * Define a new bolt in this topology. This defines a basic bolt, which is a
- * simpler to use but more restricted kind of bolt. Basic bolts are intended
- * for non-aggregation processing and automate the anchoring/acking process to
- * achieve proper reliability in the topology.
- *
+ * Define a new bolt in this topology. This defines a basic bolt, which is a simpler to use but more restricted kind of bolt. Basic bolts are intended for
+ * non-aggregation processing and automate the anchoring/acking process to achieve proper reliability in the topology.
+ *
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param bolt the basic bolt
* @return use the returned object to declare the inputs to this component
@@ -158,14 +139,13 @@ public class TopologyBuilder {
}
/**
- * Define a new bolt in this topology. This defines a basic bolt, which is a
- * simpler to use but more restricted kind of bolt. Basic bolts are intended
- * for non-aggregation processing and automate the anchoring/acking process to
- * achieve proper reliability in the topology.
- *
+ * Define a new bolt in this topology. This defines a basic bolt, which is a simpler to use but more restricted kind of bolt. Basic bolts are intended for
+ * non-aggregation processing and automate the anchoring/acking process to achieve proper reliability in the topology.
+ *
* @param id the id of this component. This id is referenced by other components that want to consume this bolt's outputs.
* @param bolt the basic bolt
- * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somwehere around the cluster.
+ * @param parallelism_hint the number of tasks that should be assigned to execute this bolt. Each task will run on a thread in a process somwehere around
+ * the cluster.
* @return use the returned object to declare the inputs to this component
*/
public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism_hint) {
@@ -174,7 +154,7 @@ public class TopologyBuilder {
/**
* Define a new spout in this topology.
- *
+ *
* @param id the id of this component. This id is referenced by other components that want to consume this spout's outputs.
* @param spout the spout
*/
@@ -183,12 +163,12 @@ public class TopologyBuilder {
}
/**
- * Define a new spout in this topology with the specified parallelism. If the spout declares
- * itself as non-distributed, the parallelism_hint will be ignored and only one task
- * will be allocated to this component.
- *
+ * Define a new spout in this topology with the specified parallelism. If the spout declares itself as non-distributed, the parallelism_hint will be ignored
+ * and only one task will be allocated to this component.
+ *
* @param id the id of this component. This id is referenced by other components that want to consume this spout's outputs.
- * @param parallelism_hint the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a process somwehere around the cluster.
+ * @param parallelism_hint the number of tasks that should be assigned to execute this spout. Each task will run on a thread in a process somwehere around
+ * the cluster.
* @param spout the spout
*/
public SpoutDeclarer setSpout(String id, IRichSpout spout, Number parallelism_hint) {
@@ -207,51 +187,51 @@ public class TopologyBuilder {
// TODO: finish
}
-
private void validateUnusedId(String id) {
- if(_bolts.containsKey(id)) {
+ if (_bolts.containsKey(id)) {
throw new IllegalArgumentException("Bolt has already been declared for id " + id);
}
- if(_spouts.containsKey(id)) {
+ if (_spouts.containsKey(id)) {
throw new IllegalArgumentException("Spout has already been declared for id " + id);
}
- if(_stateSpouts.containsKey(id)) {
+ if (_stateSpouts.containsKey(id)) {
throw new IllegalArgumentException("State spout has already been declared for id " + id);
}
}
private ComponentCommon getComponentCommon(String id, IComponent component) {
ComponentCommon ret = new ComponentCommon(_commons.get(id));
-
+
OutputFieldsGetter getter = new OutputFieldsGetter();
component.declareOutputFields(getter);
ret.set_streams(getter.getFieldsDeclaration());
- return ret;
+ return ret;
}
-
+
private void initCommon(String id, IComponent component, Number parallelism) {
ComponentCommon common = new ComponentCommon();
common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
- if(parallelism!=null) {
+ if (parallelism != null) {
common.set_parallelism_hint(parallelism.intValue());
- }else {
+ } else {
common.set_parallelism_hint(1);
}
Map conf = component.getComponentConfiguration();
- if(conf!=null) common.set_json_conf(JSONValue.toJSONString(conf));
+ if (conf != null)
+ common.set_json_conf(JSONValue.toJSONString(conf));
_commons.put(id, common);
}
protected class ConfigGetter<T extends ComponentConfigurationDeclarer> extends BaseConfigurationDeclarer<T> {
String _id;
-
+
public ConfigGetter(String id) {
_id = id;
}
-
+
@Override
public T addConfigurations(Map conf) {
- if(conf!=null && conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) {
+ if (conf != null && conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) {
throw new IllegalArgumentException("Cannot set serializations for a component using fluent API");
}
String currConf = _commons.get(_id).get_json_conf();
@@ -259,13 +239,13 @@ public class TopologyBuilder {
return (T) this;
}
}
-
+
protected class SpoutGetter extends ConfigGetter<SpoutDeclarer> implements SpoutDeclarer {
public SpoutGetter(String id) {
super(id);
- }
+ }
}
-
+
protected class BoltGetter extends ConfigGetter<BoltDeclarer> implements BoltDeclarer {
private String _boltId;
@@ -305,17 +285,17 @@ public class TopologyBuilder {
public BoltDeclarer localOrShuffleGrouping(String componentId, String streamId) {
return grouping(componentId, streamId, Grouping.local_or_shuffle(new NullStruct()));
}
-
+
@Override
public BoltDeclarer localFirstGrouping(String componentId) {
return localFirstGrouping(componentId, Utils.DEFAULT_STREAM_ID);
}
-
+
@Override
public BoltDeclarer localFirstGrouping(String componentId, String streamId) {
return grouping(componentId, streamId, Grouping.localFirst(new NullStruct()));
}
-
+
public BoltDeclarer noneGrouping(String componentId) {
return noneGrouping(componentId, Utils.DEFAULT_STREAM_ID);
}
@@ -368,17 +348,20 @@ public class TopologyBuilder {
@Override
public BoltDeclarer grouping(GlobalStreamId id, Grouping grouping) {
return grouping(id.get_componentId(), id.get_streamId(), grouping);
- }
+ }
}
-
+
private static Map parseJson(String json) {
- if(json==null) return new HashMap();
- else return (Map) JSONValue.parse(json);
+ if (json == null)
+ return new HashMap();
+ else
+ return (Map) JSONValue.parse(json);
}
-
+
private static String mergeIntoJson(Map into, Map newMap) {
Map res = new HashMap(into);
- if(newMap!=null) res.putAll(newMap);
+ if (newMap != null)
+ res.putAll(newMap);
return JSONValue.toJSONString(res);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/base/BaseBasicBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseBasicBolt.java b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseBasicBolt.java
index e585ee6..eb13e56 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseBasicBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseBasicBolt.java
@@ -29,5 +29,5 @@ public abstract class BaseBasicBolt extends BaseComponent implements IBasicBolt
@Override
public void cleanup() {
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/base/BaseBatchBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseBatchBolt.java b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseBatchBolt.java
index 3206941..43d21a3 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseBatchBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseBatchBolt.java
@@ -21,5 +21,5 @@ import backtype.storm.coordination.IBatchBolt;
import java.util.Map;
public abstract class BaseBatchBolt<T> extends BaseComponent implements IBatchBolt<T> {
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/base/BaseComponent.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseComponent.java b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseComponent.java
index 8afcdaa..1206abc 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseComponent.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseComponent.java
@@ -24,5 +24,5 @@ public abstract class BaseComponent implements IComponent {
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/base/BaseOpaquePartitionedTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseOpaquePartitionedTransactionalSpout.java b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseOpaquePartitionedTransactionalSpout.java
index 2d20a48..64c3887 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseOpaquePartitionedTransactionalSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseOpaquePartitionedTransactionalSpout.java
@@ -19,7 +19,6 @@ package backtype.storm.topology.base;
import backtype.storm.transactional.partitioned.IOpaquePartitionedTransactionalSpout;
-
public abstract class BaseOpaquePartitionedTransactionalSpout<T> extends BaseComponent implements IOpaquePartitionedTransactionalSpout<T> {
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/base/BaseRichBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseRichBolt.java b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseRichBolt.java
index 266736e..ebf31eb 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseRichBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseRichBolt.java
@@ -22,5 +22,5 @@ import backtype.storm.topology.IRichBolt;
public abstract class BaseRichBolt extends BaseComponent implements IRichBolt {
@Override
public void cleanup() {
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/base/BaseRichSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseRichSpout.java b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseRichSpout.java
index 37513b7..18f1f2c 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseRichSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseRichSpout.java
@@ -24,7 +24,7 @@ package backtype.storm.topology.base;
import backtype.storm.topology.IRichSpout;
/**
- *
+ *
* @author nathan
*/
public abstract class BaseRichSpout extends BaseComponent implements IRichSpout {
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/topology/base/BaseTransactionalBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseTransactionalBolt.java b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseTransactionalBolt.java
index b6451e9..246b3a3 100755
--- a/jstorm-core/src/main/java/backtype/storm/topology/base/BaseTransactionalBolt.java
+++ b/jstorm-core/src/main/java/backtype/storm/topology/base/BaseTransactionalBolt.java
@@ -20,5 +20,5 @@ package backtype.storm.topology.base;
import backtype.storm.transactional.TransactionAttempt;
public abstract class BaseTransactionalBolt extends BaseBatchBolt<TransactionAttempt> {
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/ICommitter.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/ICommitter.java b/jstorm-core/src/main/java/backtype/storm/transactional/ICommitter.java
index 859bad2..0e91178 100755
--- a/jstorm-core/src/main/java/backtype/storm/transactional/ICommitter.java
+++ b/jstorm-core/src/main/java/backtype/storm/transactional/ICommitter.java
@@ -18,9 +18,8 @@
package backtype.storm.transactional;
/**
- * This marks an IBatchBolt within a transactional topology as a committer. This causes the
- * finishBatch method to be called in order of the transactions.
+ * This marks an IBatchBolt within a transactional topology as a committer. This causes the finishBatch method to be called in order of the transactions.
*/
public interface ICommitter {
-
+
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/ICommitterTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/ICommitterTransactionalSpout.java b/jstorm-core/src/main/java/backtype/storm/transactional/ICommitterTransactionalSpout.java
index 5441ee2..1cd448c 100755
--- a/jstorm-core/src/main/java/backtype/storm/transactional/ICommitterTransactionalSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/transactional/ICommitterTransactionalSpout.java
@@ -20,12 +20,11 @@ package backtype.storm.transactional;
import backtype.storm.task.TopologyContext;
import java.util.Map;
-
public interface ICommitterTransactionalSpout<X> extends ITransactionalSpout<X> {
public interface Emitter extends ITransactionalSpout.Emitter {
void commit(TransactionAttempt attempt);
- }
-
+ }
+
@Override
- public Emitter getEmitter(Map conf, TopologyContext context);
+ public Emitter getEmitter(Map conf, TopologyContext context);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/ITransactionalSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/ITransactionalSpout.java b/jstorm-core/src/main/java/backtype/storm/transactional/ITransactionalSpout.java
index 3207493..528eda7 100755
--- a/jstorm-core/src/main/java/backtype/storm/transactional/ITransactionalSpout.java
+++ b/jstorm-core/src/main/java/backtype/storm/transactional/ITransactionalSpout.java
@@ -26,69 +26,62 @@ import java.util.Map;
public interface ITransactionalSpout<T> extends IComponent {
public interface Coordinator<X> {
/**
- * Create metadata for this particular transaction id which has never
- * been emitted before. The metadata should contain whatever is necessary
- * to be able to replay the exact batch for the transaction at a later point.
+ * Create metadata for this particular transaction id which has never been emitted before. The metadata should contain whatever is necessary to be able
+ * to replay the exact batch for the transaction at a later point.
*
* The metadata is stored in Zookeeper.
*
- * Storm uses the Kryo serializations configured in the component configuration
- * for this spout to serialize and deserialize the metadata.
+ * Storm uses the Kryo serializations configured in the component configuration for this spout to serialize and deserialize the metadata.
*
* @param txid The id of the transaction.
* @param prevMetadata The metadata of the previous transaction
* @return the metadata for this new transaction
*/
X initializeTransaction(BigInteger txid, X prevMetadata);
-
+
/**
* Returns true if its ok to emit start a new transaction, false otherwise (will skip this transaction).
*
- * You should sleep here if you want a delay between asking for the next transaction (this will be called
- * repeatedly in a loop).
+ * You should sleep here if you want a delay between asking for the next transaction (this will be called repeatedly in a loop).
*/
boolean isReady();
-
+
/**
* Release any resources from this coordinator.
*/
void close();
}
-
+
public interface Emitter<X> {
/**
- * Emit a batch for the specified transaction attempt and metadata for the transaction. The metadata
- * was created by the Coordinator in the initializeTranaction method. This method must always emit
- * the same batch of tuples across all tasks for the same transaction id.
+ * Emit a batch for the specified transaction attempt and metadata for the transaction. The metadata was created by the Coordinator in the
+ * initializeTranaction method. This method must always emit the same batch of tuples across all tasks for the same transaction id.
*
* The first field of all emitted tuples must contain the provided TransactionAttempt.
*
*/
void emitBatch(TransactionAttempt tx, X coordinatorMeta, BatchOutputCollector collector);
-
+
/**
- * Any state for transactions prior to the provided transaction id can be safely cleaned up, so this
- * method should clean up that state.
+ * Any state for transactions prior to the provided transaction id can be safely cleaned up, so this method should clean up that state.
*/
void cleanupBefore(BigInteger txid);
-
+
/**
* Release any resources held by this emitter.
*/
void close();
}
-
+
/**
- * The coordinator for a TransactionalSpout runs in a single thread and indicates when batches
- * of tuples should be emitted and when transactions should commit. The Coordinator that you provide
- * in a TransactionalSpout provides metadata for each transaction so that the transactions can be replayed.
+ * The coordinator for a TransactionalSpout runs in a single thread and indicates when batches of tuples should be emitted and when transactions should
+ * commit. The Coordinator that you provide in a TransactionalSpout provides metadata for each transaction so that the transactions can be replayed.
*/
Coordinator<T> getCoordinator(Map conf, TopologyContext context);
/**
- * The emitter for a TransactionalSpout runs as many tasks across the cluster. Emitters are responsible for
- * emitting batches of tuples for a transaction and must ensure that the same batch of tuples is always
- * emitted for the same transaction id.
- */
+ * The emitter for a TransactionalSpout runs as many tasks across the cluster. Emitters are responsible for emitting batches of tuples for a transaction and
+ * must ensure that the same batch of tuples is always emitted for the same transaction id.
+ */
Emitter<T> getEmitter(Map conf, TopologyContext context);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/TransactionAttempt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/TransactionAttempt.java b/jstorm-core/src/main/java/backtype/storm/transactional/TransactionAttempt.java
index 80bbb0e..e64a2d7 100755
--- a/jstorm-core/src/main/java/backtype/storm/transactional/TransactionAttempt.java
+++ b/jstorm-core/src/main/java/backtype/storm/transactional/TransactionAttempt.java
@@ -22,22 +22,21 @@ import java.math.BigInteger;
public class TransactionAttempt {
BigInteger _txid;
long _attemptId;
-
-
+
// for kryo compatibility
public TransactionAttempt() {
-
+
}
-
+
public TransactionAttempt(BigInteger txid, long attemptId) {
_txid = txid;
_attemptId = attemptId;
}
-
+
public BigInteger getTransactionId() {
return _txid;
}
-
+
public long getAttemptId() {
return _attemptId;
}
@@ -49,7 +48,8 @@ public class TransactionAttempt {
@Override
public boolean equals(Object o) {
- if(!(o instanceof TransactionAttempt)) return false;
+ if (!(o instanceof TransactionAttempt))
+ return false;
TransactionAttempt other = (TransactionAttempt) o;
return _txid.equals(other._txid) && _attemptId == other._attemptId;
}
@@ -57,5 +57,5 @@ public class TransactionAttempt {
@Override
public String toString() {
return "" + _txid + ":" + _attemptId;
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/7eaf0651/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalSpoutBatchExecutor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalSpoutBatchExecutor.java b/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalSpoutBatchExecutor.java
index 53aacae..9bcd75d 100755
--- a/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalSpoutBatchExecutor.java
+++ b/jstorm-core/src/main/java/backtype/storm/transactional/TransactionalSpoutBatchExecutor.java
@@ -31,18 +31,18 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TransactionalSpoutBatchExecutor implements IRichBolt {
- public static Logger LOG = LoggerFactory.getLogger(TransactionalSpoutBatchExecutor.class);
+ public static Logger LOG = LoggerFactory.getLogger(TransactionalSpoutBatchExecutor.class);
BatchOutputCollectorImpl _collector;
ITransactionalSpout _spout;
ITransactionalSpout.Emitter _emitter;
-
+
TreeMap<BigInteger, TransactionAttempt> _activeTransactions = new TreeMap<BigInteger, TransactionAttempt>();
public TransactionalSpoutBatchExecutor(ITransactionalSpout spout) {
_spout = spout;
}
-
+
@Override
public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
_collector = new BatchOutputCollectorImpl(collector);
@@ -53,27 +53,27 @@ public class TransactionalSpoutBatchExecutor implements IRichBolt {
public void execute(Tuple input) {
TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
try {
- if(input.getSourceStreamId().equals(TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID)) {
- if(attempt.equals(_activeTransactions.get(attempt.getTransactionId()))) {
+ if (input.getSourceStreamId().equals(TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID)) {
+ if (attempt.equals(_activeTransactions.get(attempt.getTransactionId()))) {
((ICommitterTransactionalSpout.Emitter) _emitter).commit(attempt);
_activeTransactions.remove(attempt.getTransactionId());
_collector.ack(input);
} else {
_collector.fail(input);
}
- } else {
+ } else {
_emitter.emitBatch(attempt, input.getValue(1), _collector);
_activeTransactions.put(attempt.getTransactionId(), attempt);
_collector.ack(input);
BigInteger committed = (BigInteger) input.getValue(2);
- if(committed!=null) {
- // valid to delete before what's been committed since
+ if (committed != null) {
+ // valid to delete before what's been committed since
// those batches will never be accessed again
_activeTransactions.headMap(committed).clear();
_emitter.cleanupBefore(committed);
}
}
- } catch(FailedException e) {
+ } catch (FailedException e) {
LOG.warn("Failed to emit batch for transaction", e);
_collector.fail(input);
}