You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/11/05 21:41:04 UTC
[25/60] [abbrv] [partial] storm git commit: Release 2.0.4-SNAPSHOT
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/TestJob.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/TestJob.java b/jstorm-client/src/main/java/backtype/storm/testing/TestJob.java
deleted file mode 100644
index 35a2592..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/TestJob.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.ILocalCluster;
-
-/**
- * This is the core interface for the storm java testing, usually we put our
- * java unit testing logic in the run method. A sample code will be: <code>
- * Testing.withSimulatedTimeLocalCluster(new TestJob() {
- * public void run(Cluster cluster) {
- * // your testing logic here.
- * }
- * });
- */
-public interface TestJob {
- /**
- * run the testing logic with the cluster.
- *
- * @param cluster
- * the cluster which created by
- * <code>Testing.withSimulatedTimeLocalCluster</code> and
- * <code>Testing.withTrackedCluster</code>.
- */
- public void run(ILocalCluster cluster) throws Exception;
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/TestKryoDecorator.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/TestKryoDecorator.java b/jstorm-client/src/main/java/backtype/storm/testing/TestKryoDecorator.java
deleted file mode 100644
index fd9053c..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/TestKryoDecorator.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.serialization.IKryoDecorator;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-public class TestKryoDecorator implements IKryoDecorator {
-
- public void decorate(Kryo k) {
- k.register(TestSerObject.class);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/TestPlannerBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/TestPlannerBolt.java b/jstorm-client/src/main/java/backtype/storm/testing/TestPlannerBolt.java
deleted file mode 100644
index 69e175e..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/TestPlannerBolt.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Fields;
-import java.util.Map;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.base.BaseRichBolt;
-
-public class TestPlannerBolt extends BaseRichBolt {
- public void prepare(Map stormConf, TopologyContext context,
- OutputCollector collector) {
-
- }
-
- public void execute(Tuple input) {
-
- }
-
- public Fields getOutputFields() {
- return new Fields("field1", "field2");
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(getOutputFields());
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/TestPlannerSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/TestPlannerSpout.java b/jstorm-client/src/main/java/backtype/storm/testing/TestPlannerSpout.java
deleted file mode 100644
index c00dc45..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/TestPlannerSpout.java
+++ /dev/null
@@ -1,67 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.Config;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import java.util.Map;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-import java.util.HashMap;
-
-public class TestPlannerSpout extends BaseRichSpout {
- boolean _isDistributed;
- Fields _outFields;
-
- public TestPlannerSpout(Fields outFields, boolean isDistributed) {
- _isDistributed = isDistributed;
- _outFields = outFields;
- }
-
- public TestPlannerSpout(boolean isDistributed) {
- this(new Fields("field1", "field2"), isDistributed);
- }
-
- public TestPlannerSpout(Fields outFields) {
- this(outFields, true);
- }
-
- public Fields getOutputFields() {
- return _outFields;
- }
-
- public void open(Map conf, TopologyContext context,
- SpoutOutputCollector collector) {
-
- }
-
- public void close() {
-
- }
-
- public void nextTuple() {
- Utils.sleep(100);
- }
-
- public void ack(Object msgId) {
-
- }
-
- public void fail(Object msgId) {
-
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(getOutputFields());
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- Map<String, Object> ret = new HashMap<String, Object>();
- if (!_isDistributed) {
- ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
- }
- return ret;
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/TestSerObject.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/TestSerObject.java b/jstorm-client/src/main/java/backtype/storm/testing/TestSerObject.java
deleted file mode 100644
index ab356a3..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/TestSerObject.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package backtype.storm.testing;
-
-import java.io.Serializable;
-
-public class TestSerObject implements Serializable {
- public int f1;
- public int f2;
-
- public TestSerObject(int f1, int f2) {
- this.f1 = f1;
- this.f2 = f2;
- }
-
- @Override
- public int hashCode() {
- final int prime = 31;
- int result = 1;
- result = prime * result + f1;
- result = prime * result + f2;
- return result;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj)
- return true;
- if (obj == null)
- return false;
- if (getClass() != obj.getClass())
- return false;
- TestSerObject other = (TestSerObject) obj;
- if (f1 != other.f1)
- return false;
- if (f2 != other.f2)
- return false;
- return true;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/TestWordCounter.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/TestWordCounter.java b/jstorm-client/src/main/java/backtype/storm/testing/TestWordCounter.java
deleted file mode 100644
index 3572b9d..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/TestWordCounter.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Fields;
-import java.util.Map;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.BasicOutputCollector;
-import java.util.HashMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import static backtype.storm.utils.Utils.tuple;
-
-public class TestWordCounter extends BaseBasicBolt {
- public static Logger LOG = LoggerFactory.getLogger(TestWordCounter.class);
-
- Map<String, Integer> _counts;
-
- public void prepare(Map stormConf, TopologyContext context) {
- _counts = new HashMap<String, Integer>();
- }
-
- public void execute(Tuple input, BasicOutputCollector collector) {
- String word = (String) input.getValues().get(0);
- int count = 0;
- if (_counts.containsKey(word)) {
- count = _counts.get(word);
- }
- count++;
- _counts.put(word, count);
- collector.emit(tuple(word, count));
- }
-
- public void cleanup() {
-
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word", "count"));
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/TestWordSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/TestWordSpout.java b/jstorm-client/src/main/java/backtype/storm/testing/TestWordSpout.java
deleted file mode 100644
index 432e5de..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/TestWordSpout.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.Config;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import java.util.Map;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-import java.util.HashMap;
-import java.util.Random;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TestWordSpout extends BaseRichSpout {
- public static Logger LOG = LoggerFactory.getLogger(TestWordSpout.class);
- boolean _isDistributed;
- SpoutOutputCollector _collector;
-
- public TestWordSpout() {
- this(true);
- }
-
- public TestWordSpout(boolean isDistributed) {
- _isDistributed = isDistributed;
- }
-
- public void open(Map conf, TopologyContext context,
- SpoutOutputCollector collector) {
- _collector = collector;
- }
-
- public void close() {
-
- }
-
- public void nextTuple() {
- Utils.sleep(100);
- final String[] words = new String[] { "nathan", "mike", "jackson",
- "golda", "bertels" };
- final Random rand = new Random();
- final String word = words[rand.nextInt(words.length)];
- _collector.emit(new Values(word));
- }
-
- public void ack(Object msgId) {
-
- }
-
- public void fail(Object msgId) {
-
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("word"));
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- if (!_isDistributed) {
- Map<String, Object> ret = new HashMap<String, Object>();
- ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
- return ret;
- } else {
- return null;
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/TrackedTopology.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/TrackedTopology.java b/jstorm-client/src/main/java/backtype/storm/testing/TrackedTopology.java
deleted file mode 100644
index dc98ba5..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/TrackedTopology.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package backtype.storm.testing;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import backtype.storm.generated.StormTopology;
-import clojure.lang.Keyword;
-
-public class TrackedTopology extends HashMap {
- public TrackedTopology(Map map) {
- super(map);
- }
-
- public StormTopology getTopology() {
- return (StormTopology) get(Keyword.intern("topology"));
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/TupleCaptureBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/TupleCaptureBolt.java b/jstorm-client/src/main/java/backtype/storm/testing/TupleCaptureBolt.java
deleted file mode 100644
index 33c07b7..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/TupleCaptureBolt.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-public class TupleCaptureBolt implements IRichBolt {
- public static transient Map<String, Map<String, List<FixedTuple>>> emitted_tuples = new HashMap<String, Map<String, List<FixedTuple>>>();
-
- private String _name;
- private transient OutputCollector _collector;
-
- public TupleCaptureBolt() {
- _name = UUID.randomUUID().toString();
- emitted_tuples.put(_name, new HashMap<String, List<FixedTuple>>());
- }
-
- public void prepare(Map stormConf, TopologyContext context,
- OutputCollector collector) {
- _collector = collector;
- }
-
- public void execute(Tuple input) {
- String component = input.getSourceComponent();
- Map<String, List<FixedTuple>> captured = emitted_tuples.get(_name);
- if (!captured.containsKey(component)) {
- captured.put(component, new ArrayList<FixedTuple>());
- }
- captured.get(component).add(
- new FixedTuple(input.getSourceStreamId(), input.getValues()));
- _collector.ack(input);
- }
-
- public Map<String, List<FixedTuple>> getResults() {
- return emitted_tuples.get(_name);
- }
-
- public void cleanup() {
- }
-
- public Map<String, List<FixedTuple>> getAndRemoveResults() {
- return emitted_tuples.remove(_name);
- }
-
- public Map<String, List<FixedTuple>> getAndClearResults() {
- Map<String, List<FixedTuple>> ret = new HashMap<String, List<FixedTuple>>(
- emitted_tuples.get(_name));
- emitted_tuples.get(_name).clear();
- return ret;
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/BaseConfigurationDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/BaseConfigurationDeclarer.java b/jstorm-client/src/main/java/backtype/storm/topology/BaseConfigurationDeclarer.java
deleted file mode 100644
index fcf90b5..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/BaseConfigurationDeclarer.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package backtype.storm.topology;
-
-import backtype.storm.Config;
-import java.util.HashMap;
-import java.util.Map;
-
-public abstract class BaseConfigurationDeclarer<T extends ComponentConfigurationDeclarer>
- implements ComponentConfigurationDeclarer<T> {
- @Override
- public T addConfiguration(String config, Object value) {
- Map configMap = new HashMap();
- configMap.put(config, value);
- return addConfigurations(configMap);
- }
-
- @Override
- public T setDebug(boolean debug) {
- return addConfiguration(Config.TOPOLOGY_DEBUG, debug);
- }
-
- @Override
- public T setMaxTaskParallelism(Number val) {
- if (val != null)
- val = val.intValue();
- return addConfiguration(Config.TOPOLOGY_MAX_TASK_PARALLELISM, val);
- }
-
- @Override
- public T setMaxSpoutPending(Number val) {
- if (val != null)
- val = val.intValue();
- return addConfiguration(Config.TOPOLOGY_MAX_SPOUT_PENDING, val);
- }
-
- @Override
- public T setNumTasks(Number val) {
- if (val != null)
- val = val.intValue();
- return addConfiguration(Config.TOPOLOGY_TASKS, val);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/BasicBoltExecutor.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/BasicBoltExecutor.java b/jstorm-client/src/main/java/backtype/storm/topology/BasicBoltExecutor.java
deleted file mode 100644
index 586f17c..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/BasicBoltExecutor.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package backtype.storm.topology;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Tuple;
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BasicBoltExecutor implements IRichBolt {
- public static Logger LOG = LoggerFactory.getLogger(BasicBoltExecutor.class);
-
- private IBasicBolt _bolt;
- private transient BasicOutputCollector _collector;
-
- public BasicBoltExecutor(IBasicBolt bolt) {
- _bolt = bolt;
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- _bolt.declareOutputFields(declarer);
- }
-
- public void prepare(Map stormConf, TopologyContext context,
- OutputCollector collector) {
- _bolt.prepare(stormConf, context);
- _collector = new BasicOutputCollector(collector);
- }
-
- public void execute(Tuple input) {
- _collector.setContext(input);
- try {
- _bolt.execute(input, _collector);
- _collector.getOutputter().ack(input);
- } catch (FailedException e) {
- if (e instanceof ReportedFailedException) {
- _collector.reportError(e);
- }
- _collector.getOutputter().fail(input);
- }
- }
-
- public void cleanup() {
- _bolt.cleanup();
- }
-
- public Map<String, Object> getComponentConfiguration() {
- return _bolt.getComponentConfiguration();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/BasicOutputCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/BasicOutputCollector.java b/jstorm-client/src/main/java/backtype/storm/topology/BasicOutputCollector.java
deleted file mode 100644
index a0a892f..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/BasicOutputCollector.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package backtype.storm.topology;
-
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.Utils;
-import java.util.List;
-
-public class BasicOutputCollector implements IBasicOutputCollector {
- private OutputCollector out;
- private Tuple inputTuple;
-
- public BasicOutputCollector(OutputCollector out) {
- this.out = out;
- }
-
- public List<Integer> emit(String streamId, List<Object> tuple) {
- return out.emit(streamId, inputTuple, tuple);
- }
-
- public List<Integer> emit(List<Object> tuple) {
- return emit(Utils.DEFAULT_STREAM_ID, tuple);
- }
-
- public void setContext(Tuple inputTuple) {
- this.inputTuple = inputTuple;
- }
-
- public void emitDirect(int taskId, String streamId, List<Object> tuple) {
- out.emitDirect(taskId, streamId, inputTuple, tuple);
- }
-
- public void emitDirect(int taskId, List<Object> tuple) {
- emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple);
- }
-
- protected IOutputCollector getOutputter() {
- return out;
- }
-
- public void reportError(Throwable t) {
- out.reportError(t);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/BoltDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/BoltDeclarer.java b/jstorm-client/src/main/java/backtype/storm/topology/BoltDeclarer.java
deleted file mode 100644
index 0e9cd58..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/BoltDeclarer.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package backtype.storm.topology;
-
-public interface BoltDeclarer extends InputDeclarer<BoltDeclarer>,
- ComponentConfigurationDeclarer<BoltDeclarer> {
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/ComponentConfigurationDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/ComponentConfigurationDeclarer.java b/jstorm-client/src/main/java/backtype/storm/topology/ComponentConfigurationDeclarer.java
deleted file mode 100644
index 3c59980..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/ComponentConfigurationDeclarer.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package backtype.storm.topology;
-
-import java.util.Map;
-
-public interface ComponentConfigurationDeclarer<T extends ComponentConfigurationDeclarer> {
- T addConfigurations(Map conf);
-
- T addConfiguration(String config, Object value);
-
- T setDebug(boolean debug);
-
- T setMaxTaskParallelism(Number val);
-
- T setMaxSpoutPending(Number val);
-
- @Deprecated
- T setNumTasks(Number val);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/FailedException.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/FailedException.java b/jstorm-client/src/main/java/backtype/storm/topology/FailedException.java
deleted file mode 100644
index 793f53e..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/FailedException.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package backtype.storm.topology;
-
-public class FailedException extends RuntimeException {
- public FailedException() {
- super();
- }
-
- public FailedException(String msg) {
- super(msg);
- }
-
- public FailedException(String msg, Throwable cause) {
- super(msg, cause);
- }
-
- public FailedException(Throwable cause) {
- super(cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/IBasicBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/IBasicBolt.java b/jstorm-client/src/main/java/backtype/storm/topology/IBasicBolt.java
deleted file mode 100644
index e5f303e..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/IBasicBolt.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package backtype.storm.topology;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Tuple;
-import java.util.Map;
-
-public interface IBasicBolt extends IComponent {
- void prepare(Map stormConf, TopologyContext context);
-
- /**
- * Process the input tuple and optionally emit new tuples based on the input
- * tuple.
- *
- * All acking is managed for you. Throw a FailedException if you want to
- * fail the tuple.
- */
- void execute(Tuple input, BasicOutputCollector collector);
-
- void cleanup();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/IBasicOutputCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/IBasicOutputCollector.java b/jstorm-client/src/main/java/backtype/storm/topology/IBasicOutputCollector.java
deleted file mode 100644
index 44c0fe1..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/IBasicOutputCollector.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package backtype.storm.topology;
-
-import java.util.List;
-
-public interface IBasicOutputCollector {
- List<Integer> emit(String streamId, List<Object> tuple);
-
- void emitDirect(int taskId, String streamId, List<Object> tuple);
-
- void reportError(Throwable t);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/IComponent.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/IComponent.java b/jstorm-client/src/main/java/backtype/storm/topology/IComponent.java
deleted file mode 100644
index 88f1450..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/IComponent.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package backtype.storm.topology;
-
-import java.io.Serializable;
-import java.util.Map;
-
-/**
- * Common methods for all possible components in a topology. This interface is
- * used when defining topologies using the Java API.
- */
-public interface IComponent extends Serializable {
-
- /**
- * Declare the output schema for all the streams of this topology.
- *
- * @param declarer
- * this is used to declare output stream ids, output fields, and
- * whether or not each output stream is a direct stream
- */
- void declareOutputFields(OutputFieldsDeclarer declarer);
-
- /**
- * Declare configuration specific to this component. Only a subset of the
- * "topology.*" configs can be overridden. The component configuration can
- * be further overridden when constructing the topology using
- * {@link TopologyBuilder}
- *
- */
- Map<String, Object> getComponentConfiguration();
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/IRichBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/IRichBolt.java b/jstorm-client/src/main/java/backtype/storm/topology/IRichBolt.java
deleted file mode 100644
index b79126c..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/IRichBolt.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package backtype.storm.topology;
-
-import backtype.storm.task.IBolt;
-
-/**
- * When writing topologies using Java, {@link IRichBolt} and {@link IRichSpout}
- * are the main interfaces to use to implement components of the topology.
- *
- */
-public interface IRichBolt extends IBolt, IComponent {
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/IRichSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/IRichSpout.java b/jstorm-client/src/main/java/backtype/storm/topology/IRichSpout.java
deleted file mode 100644
index 6953f66..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/IRichSpout.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package backtype.storm.topology;
-
-import backtype.storm.spout.ISpout;
-
-/**
- * When writing topologies using Java, {@link IRichBolt} and {@link IRichSpout}
- * are the main interfaces to use to implement components of the topology.
- *
- */
-public interface IRichSpout extends ISpout, IComponent {
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/IRichStateSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/IRichStateSpout.java b/jstorm-client/src/main/java/backtype/storm/topology/IRichStateSpout.java
deleted file mode 100644
index ffc2ec2..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/IRichStateSpout.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package backtype.storm.topology;
-
-import backtype.storm.state.IStateSpout;
-
-public interface IRichStateSpout extends IStateSpout, IComponent {
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/InputDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/InputDeclarer.java b/jstorm-client/src/main/java/backtype/storm/topology/InputDeclarer.java
deleted file mode 100644
index b97daca..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/InputDeclarer.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package backtype.storm.topology;
-
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.generated.Grouping;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.tuple.Fields;
-
-public interface InputDeclarer<T extends InputDeclarer> {
- public T fieldsGrouping(String componentId, Fields fields);
-
- public T fieldsGrouping(String componentId, String streamId, Fields fields);
-
- public T globalGrouping(String componentId);
-
- public T globalGrouping(String componentId, String streamId);
-
- public T shuffleGrouping(String componentId);
-
- public T shuffleGrouping(String componentId, String streamId);
-
- public T localOrShuffleGrouping(String componentId);
-
- public T localOrShuffleGrouping(String componentId, String streamId);
-
- public T localFirstGrouping(String componentId);
-
- public T localFirstGrouping(String componentId, String streamId);
-
- public T noneGrouping(String componentId);
-
- public T noneGrouping(String componentId, String streamId);
-
- public T allGrouping(String componentId);
-
- public T allGrouping(String componentId, String streamId);
-
- public T directGrouping(String componentId);
-
- public T directGrouping(String componentId, String streamId);
-
- public T customGrouping(String componentId, CustomStreamGrouping grouping);
-
- public T customGrouping(String componentId, String streamId,
- CustomStreamGrouping grouping);
-
- public T grouping(GlobalStreamId id, Grouping grouping);
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/OutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/OutputFieldsDeclarer.java b/jstorm-client/src/main/java/backtype/storm/topology/OutputFieldsDeclarer.java
deleted file mode 100644
index a981cc6..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/OutputFieldsDeclarer.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package backtype.storm.topology;
-
-import backtype.storm.tuple.Fields;
-
-public interface OutputFieldsDeclarer {
- /**
- * Uses default stream id.
- */
- public void declare(Fields fields);
-
- public void declare(boolean direct, Fields fields);
-
- public void declareStream(String streamId, Fields fields);
-
- public void declareStream(String streamId, boolean direct, Fields fields);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/OutputFieldsGetter.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/OutputFieldsGetter.java b/jstorm-client/src/main/java/backtype/storm/topology/OutputFieldsGetter.java
deleted file mode 100644
index 1a6c0c2..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/OutputFieldsGetter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package backtype.storm.topology;
-
-import backtype.storm.generated.StreamInfo;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-import java.util.HashMap;
-import java.util.Map;
-
-public class OutputFieldsGetter implements OutputFieldsDeclarer {
- private Map<String, StreamInfo> _fields = new HashMap<String, StreamInfo>();
-
- public void declare(Fields fields) {
- declare(false, fields);
- }
-
- public void declare(boolean direct, Fields fields) {
- declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
- }
-
- public void declareStream(String streamId, Fields fields) {
- declareStream(streamId, false, fields);
- }
-
- public void declareStream(String streamId, boolean direct, Fields fields) {
- if (_fields.containsKey(streamId)) {
- throw new IllegalArgumentException("Fields for " + streamId
- + " already set");
- }
- _fields.put(streamId, new StreamInfo(fields.toList(), direct));
- }
-
- public Map<String, StreamInfo> getFieldsDeclaration() {
- return _fields;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/ReportedFailedException.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/ReportedFailedException.java b/jstorm-client/src/main/java/backtype/storm/topology/ReportedFailedException.java
deleted file mode 100644
index 74bd0de..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/ReportedFailedException.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package backtype.storm.topology;
-
-public class ReportedFailedException extends FailedException {
- public ReportedFailedException() {
- super();
- }
-
- public ReportedFailedException(String msg) {
- super(msg);
- }
-
- public ReportedFailedException(String msg, Throwable cause) {
- super(msg, cause);
- }
-
- public ReportedFailedException(Throwable cause) {
- super(cause);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/SpoutDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/SpoutDeclarer.java b/jstorm-client/src/main/java/backtype/storm/topology/SpoutDeclarer.java
deleted file mode 100644
index b14d496..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/SpoutDeclarer.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package backtype.storm.topology;
-
-public interface SpoutDeclarer extends
- ComponentConfigurationDeclarer<SpoutDeclarer> {
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/TopologyBuilder.java b/jstorm-client/src/main/java/backtype/storm/topology/TopologyBuilder.java
deleted file mode 100644
index 2b2f55b..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/TopologyBuilder.java
+++ /dev/null
@@ -1,424 +0,0 @@
-package backtype.storm.topology;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-import backtype.storm.Config;
-import backtype.storm.generated.Bolt;
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.ComponentObject;
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.generated.Grouping;
-import backtype.storm.generated.NullStruct;
-import backtype.storm.generated.SpoutSpec;
-import backtype.storm.generated.StateSpoutSpec;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-/**
- * TopologyBuilder exposes the Java API for specifying a topology for Storm to
- * execute. Topologies are Thrift structures in the end, but since the Thrift
- * API is so verbose, TopologyBuilder greatly eases the process of creating
- * topologies. The template for creating and submitting a topology looks
- * something like:
- *
- * <pre>
- * TopologyBuilder builder = new TopologyBuilder();
- *
- * builder.setSpout("1", new TestWordSpout(true), 5);
- * builder.setSpout("2", new TestWordSpout(true), 3);
- * builder.setBolt("3", new TestWordCounter(), 3)
- * .fieldsGrouping("1", new Fields("word"))
- * .fieldsGrouping("2", new Fields("word"));
- * builder.setBolt("4", new TestGlobalCount()).globalGrouping("1");
- *
- * Map conf = new HashMap();
- * conf.put(Config.TOPOLOGY_WORKERS, 4);
- *
- * StormSubmitter.submitTopology("mytopology", conf, builder.createTopology());
- * </pre>
- *
- * Running the exact same topology in local mode (in process), and configuring
- * it to log all tuples emitted, looks like the following. Note that it lets the
- * topology run for 10 seconds before shutting down the local cluster.
- *
- * <pre>
- * TopologyBuilder builder = new TopologyBuilder();
- *
- * builder.setSpout("1", new TestWordSpout(true), 5);
- * builder.setSpout("2", new TestWordSpout(true), 3);
- * builder.setBolt("3", new TestWordCounter(), 3)
- * .fieldsGrouping("1", new Fields("word"))
- * .fieldsGrouping("2", new Fields("word"));
- * builder.setBolt("4", new TestGlobalCount()).globalGrouping("1");
- *
- * Map conf = new HashMap();
- * conf.put(Config.TOPOLOGY_WORKERS, 4);
- * conf.put(Config.TOPOLOGY_DEBUG, true);
- *
- * LocalCluster cluster = new LocalCluster();
- * cluster.submitTopology("mytopology", conf, builder.createTopology());
- * Utils.sleep(10000);
- * cluster.shutdown();
- * </pre>
- *
- * <p>
- * The pattern for TopologyBuilder is to map component ids to components using
- * the setSpout and setBolt methods. Those methods return objects that are then
- * used to declare the inputs for that component.
- * </p>
- */
-public class TopologyBuilder {
- private Map<String, IRichBolt> _bolts = new HashMap<String, IRichBolt>();
- private Map<String, IRichSpout> _spouts = new HashMap<String, IRichSpout>();
- private Map<String, ComponentCommon> _commons = new HashMap<String, ComponentCommon>();
-
- // private Map<String, Map<GlobalStreamId, Grouping>> _inputs = new
- // HashMap<String, Map<GlobalStreamId, Grouping>>();
-
- private Map<String, StateSpoutSpec> _stateSpouts = new HashMap<String, StateSpoutSpec>();
-
- public StormTopology createTopology() {
- Map<String, Bolt> boltSpecs = new HashMap<String, Bolt>();
- Map<String, SpoutSpec> spoutSpecs = new HashMap<String, SpoutSpec>();
- for (String boltId : _bolts.keySet()) {
- IRichBolt bolt = _bolts.get(boltId);
- ComponentCommon common = getComponentCommon(boltId, bolt);
- boltSpecs.put(
- boltId,
- new Bolt(ComponentObject.serialized_java(Utils
- .serialize(bolt)), common));
- }
- for (String spoutId : _spouts.keySet()) {
- IRichSpout spout = _spouts.get(spoutId);
- ComponentCommon common = getComponentCommon(spoutId, spout);
- spoutSpecs.put(
- spoutId,
- new SpoutSpec(ComponentObject.serialized_java(Utils
- .serialize(spout)), common));
-
- }
- return new StormTopology(spoutSpecs, boltSpecs,
- new HashMap<String, StateSpoutSpec>());
- }
-
- /**
- * Define a new bolt in this topology with parallelism of just one thread.
- *
- * @param id
- * the id of this component. This id is referenced by other
- * components that want to consume this bolt's outputs.
- * @param bolt
- * the bolt
- * @return use the returned object to declare the inputs to this component
- */
- public BoltDeclarer setBolt(String id, IRichBolt bolt) {
- return setBolt(id, bolt, null);
- }
-
- /**
- * Define a new bolt in this topology with the specified amount of
- * parallelism.
- *
- * @param id
- * the id of this component. This id is referenced by other
- * components that want to consume this bolt's outputs.
- * @param bolt
- * the bolt
- * @param parallelism_hint
- * the number of tasks that should be assigned to execute this
- * bolt. Each task will run on a thread in a process somewhere
- * around the cluster.
- * @return use the returned object to declare the inputs to this component
- */
- public BoltDeclarer setBolt(String id, IRichBolt bolt,
- Number parallelism_hint) {
- validateUnusedId(id);
- initCommon(id, bolt, parallelism_hint);
- _bolts.put(id, bolt);
- return new BoltGetter(id);
- }
-
- /**
- * Define a new bolt in this topology. This defines a basic bolt, which is a
- * simpler to use but more restricted kind of bolt. Basic bolts are intended
- * for non-aggregation processing and automate the anchoring/acking process
- * to achieve proper reliability in the topology.
- *
- * @param id
- * the id of this component. This id is referenced by other
- * components that want to consume this bolt's outputs.
- * @param bolt
- * the basic bolt
- * @return use the returned object to declare the inputs to this component
- */
- public BoltDeclarer setBolt(String id, IBasicBolt bolt) {
- return setBolt(id, bolt, null);
- }
-
- /**
- * Define a new bolt in this topology. This defines a basic bolt, which is a
- * simpler to use but more restricted kind of bolt. Basic bolts are intended
- * for non-aggregation processing and automate the anchoring/acking process
- * to achieve proper reliability in the topology.
- *
- * @param id
- * the id of this component. This id is referenced by other
- * components that want to consume this bolt's outputs.
- * @param bolt
- * the basic bolt
- * @param parallelism_hint
- * the number of tasks that should be assigned to execute this
- * bolt. Each task will run on a thread in a process somewhere
- * around the cluster.
- * @return use the returned object to declare the inputs to this component
- */
- public BoltDeclarer setBolt(String id, IBasicBolt bolt,
- Number parallelism_hint) {
- return setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint);
- }
-
- /**
- * Define a new spout in this topology.
- *
- * @param id
- * the id of this component. This id is referenced by other
- * components that want to consume this spout's outputs.
- * @param spout
- * the spout
- */
- public SpoutDeclarer setSpout(String id, IRichSpout spout) {
- return setSpout(id, spout, null);
- }
-
- /**
- * Define a new spout in this topology with the specified parallelism. If
- * the spout declares itself as non-distributed, the parallelism_hint will
- * be ignored and only one task will be allocated to this component.
- *
- * @param id
- * the id of this component. This id is referenced by other
- * components that want to consume this spout's outputs.
- * @param parallelism_hint
- * the number of tasks that should be assigned to execute this
- * spout. Each task will run on a thread in a process somewhere
- * around the cluster.
- * @param spout
- * the spout
- */
- public SpoutDeclarer setSpout(String id, IRichSpout spout,
- Number parallelism_hint) {
- validateUnusedId(id);
- initCommon(id, spout, parallelism_hint);
- _spouts.put(id, spout);
- return new SpoutGetter(id);
- }
-
- public void setStateSpout(String id, IRichStateSpout stateSpout) {
- setStateSpout(id, stateSpout, null);
- }
-
- public void setStateSpout(String id, IRichStateSpout stateSpout,
- Number parallelism_hint) {
- validateUnusedId(id);
- // TODO: finish
- }
-
- private void validateUnusedId(String id) {
- if (_bolts.containsKey(id)) {
- throw new IllegalArgumentException(
- "Bolt has already been declared for id " + id);
- }
- if (_spouts.containsKey(id)) {
- throw new IllegalArgumentException(
- "Spout has already been declared for id " + id);
- }
- if (_stateSpouts.containsKey(id)) {
- throw new IllegalArgumentException(
- "State spout has already been declared for id " + id);
- }
- }
-
- private ComponentCommon getComponentCommon(String id, IComponent component) {
- ComponentCommon ret = new ComponentCommon(_commons.get(id));
-
- OutputFieldsGetter getter = new OutputFieldsGetter();
- component.declareOutputFields(getter);
- ret.set_streams(getter.getFieldsDeclaration());
- return ret;
- }
-
- private void initCommon(String id, IComponent component, Number parallelism) {
- ComponentCommon common = new ComponentCommon();
- common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
- if (parallelism != null)
- common.set_parallelism_hint(parallelism.intValue());
- else {
- common.set_parallelism_hint(Integer.valueOf(1));
- }
- Map conf = component.getComponentConfiguration();
- if (conf != null)
- common.set_json_conf(Utils.to_json(conf));
- _commons.put(id, common);
- }
-
- protected class ConfigGetter<T extends ComponentConfigurationDeclarer>
- extends BaseConfigurationDeclarer<T> {
- String _id;
-
- public ConfigGetter(String id) {
- _id = id;
- }
-
- @Override
- public T addConfigurations(Map conf) {
- if (conf != null && conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) {
- throw new IllegalArgumentException(
- "Cannot set serializations for a component using fluent API");
- }
- String currConf = _commons.get(_id).get_json_conf();
- _commons.get(_id).set_json_conf(
- mergeIntoJson(parseJson(currConf), conf));
- return (T) this;
- }
- }
-
- protected class SpoutGetter extends ConfigGetter<SpoutDeclarer> implements
- SpoutDeclarer {
- public SpoutGetter(String id) {
- super(id);
- }
- }
-
- protected class BoltGetter extends ConfigGetter<BoltDeclarer> implements
- BoltDeclarer {
- private String _boltId;
-
- public BoltGetter(String boltId) {
- super(boltId);
- _boltId = boltId;
- }
-
- public BoltDeclarer fieldsGrouping(String componentId, Fields fields) {
- return fieldsGrouping(componentId, Utils.DEFAULT_STREAM_ID, fields);
- }
-
- public BoltDeclarer fieldsGrouping(String componentId, String streamId,
- Fields fields) {
- return grouping(componentId, streamId,
- Grouping.fields(fields.toList()));
- }
-
- public BoltDeclarer globalGrouping(String componentId) {
- return globalGrouping(componentId, Utils.DEFAULT_STREAM_ID);
- }
-
- public BoltDeclarer globalGrouping(String componentId, String streamId) {
- return grouping(componentId, streamId,
- Grouping.fields(new ArrayList<String>()));
- }
-
- public BoltDeclarer shuffleGrouping(String componentId) {
- return shuffleGrouping(componentId, Utils.DEFAULT_STREAM_ID);
- }
-
- public BoltDeclarer shuffleGrouping(String componentId, String streamId) {
- return grouping(componentId, streamId,
- Grouping.shuffle(new NullStruct()));
- }
-
- public BoltDeclarer localOrShuffleGrouping(String componentId) {
- return localOrShuffleGrouping(componentId, Utils.DEFAULT_STREAM_ID);
- }
-
- public BoltDeclarer localOrShuffleGrouping(String componentId,
- String streamId) {
- return grouping(componentId, streamId,
- Grouping.local_or_shuffle(new NullStruct()));
- }
-
- @Override
- public BoltDeclarer localFirstGrouping(String componentId) {
- return localFirstGrouping(componentId, Utils.DEFAULT_STREAM_ID);
- }
-
- @Override
- public BoltDeclarer localFirstGrouping(String componentId,
- String streamId) {
- return grouping(componentId, streamId,
- Grouping.localFirst(new NullStruct()));
- }
-
- public BoltDeclarer noneGrouping(String componentId) {
- return noneGrouping(componentId, Utils.DEFAULT_STREAM_ID);
- }
-
- public BoltDeclarer noneGrouping(String componentId, String streamId) {
- return grouping(componentId, streamId,
- Grouping.none(new NullStruct()));
- }
-
- public BoltDeclarer allGrouping(String componentId) {
- return allGrouping(componentId, Utils.DEFAULT_STREAM_ID);
- }
-
- public BoltDeclarer allGrouping(String componentId, String streamId) {
- return grouping(componentId, streamId,
- Grouping.all(new NullStruct()));
- }
-
- public BoltDeclarer directGrouping(String componentId) {
- return directGrouping(componentId, Utils.DEFAULT_STREAM_ID);
- }
-
- public BoltDeclarer directGrouping(String componentId, String streamId) {
- return grouping(componentId, streamId,
- Grouping.direct(new NullStruct()));
- }
-
- private BoltDeclarer grouping(String componentId, String streamId,
- Grouping grouping) {
- _commons.get(_boltId).put_to_inputs(
- new GlobalStreamId(componentId, streamId), grouping);
- return this;
- }
-
- @Override
- public BoltDeclarer customGrouping(String componentId,
- CustomStreamGrouping grouping) {
- return customGrouping(componentId, Utils.DEFAULT_STREAM_ID,
- grouping);
- }
-
- @Override
- public BoltDeclarer customGrouping(String componentId, String streamId,
- CustomStreamGrouping grouping) {
- return grouping(componentId, streamId,
- Grouping.custom_serialized(Utils.serialize(grouping)));
- }
-
- @Override
- public BoltDeclarer grouping(GlobalStreamId id, Grouping grouping) {
- return grouping(id.get_componentId(), id.get_streamId(), grouping);
- }
-
-
- }
-
- private static Map parseJson(String json) {
- if (json == null)
- return new HashMap();
- else
- return (Map) Utils.from_json(json);
- }
-
- private static String mergeIntoJson(Map into, Map newMap) {
- Map res = new HashMap(into);
- if (newMap != null)
- res.putAll(newMap);
- return Utils.to_json(res);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/base/BaseBasicBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/base/BaseBasicBolt.java b/jstorm-client/src/main/java/backtype/storm/topology/base/BaseBasicBolt.java
deleted file mode 100644
index 2e18207..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/base/BaseBasicBolt.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package backtype.storm.topology.base;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IBasicBolt;
-import java.util.Map;
-
-public abstract class BaseBasicBolt extends BaseComponent implements IBasicBolt {
-
- @Override
- public void prepare(Map stormConf, TopologyContext context) {
- }
-
- @Override
- public void cleanup() {
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/base/BaseBatchBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/base/BaseBatchBolt.java b/jstorm-client/src/main/java/backtype/storm/topology/base/BaseBatchBolt.java
deleted file mode 100644
index 9171392..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/base/BaseBatchBolt.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package backtype.storm.topology.base;
-
-import backtype.storm.coordination.IBatchBolt;
-import java.util.Map;
-
-public abstract class BaseBatchBolt<T> extends BaseComponent implements
- IBatchBolt<T> {
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/base/BaseComponent.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/base/BaseComponent.java b/jstorm-client/src/main/java/backtype/storm/topology/base/BaseComponent.java
deleted file mode 100644
index 1b1449a..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/base/BaseComponent.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package backtype.storm.topology.base;
-
-import backtype.storm.topology.IComponent;
-import java.util.Map;
-
-public abstract class BaseComponent implements IComponent {
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/base/BaseOpaquePartitionedTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/base/BaseOpaquePartitionedTransactionalSpout.java b/jstorm-client/src/main/java/backtype/storm/topology/base/BaseOpaquePartitionedTransactionalSpout.java
deleted file mode 100644
index baf0c65..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/base/BaseOpaquePartitionedTransactionalSpout.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package backtype.storm.topology.base;
-
-import backtype.storm.transactional.partitioned.IOpaquePartitionedTransactionalSpout;
-
-public abstract class BaseOpaquePartitionedTransactionalSpout<T> extends
- BaseComponent implements IOpaquePartitionedTransactionalSpout<T> {
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/base/BasePartitionedTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/base/BasePartitionedTransactionalSpout.java b/jstorm-client/src/main/java/backtype/storm/topology/base/BasePartitionedTransactionalSpout.java
deleted file mode 100644
index 2a9f298..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/base/BasePartitionedTransactionalSpout.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package backtype.storm.topology.base;
-
-import backtype.storm.transactional.partitioned.IPartitionedTransactionalSpout;
-import java.util.Map;
-
-public abstract class BasePartitionedTransactionalSpout<T> extends
- BaseComponent implements IPartitionedTransactionalSpout<T> {
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/base/BaseRichBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/base/BaseRichBolt.java b/jstorm-client/src/main/java/backtype/storm/topology/base/BaseRichBolt.java
deleted file mode 100644
index e6a7592..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/base/BaseRichBolt.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package backtype.storm.topology.base;
-
-import backtype.storm.topology.IRichBolt;
-
-public abstract class BaseRichBolt extends BaseComponent implements IRichBolt {
- @Override
- public void cleanup() {
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/base/BaseRichSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/base/BaseRichSpout.java b/jstorm-client/src/main/java/backtype/storm/topology/base/BaseRichSpout.java
deleted file mode 100644
index 9c0d733..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/base/BaseRichSpout.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * To change this template, choose Tools | Templates
- * and open the template in the editor.
- */
-package backtype.storm.topology.base;
-
-import backtype.storm.topology.IRichSpout;
-
-/**
- *
- * @author nathan
- */
-public abstract class BaseRichSpout extends BaseComponent implements IRichSpout {
- @Override
- public void close() {
- }
-
- @Override
- public void activate() {
- }
-
- @Override
- public void deactivate() {
- }
-
- @Override
- public void ack(Object msgId) {
- }
-
- @Override
- public void fail(Object msgId) {
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/base/BaseTransactionalBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/base/BaseTransactionalBolt.java b/jstorm-client/src/main/java/backtype/storm/topology/base/BaseTransactionalBolt.java
deleted file mode 100644
index 77647c0..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/base/BaseTransactionalBolt.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package backtype.storm.topology.base;
-
-import backtype.storm.transactional.TransactionAttempt;
-
-public abstract class BaseTransactionalBolt extends
- BaseBatchBolt<TransactionAttempt> {
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/base/BaseTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/base/BaseTransactionalSpout.java b/jstorm-client/src/main/java/backtype/storm/topology/base/BaseTransactionalSpout.java
deleted file mode 100644
index 01945ea..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/base/BaseTransactionalSpout.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package backtype.storm.topology.base;
-
-import backtype.storm.transactional.ITransactionalSpout;
-import java.util.Map;
-
-public abstract class BaseTransactionalSpout<T> extends BaseComponent implements
- ITransactionalSpout<T> {
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/transactional/ICommitter.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/transactional/ICommitter.java b/jstorm-client/src/main/java/backtype/storm/transactional/ICommitter.java
deleted file mode 100644
index e764fb7..0000000
--- a/jstorm-client/src/main/java/backtype/storm/transactional/ICommitter.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package backtype.storm.transactional;
-
-/**
- * This marks an IBatchBolt within a transactional topology as a committer. This
- * causes the finishBatch method to be called in order of the transactions.
- */
-public interface ICommitter {
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/transactional/ICommitterTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/transactional/ICommitterTransactionalSpout.java b/jstorm-client/src/main/java/backtype/storm/transactional/ICommitterTransactionalSpout.java
deleted file mode 100644
index 58b8e19..0000000
--- a/jstorm-client/src/main/java/backtype/storm/transactional/ICommitterTransactionalSpout.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package backtype.storm.transactional;
-
-import backtype.storm.task.TopologyContext;
-import java.util.Map;
-
-public interface ICommitterTransactionalSpout<X> extends ITransactionalSpout<X> {
- public interface Emitter extends ITransactionalSpout.Emitter {
- void commit(TransactionAttempt attempt);
- }
-
- @Override
- public Emitter getEmitter(Map conf, TopologyContext context);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/transactional/ITransactionalSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/transactional/ITransactionalSpout.java b/jstorm-client/src/main/java/backtype/storm/transactional/ITransactionalSpout.java
deleted file mode 100644
index 13399e1..0000000
--- a/jstorm-client/src/main/java/backtype/storm/transactional/ITransactionalSpout.java
+++ /dev/null
@@ -1,88 +0,0 @@
-package backtype.storm.transactional;
-
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IComponent;
-import java.math.BigInteger;
-import java.util.Map;
-
-public interface ITransactionalSpout<T> extends IComponent {
- public interface Coordinator<X> {
- /**
- * Create metadata for this particular transaction id which has never
- * been emitted before. The metadata should contain whatever is
- * necessary to be able to replay the exact batch for the transaction at
- * a later point.
- *
- * The metadata is stored in Zookeeper.
- *
- * Storm uses the Kryo serializations configured in the component
- * configuration for this spout to serialize and deserialize the
- * metadata.
- *
- * @param txid
- * The id of the transaction.
- * @param prevMetadata
- * The metadata of the previous transaction
- * @return the metadata for this new transaction
- */
- X initializeTransaction(BigInteger txid, X prevMetadata);
-
- /**
- * Returns true if it is OK to start a new transaction, false
- * otherwise (will skip this transaction).
- *
- * You should sleep here if you want a delay between asking for the next
- * transaction (this will be called repeatedly in a loop).
- */
- boolean isReady();
-
- /**
- * Release any resources from this coordinator.
- */
- void close();
- }
-
- public interface Emitter<X> {
- /**
- * Emit a batch for the specified transaction attempt and metadata for
- * the transaction. The metadata was created by the Coordinator in the
- * initializeTransaction method. This method must always emit the same
- * batch of tuples across all tasks for the same transaction id.
- *
- * The first field of all emitted tuples must contain the provided
- * TransactionAttempt.
- *
- */
- void emitBatch(TransactionAttempt tx, X coordinatorMeta,
- BatchOutputCollector collector);
-
- /**
- * Any state for transactions prior to the provided transaction id can
- * be safely cleaned up, so this method should clean up that state.
- */
- void cleanupBefore(BigInteger txid);
-
- /**
- * Release any resources held by this emitter.
- */
- void close();
- }
-
- /**
- * The coordinator for a TransactionalSpout runs in a single thread and
- * indicates when batches of tuples should be emitted and when transactions
- * should commit. The Coordinator that you provide in a TransactionalSpout
- * provides metadata for each transaction so that the transactions can be
- * replayed.
- */
- Coordinator<T> getCoordinator(Map conf, TopologyContext context);
-
- /**
- * The emitter for a TransactionalSpout runs as many tasks across the
- * cluster. Emitters are responsible for emitting batches of tuples for a
- * transaction and must ensure that the same batch of tuples is always
- * emitted for the same transaction id.
- */
- Emitter<T> getEmitter(Map conf, TopologyContext context);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/transactional/TransactionAttempt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/transactional/TransactionAttempt.java b/jstorm-client/src/main/java/backtype/storm/transactional/TransactionAttempt.java
deleted file mode 100644
index 2d02de1..0000000
--- a/jstorm-client/src/main/java/backtype/storm/transactional/TransactionAttempt.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package backtype.storm.transactional;
-
-import java.math.BigInteger;
-
-public class TransactionAttempt {
- BigInteger _txid;
- long _attemptId;
-
- // for kryo compatibility
- public TransactionAttempt() {
-
- }
-
- public TransactionAttempt(BigInteger txid, long attemptId) {
- _txid = txid;
- _attemptId = attemptId;
- }
-
- public BigInteger getTransactionId() {
- return _txid;
- }
-
- public long getAttemptId() {
- return _attemptId;
- }
-
- @Override
- public int hashCode() {
- return _txid.hashCode();
- }
-
- @Override
- public boolean equals(Object o) {
- if (!(o instanceof TransactionAttempt))
- return false;
- TransactionAttempt other = (TransactionAttempt) o;
- return _txid.equals(other._txid) && _attemptId == other._attemptId;
- }
-
- @Override
- public String toString() {
- return "" + _txid + ":" + _attemptId;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/transactional/TransactionalSpoutBatchExecutor.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/transactional/TransactionalSpoutBatchExecutor.java b/jstorm-client/src/main/java/backtype/storm/transactional/TransactionalSpoutBatchExecutor.java
deleted file mode 100644
index daea107..0000000
--- a/jstorm-client/src/main/java/backtype/storm/transactional/TransactionalSpoutBatchExecutor.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package backtype.storm.transactional;
-
-import backtype.storm.coordination.BatchOutputCollectorImpl;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.FailedException;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-import java.math.BigInteger;
-import java.util.Map;
-import java.util.TreeMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TransactionalSpoutBatchExecutor implements IRichBolt {
- public static Logger LOG = LoggerFactory
- .getLogger(TransactionalSpoutBatchExecutor.class);
-
- BatchOutputCollectorImpl _collector;
- ITransactionalSpout _spout;
- ITransactionalSpout.Emitter _emitter;
-
- TreeMap<BigInteger, TransactionAttempt> _activeTransactions = new TreeMap<BigInteger, TransactionAttempt>();
-
- public TransactionalSpoutBatchExecutor(ITransactionalSpout spout) {
- _spout = spout;
- }
-
- @Override
- public void prepare(Map conf, TopologyContext context,
- OutputCollector collector) {
- _collector = new BatchOutputCollectorImpl(collector);
- _emitter = _spout.getEmitter(conf, context);
- }
-
- @Override
- public void execute(Tuple input) {
- TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
- try {
- if (input.getSourceStreamId().equals(
- TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID)) {
- if (attempt.equals(_activeTransactions.get(attempt
- .getTransactionId()))) {
- ((ICommitterTransactionalSpout.Emitter) _emitter)
- .commit(attempt);
- _activeTransactions.remove(attempt.getTransactionId());
- _collector.ack(input);
- } else {
- _collector.fail(input);
- }
- } else {
- _emitter.emitBatch(attempt, input.getValue(1), _collector);
- _activeTransactions.put(attempt.getTransactionId(), attempt);
- _collector.ack(input);
- BigInteger committed = (BigInteger) input.getValue(2);
- if (committed != null) {
- // valid to delete before what's been committed since
- // those batches will never be accessed again
- _activeTransactions.headMap(committed).clear();
- _emitter.cleanupBefore(committed);
- }
- }
- } catch (FailedException e) {
- LOG.warn("Failed to emit batch for transaction", e);
- _collector.fail(input);
- }
- }
-
- @Override
- public void cleanup() {
- _emitter.close();
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- _spout.declareOutputFields(declarer);
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return _spout.getComponentConfiguration();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/transactional/TransactionalSpoutCoordinator.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/transactional/TransactionalSpoutCoordinator.java b/jstorm-client/src/main/java/backtype/storm/transactional/TransactionalSpoutCoordinator.java
deleted file mode 100644
index 4810903..0000000
--- a/jstorm-client/src/main/java/backtype/storm/transactional/TransactionalSpoutCoordinator.java
+++ /dev/null
@@ -1,220 +0,0 @@
-package backtype.storm.transactional;
-
-import backtype.storm.Config;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.FailedException;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.transactional.state.RotatingTransactionalState;
-import backtype.storm.transactional.state.TransactionalState;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-import java.math.BigInteger;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.Random;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TransactionalSpoutCoordinator extends BaseRichSpout {
- public static final Logger LOG = LoggerFactory
- .getLogger(TransactionalSpoutCoordinator.class);
-
- public static final BigInteger INIT_TXID = BigInteger.ONE;
-
- public static final String TRANSACTION_BATCH_STREAM_ID = TransactionalSpoutCoordinator.class
- .getName() + "/batch";
- public static final String TRANSACTION_COMMIT_STREAM_ID = TransactionalSpoutCoordinator.class
- .getName() + "/commit";
-
- private static final String CURRENT_TX = "currtx";
- private static final String META_DIR = "meta";
-
- private ITransactionalSpout _spout;
- private ITransactionalSpout.Coordinator _coordinator;
- private TransactionalState _state;
- private RotatingTransactionalState _coordinatorState;
-
- TreeMap<BigInteger, TransactionStatus> _activeTx = new TreeMap<BigInteger, TransactionStatus>();
-
- private SpoutOutputCollector _collector;
- private Random _rand;
- BigInteger _currTransaction;
- int _maxTransactionActive;
- StateInitializer _initializer;
-
- public TransactionalSpoutCoordinator(ITransactionalSpout spout) {
- _spout = spout;
- }
-
- public ITransactionalSpout getSpout() {
- return _spout;
- }
-
- @Override
- public void open(Map conf, TopologyContext context,
- SpoutOutputCollector collector) {
- _rand = new Random(Utils.secureRandomLong());
- _state = TransactionalState.newCoordinatorState(conf,
- (String) conf.get(Config.TOPOLOGY_TRANSACTIONAL_ID),
- _spout.getComponentConfiguration());
- _coordinatorState = new RotatingTransactionalState(_state, META_DIR,
- true);
- _collector = collector;
- _coordinator = _spout.getCoordinator(conf, context);
- _currTransaction = getStoredCurrTransaction(_state);
- Object active = conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
- if (active == null) {
- _maxTransactionActive = 1;
- } else {
- _maxTransactionActive = Utils.getInt(active);
- }
- _initializer = new StateInitializer();
- }
-
- @Override
- public void close() {
- _state.close();
- }
-
- @Override
- public void nextTuple() {
- sync();
- }
-
- @Override
- public void ack(Object msgId) {
- TransactionAttempt tx = (TransactionAttempt) msgId;
- TransactionStatus status = _activeTx.get(tx.getTransactionId());
- if (status != null && tx.equals(status.attempt)) {
- if (status.status == AttemptStatus.PROCESSING) {
- status.status = AttemptStatus.PROCESSED;
- } else if (status.status == AttemptStatus.COMMITTING) {
- _activeTx.remove(tx.getTransactionId());
- _coordinatorState.cleanupBefore(tx.getTransactionId());
- _currTransaction = nextTransactionId(tx.getTransactionId());
- _state.setData(CURRENT_TX, _currTransaction);
- }
- sync();
- }
- }
-
- @Override
- public void fail(Object msgId) {
- TransactionAttempt tx = (TransactionAttempt) msgId;
- TransactionStatus stored = _activeTx.remove(tx.getTransactionId());
- if (stored != null && tx.equals(stored.attempt)) {
- _activeTx.tailMap(tx.getTransactionId()).clear();
- sync();
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- // in partitioned example, in case an emitter task receives a later
- // transaction than it's emitted so far,
- // when it sees the earlier txid it should know to emit nothing
- declarer.declareStream(TRANSACTION_BATCH_STREAM_ID, new Fields("tx",
- "tx-meta", "committed-txid"));
- declarer.declareStream(TRANSACTION_COMMIT_STREAM_ID, new Fields("tx"));
- }
-
- private void sync() {
- // note that sometimes the tuples active may be less than
- // max_spout_pending, e.g.
- // max_spout_pending = 3
- // tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2
- // (because tx 1 isn't committed yet),
- // and there won't be a batch for tx 4 because there's max_spout_pending
- // tx active
- TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
- if (maybeCommit != null
- && maybeCommit.status == AttemptStatus.PROCESSED) {
- maybeCommit.status = AttemptStatus.COMMITTING;
- _collector.emit(TRANSACTION_COMMIT_STREAM_ID, new Values(
- maybeCommit.attempt), maybeCommit.attempt);
- }
-
- try {
- if (_activeTx.size() < _maxTransactionActive) {
- BigInteger curr = _currTransaction;
- for (int i = 0; i < _maxTransactionActive; i++) {
- if ((_coordinatorState.hasCache(curr) || _coordinator
- .isReady()) && !_activeTx.containsKey(curr)) {
- TransactionAttempt attempt = new TransactionAttempt(
- curr, _rand.nextLong());
- Object state = _coordinatorState.getState(curr,
- _initializer);
- _activeTx.put(curr, new TransactionStatus(attempt));
- _collector
- .emit(TRANSACTION_BATCH_STREAM_ID,
- new Values(
- attempt,
- state,
- previousTransactionId(_currTransaction)),
- attempt);
- }
- curr = nextTransactionId(curr);
- }
- }
- } catch (FailedException e) {
- LOG.warn("Failed to get metadata for a transaction", e);
- }
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- Config ret = new Config();
- ret.setMaxTaskParallelism(1);
- return ret;
- }
-
- private static enum AttemptStatus {
- PROCESSING, PROCESSED, COMMITTING
- }
-
- private static class TransactionStatus {
- TransactionAttempt attempt;
- AttemptStatus status;
-
- public TransactionStatus(TransactionAttempt attempt) {
- this.attempt = attempt;
- this.status = AttemptStatus.PROCESSING;
- }
-
- @Override
- public String toString() {
- return attempt.toString() + " <" + status.toString() + ">";
- }
- }
-
- private BigInteger nextTransactionId(BigInteger id) {
- return id.add(BigInteger.ONE);
- }
-
- private BigInteger previousTransactionId(BigInteger id) {
- if (id.equals(INIT_TXID)) {
- return null;
- } else {
- return id.subtract(BigInteger.ONE);
- }
- }
-
- private BigInteger getStoredCurrTransaction(TransactionalState state) {
- BigInteger ret = (BigInteger) state.getData(CURRENT_TX);
- if (ret == null)
- return INIT_TXID;
- else
- return ret;
- }
-
- private class StateInitializer implements
- RotatingTransactionalState.StateInitializer {
- @Override
- public Object init(BigInteger txid, Object lastState) {
- return _coordinator.initializeTransaction(txid, lastState);
- }
- }
-}