You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/11/05 21:41:04 UTC

[25/60] [abbrv] [partial] storm git commit: Release 2.0.4-SNAPSHOT

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/TestJob.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/TestJob.java b/jstorm-client/src/main/java/backtype/storm/testing/TestJob.java
deleted file mode 100644
index 35a2592..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/TestJob.java
+++ /dev/null
@@ -1,24 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.ILocalCluster;
-
-/**
- * This is the core interface for the storm java testing, usually we put our
- * java unit testing logic in the run method. A sample code will be: <code>
- * Testing.withSimulatedTimeLocalCluster(new TestJob() {
- *     public void run(Cluster cluster) {
- *         // your testing logic here.
- *     }
- * });
- */
-public interface TestJob {
-	/**
-	 * run the testing logic with the cluster.
-	 * 
-	 * @param cluster
-	 *            the cluster which created by
-	 *            <code>Testing.withSimulatedTimeLocalCluster</code> and
-	 *            <code>Testing.withTrackedCluster</code>.
-	 */
-	public void run(ILocalCluster cluster) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/TestKryoDecorator.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/TestKryoDecorator.java b/jstorm-client/src/main/java/backtype/storm/testing/TestKryoDecorator.java
deleted file mode 100644
index fd9053c..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/TestKryoDecorator.java
+++ /dev/null
@@ -1,14 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.serialization.IKryoDecorator;
-import com.esotericsoftware.kryo.Kryo;
-import com.esotericsoftware.kryo.Serializer;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-
-public class TestKryoDecorator implements IKryoDecorator {
-
-	public void decorate(Kryo k) {
-		k.register(TestSerObject.class);
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/TestPlannerBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/TestPlannerBolt.java b/jstorm-client/src/main/java/backtype/storm/testing/TestPlannerBolt.java
deleted file mode 100644
index 69e175e..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/TestPlannerBolt.java
+++ /dev/null
@@ -1,28 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Fields;
-import java.util.Map;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.base.BaseRichBolt;
-
-public class TestPlannerBolt extends BaseRichBolt {
-	public void prepare(Map stormConf, TopologyContext context,
-			OutputCollector collector) {
-
-	}
-
-	public void execute(Tuple input) {
-
-	}
-
-	public Fields getOutputFields() {
-		return new Fields("field1", "field2");
-	}
-
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(getOutputFields());
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/TestPlannerSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/TestPlannerSpout.java b/jstorm-client/src/main/java/backtype/storm/testing/TestPlannerSpout.java
deleted file mode 100644
index c00dc45..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/TestPlannerSpout.java
+++ /dev/null
@@ -1,67 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.Config;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import java.util.Map;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-import java.util.HashMap;
-
-public class TestPlannerSpout extends BaseRichSpout {
-	boolean _isDistributed;
-	Fields _outFields;
-
-	public TestPlannerSpout(Fields outFields, boolean isDistributed) {
-		_isDistributed = isDistributed;
-		_outFields = outFields;
-	}
-
-	public TestPlannerSpout(boolean isDistributed) {
-		this(new Fields("field1", "field2"), isDistributed);
-	}
-
-	public TestPlannerSpout(Fields outFields) {
-		this(outFields, true);
-	}
-
-	public Fields getOutputFields() {
-		return _outFields;
-	}
-
-	public void open(Map conf, TopologyContext context,
-			SpoutOutputCollector collector) {
-
-	}
-
-	public void close() {
-
-	}
-
-	public void nextTuple() {
-		Utils.sleep(100);
-	}
-
-	public void ack(Object msgId) {
-
-	}
-
-	public void fail(Object msgId) {
-
-	}
-
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(getOutputFields());
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		Map<String, Object> ret = new HashMap<String, Object>();
-		if (!_isDistributed) {
-			ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
-		}
-		return ret;
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/TestSerObject.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/TestSerObject.java b/jstorm-client/src/main/java/backtype/storm/testing/TestSerObject.java
deleted file mode 100644
index ab356a3..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/TestSerObject.java
+++ /dev/null
@@ -1,40 +0,0 @@
-package backtype.storm.testing;
-
-import java.io.Serializable;
-
-public class TestSerObject implements Serializable {
-	public int f1;
-	public int f2;
-
-	public TestSerObject(int f1, int f2) {
-		this.f1 = f1;
-		this.f2 = f2;
-	}
-
-	@Override
-	public int hashCode() {
-		final int prime = 31;
-		int result = 1;
-		result = prime * result + f1;
-		result = prime * result + f2;
-		return result;
-	}
-
-	@Override
-	public boolean equals(Object obj) {
-		if (this == obj)
-			return true;
-		if (obj == null)
-			return false;
-		if (getClass() != obj.getClass())
-			return false;
-		TestSerObject other = (TestSerObject) obj;
-		if (f1 != other.f1)
-			return false;
-		if (f2 != other.f2)
-			return false;
-		return true;
-	}
-
-	
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/TestWordCounter.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/TestWordCounter.java b/jstorm-client/src/main/java/backtype/storm/testing/TestWordCounter.java
deleted file mode 100644
index 3572b9d..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/TestWordCounter.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.topology.base.BaseBasicBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Fields;
-import java.util.Map;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.BasicOutputCollector;
-import java.util.HashMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import static backtype.storm.utils.Utils.tuple;
-
-public class TestWordCounter extends BaseBasicBolt {
-	public static Logger LOG = LoggerFactory.getLogger(TestWordCounter.class);
-
-	Map<String, Integer> _counts;
-
-	public void prepare(Map stormConf, TopologyContext context) {
-		_counts = new HashMap<String, Integer>();
-	}
-
-	public void execute(Tuple input, BasicOutputCollector collector) {
-		String word = (String) input.getValues().get(0);
-		int count = 0;
-		if (_counts.containsKey(word)) {
-			count = _counts.get(word);
-		}
-		count++;
-		_counts.put(word, count);
-		collector.emit(tuple(word, count));
-	}
-
-	public void cleanup() {
-
-	}
-
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(new Fields("word", "count"));
-	}
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/TestWordSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/TestWordSpout.java b/jstorm-client/src/main/java/backtype/storm/testing/TestWordSpout.java
deleted file mode 100644
index 432e5de..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/TestWordSpout.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.Config;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import java.util.Map;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-import java.util.HashMap;
-import java.util.Random;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TestWordSpout extends BaseRichSpout {
-	public static Logger LOG = LoggerFactory.getLogger(TestWordSpout.class);
-	boolean _isDistributed;
-	SpoutOutputCollector _collector;
-
-	public TestWordSpout() {
-		this(true);
-	}
-
-	public TestWordSpout(boolean isDistributed) {
-		_isDistributed = isDistributed;
-	}
-
-	public void open(Map conf, TopologyContext context,
-			SpoutOutputCollector collector) {
-		_collector = collector;
-	}
-
-	public void close() {
-
-	}
-
-	public void nextTuple() {
-		Utils.sleep(100);
-		final String[] words = new String[] { "nathan", "mike", "jackson",
-				"golda", "bertels" };
-		final Random rand = new Random();
-		final String word = words[rand.nextInt(words.length)];
-		_collector.emit(new Values(word));
-	}
-
-	public void ack(Object msgId) {
-
-	}
-
-	public void fail(Object msgId) {
-
-	}
-
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(new Fields("word"));
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		if (!_isDistributed) {
-			Map<String, Object> ret = new HashMap<String, Object>();
-			ret.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, 1);
-			return ret;
-		} else {
-			return null;
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/TrackedTopology.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/TrackedTopology.java b/jstorm-client/src/main/java/backtype/storm/testing/TrackedTopology.java
deleted file mode 100644
index dc98ba5..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/TrackedTopology.java
+++ /dev/null
@@ -1,17 +0,0 @@
-package backtype.storm.testing;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import backtype.storm.generated.StormTopology;
-import clojure.lang.Keyword;
-
-public class TrackedTopology extends HashMap {
-	public TrackedTopology(Map map) {
-		super(map);
-	}
-
-	public StormTopology getTopology() {
-		return (StormTopology) get(Keyword.intern("topology"));
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/testing/TupleCaptureBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/testing/TupleCaptureBolt.java b/jstorm-client/src/main/java/backtype/storm/testing/TupleCaptureBolt.java
deleted file mode 100644
index 33c07b7..0000000
--- a/jstorm-client/src/main/java/backtype/storm/testing/TupleCaptureBolt.java
+++ /dev/null
@@ -1,68 +0,0 @@
-package backtype.storm.testing;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-
-public class TupleCaptureBolt implements IRichBolt {
-	public static transient Map<String, Map<String, List<FixedTuple>>> emitted_tuples = new HashMap<String, Map<String, List<FixedTuple>>>();
-
-	private String _name;
-	private transient OutputCollector _collector;
-
-	public TupleCaptureBolt() {
-		_name = UUID.randomUUID().toString();
-		emitted_tuples.put(_name, new HashMap<String, List<FixedTuple>>());
-	}
-
-	public void prepare(Map stormConf, TopologyContext context,
-			OutputCollector collector) {
-		_collector = collector;
-	}
-
-	public void execute(Tuple input) {
-		String component = input.getSourceComponent();
-		Map<String, List<FixedTuple>> captured = emitted_tuples.get(_name);
-		if (!captured.containsKey(component)) {
-			captured.put(component, new ArrayList<FixedTuple>());
-		}
-		captured.get(component).add(
-				new FixedTuple(input.getSourceStreamId(), input.getValues()));
-		_collector.ack(input);
-	}
-
-	public Map<String, List<FixedTuple>> getResults() {
-		return emitted_tuples.get(_name);
-	}
-
-	public void cleanup() {
-	}
-
-	public Map<String, List<FixedTuple>> getAndRemoveResults() {
-		return emitted_tuples.remove(_name);
-	}
-
-	public Map<String, List<FixedTuple>> getAndClearResults() {
-		Map<String, List<FixedTuple>> ret = new HashMap<String, List<FixedTuple>>(
-				emitted_tuples.get(_name));
-		emitted_tuples.get(_name).clear();
-		return ret;
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return null;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/BaseConfigurationDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/BaseConfigurationDeclarer.java b/jstorm-client/src/main/java/backtype/storm/topology/BaseConfigurationDeclarer.java
deleted file mode 100644
index fcf90b5..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/BaseConfigurationDeclarer.java
+++ /dev/null
@@ -1,41 +0,0 @@
-package backtype.storm.topology;
-
-import backtype.storm.Config;
-import java.util.HashMap;
-import java.util.Map;
-
-public abstract class BaseConfigurationDeclarer<T extends ComponentConfigurationDeclarer>
-		implements ComponentConfigurationDeclarer<T> {
-	@Override
-	public T addConfiguration(String config, Object value) {
-		Map configMap = new HashMap();
-		configMap.put(config, value);
-		return addConfigurations(configMap);
-	}
-
-	@Override
-	public T setDebug(boolean debug) {
-		return addConfiguration(Config.TOPOLOGY_DEBUG, debug);
-	}
-
-	@Override
-	public T setMaxTaskParallelism(Number val) {
-		if (val != null)
-			val = val.intValue();
-		return addConfiguration(Config.TOPOLOGY_MAX_TASK_PARALLELISM, val);
-	}
-
-	@Override
-	public T setMaxSpoutPending(Number val) {
-		if (val != null)
-			val = val.intValue();
-		return addConfiguration(Config.TOPOLOGY_MAX_SPOUT_PENDING, val);
-	}
-
-	@Override
-	public T setNumTasks(Number val) {
-		if (val != null)
-			val = val.intValue();
-		return addConfiguration(Config.TOPOLOGY_TASKS, val);
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/BasicBoltExecutor.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/BasicBoltExecutor.java b/jstorm-client/src/main/java/backtype/storm/topology/BasicBoltExecutor.java
deleted file mode 100644
index 586f17c..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/BasicBoltExecutor.java
+++ /dev/null
@@ -1,50 +0,0 @@
-package backtype.storm.topology;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Tuple;
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BasicBoltExecutor implements IRichBolt {
-	public static Logger LOG = LoggerFactory.getLogger(BasicBoltExecutor.class);
-
-	private IBasicBolt _bolt;
-	private transient BasicOutputCollector _collector;
-
-	public BasicBoltExecutor(IBasicBolt bolt) {
-		_bolt = bolt;
-	}
-
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		_bolt.declareOutputFields(declarer);
-	}
-
-	public void prepare(Map stormConf, TopologyContext context,
-			OutputCollector collector) {
-		_bolt.prepare(stormConf, context);
-		_collector = new BasicOutputCollector(collector);
-	}
-
-	public void execute(Tuple input) {
-		_collector.setContext(input);
-		try {
-			_bolt.execute(input, _collector);
-			_collector.getOutputter().ack(input);
-		} catch (FailedException e) {
-			if (e instanceof ReportedFailedException) {
-				_collector.reportError(e);
-			}
-			_collector.getOutputter().fail(input);
-		}
-	}
-
-	public void cleanup() {
-		_bolt.cleanup();
-	}
-
-	public Map<String, Object> getComponentConfiguration() {
-		return _bolt.getComponentConfiguration();
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/BasicOutputCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/BasicOutputCollector.java b/jstorm-client/src/main/java/backtype/storm/topology/BasicOutputCollector.java
deleted file mode 100644
index a0a892f..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/BasicOutputCollector.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package backtype.storm.topology;
-
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.Utils;
-import java.util.List;
-
-public class BasicOutputCollector implements IBasicOutputCollector {
-	private OutputCollector out;
-	private Tuple inputTuple;
-
-	public BasicOutputCollector(OutputCollector out) {
-		this.out = out;
-	}
-
-	public List<Integer> emit(String streamId, List<Object> tuple) {
-		return out.emit(streamId, inputTuple, tuple);
-	}
-
-	public List<Integer> emit(List<Object> tuple) {
-		return emit(Utils.DEFAULT_STREAM_ID, tuple);
-	}
-
-	public void setContext(Tuple inputTuple) {
-		this.inputTuple = inputTuple;
-	}
-
-	public void emitDirect(int taskId, String streamId, List<Object> tuple) {
-		out.emitDirect(taskId, streamId, inputTuple, tuple);
-	}
-
-	public void emitDirect(int taskId, List<Object> tuple) {
-		emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple);
-	}
-
-	protected IOutputCollector getOutputter() {
-		return out;
-	}
-
-	public void reportError(Throwable t) {
-		out.reportError(t);
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/BoltDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/BoltDeclarer.java b/jstorm-client/src/main/java/backtype/storm/topology/BoltDeclarer.java
deleted file mode 100644
index 0e9cd58..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/BoltDeclarer.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package backtype.storm.topology;
-
-public interface BoltDeclarer extends InputDeclarer<BoltDeclarer>,
-		ComponentConfigurationDeclarer<BoltDeclarer> {
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/ComponentConfigurationDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/ComponentConfigurationDeclarer.java b/jstorm-client/src/main/java/backtype/storm/topology/ComponentConfigurationDeclarer.java
deleted file mode 100644
index 3c59980..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/ComponentConfigurationDeclarer.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package backtype.storm.topology;
-
-import java.util.Map;
-
-public interface ComponentConfigurationDeclarer<T extends ComponentConfigurationDeclarer> {
-	T addConfigurations(Map conf);
-
-	T addConfiguration(String config, Object value);
-
-	T setDebug(boolean debug);
-
-	T setMaxTaskParallelism(Number val);
-
-	T setMaxSpoutPending(Number val);
-
-	@Deprecated
-	T setNumTasks(Number val);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/FailedException.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/FailedException.java b/jstorm-client/src/main/java/backtype/storm/topology/FailedException.java
deleted file mode 100644
index 793f53e..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/FailedException.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package backtype.storm.topology;
-
-public class FailedException extends RuntimeException {
-	public FailedException() {
-		super();
-	}
-
-	public FailedException(String msg) {
-		super(msg);
-	}
-
-	public FailedException(String msg, Throwable cause) {
-		super(msg, cause);
-	}
-
-	public FailedException(Throwable cause) {
-		super(cause);
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/IBasicBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/IBasicBolt.java b/jstorm-client/src/main/java/backtype/storm/topology/IBasicBolt.java
deleted file mode 100644
index e5f303e..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/IBasicBolt.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package backtype.storm.topology;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Tuple;
-import java.util.Map;
-
-public interface IBasicBolt extends IComponent {
-	void prepare(Map stormConf, TopologyContext context);
-
-	/**
-	 * Process the input tuple and optionally emit new tuples based on the input
-	 * tuple.
-	 * 
-	 * All acking is managed for you. Throw a FailedException if you want to
-	 * fail the tuple.
-	 */
-	void execute(Tuple input, BasicOutputCollector collector);
-
-	void cleanup();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/IBasicOutputCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/IBasicOutputCollector.java b/jstorm-client/src/main/java/backtype/storm/topology/IBasicOutputCollector.java
deleted file mode 100644
index 44c0fe1..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/IBasicOutputCollector.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package backtype.storm.topology;
-
-import java.util.List;
-
-public interface IBasicOutputCollector {
-	List<Integer> emit(String streamId, List<Object> tuple);
-
-	void emitDirect(int taskId, String streamId, List<Object> tuple);
-
-	void reportError(Throwable t);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/IComponent.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/IComponent.java b/jstorm-client/src/main/java/backtype/storm/topology/IComponent.java
deleted file mode 100644
index 88f1450..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/IComponent.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package backtype.storm.topology;
-
-import java.io.Serializable;
-import java.util.Map;
-
-/**
- * Common methods for all possible components in a topology. This interface is
- * used when defining topologies using the Java API.
- */
-public interface IComponent extends Serializable {
-
-	/**
-	 * Declare the output schema for all the streams of this topology.
-	 * 
-	 * @param declarer
-	 *            this is used to declare output stream ids, output fields, and
-	 *            whether or not each output stream is a direct stream
-	 */
-	void declareOutputFields(OutputFieldsDeclarer declarer);
-
-	/**
-	 * Declare configuration specific to this component. Only a subset of the
-	 * "topology.*" configs can be overridden. The component configuration can
-	 * be further overridden when constructing the topology using
-	 * {@link TopologyBuilder}
-	 * 
-	 */
-	Map<String, Object> getComponentConfiguration();
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/IRichBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/IRichBolt.java b/jstorm-client/src/main/java/backtype/storm/topology/IRichBolt.java
deleted file mode 100644
index b79126c..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/IRichBolt.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package backtype.storm.topology;
-
-import backtype.storm.task.IBolt;
-
-/**
- * When writing topologies using Java, {@link IRichBolt} and {@link IRichSpout}
- * are the main interfaces to use to implement components of the topology.
- * 
- */
-public interface IRichBolt extends IBolt, IComponent {
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/IRichSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/IRichSpout.java b/jstorm-client/src/main/java/backtype/storm/topology/IRichSpout.java
deleted file mode 100644
index 6953f66..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/IRichSpout.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package backtype.storm.topology;
-
-import backtype.storm.spout.ISpout;
-
-/**
- * When writing topologies using Java, {@link IRichBolt} and {@link IRichSpout}
- * are the main interfaces to use to implement components of the topology.
- * 
- */
-public interface IRichSpout extends ISpout, IComponent {
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/IRichStateSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/IRichStateSpout.java b/jstorm-client/src/main/java/backtype/storm/topology/IRichStateSpout.java
deleted file mode 100644
index ffc2ec2..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/IRichStateSpout.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package backtype.storm.topology;
-
-import backtype.storm.state.IStateSpout;
-
-public interface IRichStateSpout extends IStateSpout, IComponent {
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/InputDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/InputDeclarer.java b/jstorm-client/src/main/java/backtype/storm/topology/InputDeclarer.java
deleted file mode 100644
index b97daca..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/InputDeclarer.java
+++ /dev/null
@@ -1,48 +0,0 @@
-package backtype.storm.topology;
-
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.generated.Grouping;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.tuple.Fields;
-
-public interface InputDeclarer<T extends InputDeclarer> {
-	public T fieldsGrouping(String componentId, Fields fields);
-
-	public T fieldsGrouping(String componentId, String streamId, Fields fields);
-
-	public T globalGrouping(String componentId);
-
-	public T globalGrouping(String componentId, String streamId);
-
-	public T shuffleGrouping(String componentId);
-
-	public T shuffleGrouping(String componentId, String streamId);
-
-	public T localOrShuffleGrouping(String componentId);
-
-	public T localOrShuffleGrouping(String componentId, String streamId);
-	
-	public T localFirstGrouping(String componentId);
-
-	public T localFirstGrouping(String componentId, String streamId);
-
-	public T noneGrouping(String componentId);
-
-	public T noneGrouping(String componentId, String streamId);
-
-	public T allGrouping(String componentId);
-
-	public T allGrouping(String componentId, String streamId);
-
-	public T directGrouping(String componentId);
-
-	public T directGrouping(String componentId, String streamId);
-
-	public T customGrouping(String componentId, CustomStreamGrouping grouping);
-
-	public T customGrouping(String componentId, String streamId,
-			CustomStreamGrouping grouping);
-
-	public T grouping(GlobalStreamId id, Grouping grouping);
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/OutputFieldsDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/OutputFieldsDeclarer.java b/jstorm-client/src/main/java/backtype/storm/topology/OutputFieldsDeclarer.java
deleted file mode 100644
index a981cc6..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/OutputFieldsDeclarer.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package backtype.storm.topology;
-
-import backtype.storm.tuple.Fields;
-
-public interface OutputFieldsDeclarer {
-	/**
-	 * Uses default stream id.
-	 */
-	public void declare(Fields fields);
-
-	public void declare(boolean direct, Fields fields);
-
-	public void declareStream(String streamId, Fields fields);
-
-	public void declareStream(String streamId, boolean direct, Fields fields);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/OutputFieldsGetter.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/OutputFieldsGetter.java b/jstorm-client/src/main/java/backtype/storm/topology/OutputFieldsGetter.java
deleted file mode 100644
index 1a6c0c2..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/OutputFieldsGetter.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package backtype.storm.topology;
-
-import backtype.storm.generated.StreamInfo;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-import java.util.HashMap;
-import java.util.Map;
-
-public class OutputFieldsGetter implements OutputFieldsDeclarer {
-	private Map<String, StreamInfo> _fields = new HashMap<String, StreamInfo>();
-
-	public void declare(Fields fields) {
-		declare(false, fields);
-	}
-
-	public void declare(boolean direct, Fields fields) {
-		declareStream(Utils.DEFAULT_STREAM_ID, direct, fields);
-	}
-
-	public void declareStream(String streamId, Fields fields) {
-		declareStream(streamId, false, fields);
-	}
-
-	public void declareStream(String streamId, boolean direct, Fields fields) {
-		if (_fields.containsKey(streamId)) {
-			throw new IllegalArgumentException("Fields for " + streamId
-					+ " already set");
-		}
-		_fields.put(streamId, new StreamInfo(fields.toList(), direct));
-	}
-
-	public Map<String, StreamInfo> getFieldsDeclaration() {
-		return _fields;
-	}
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/ReportedFailedException.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/ReportedFailedException.java b/jstorm-client/src/main/java/backtype/storm/topology/ReportedFailedException.java
deleted file mode 100644
index 74bd0de..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/ReportedFailedException.java
+++ /dev/null
@@ -1,19 +0,0 @@
-package backtype.storm.topology;
-
-public class ReportedFailedException extends FailedException {
-	public ReportedFailedException() {
-		super();
-	}
-
-	public ReportedFailedException(String msg) {
-		super(msg);
-	}
-
-	public ReportedFailedException(String msg, Throwable cause) {
-		super(msg, cause);
-	}
-
-	public ReportedFailedException(Throwable cause) {
-		super(cause);
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/SpoutDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/SpoutDeclarer.java b/jstorm-client/src/main/java/backtype/storm/topology/SpoutDeclarer.java
deleted file mode 100644
index b14d496..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/SpoutDeclarer.java
+++ /dev/null
@@ -1,6 +0,0 @@
-package backtype.storm.topology;
-
-public interface SpoutDeclarer extends
-		ComponentConfigurationDeclarer<SpoutDeclarer> {
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/TopologyBuilder.java b/jstorm-client/src/main/java/backtype/storm/topology/TopologyBuilder.java
deleted file mode 100644
index 2b2f55b..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/TopologyBuilder.java
+++ /dev/null
@@ -1,424 +0,0 @@
-package backtype.storm.topology;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Map;
-
-import backtype.storm.Config;
-import backtype.storm.generated.Bolt;
-import backtype.storm.generated.ComponentCommon;
-import backtype.storm.generated.ComponentObject;
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.generated.Grouping;
-import backtype.storm.generated.NullStruct;
-import backtype.storm.generated.SpoutSpec;
-import backtype.storm.generated.StateSpoutSpec;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-
-/**
- * TopologyBuilder exposes the Java API for specifying a topology for Storm to
- * execute. Topologies are Thrift structures in the end, but since the Thrift
- * API is so verbose, TopologyBuilder greatly eases the process of creating
- * topologies. The template for creating and submitting a topology looks
- * something like:
- * 
- * <pre>
- * TopologyBuilder builder = new TopologyBuilder();
- * 
- * builder.setSpout(&quot;1&quot;, new TestWordSpout(true), 5);
- * builder.setSpout(&quot;2&quot;, new TestWordSpout(true), 3);
- * builder.setBolt(&quot;3&quot;, new TestWordCounter(), 3)
- * 		.fieldsGrouping(&quot;1&quot;, new Fields(&quot;word&quot;))
- * 		.fieldsGrouping(&quot;2&quot;, new Fields(&quot;word&quot;));
- * builder.setBolt(&quot;4&quot;, new TestGlobalCount()).globalGrouping(&quot;1&quot;);
- * 
- * Map conf = new HashMap();
- * conf.put(Config.TOPOLOGY_WORKERS, 4);
- * 
- * StormSubmitter.submitTopology(&quot;mytopology&quot;, conf, builder.createTopology());
- * </pre>
- * 
- * Running the exact same topology in local mode (in process), and configuring
- * it to log all tuples emitted, looks like the following. Note that it lets the
- * topology run for 10 seconds before shutting down the local cluster.
- * 
- * <pre>
- * TopologyBuilder builder = new TopologyBuilder();
- * 
- * builder.setSpout(&quot;1&quot;, new TestWordSpout(true), 5);
- * builder.setSpout(&quot;2&quot;, new TestWordSpout(true), 3);
- * builder.setBolt(&quot;3&quot;, new TestWordCounter(), 3)
- * 		.fieldsGrouping(&quot;1&quot;, new Fields(&quot;word&quot;))
- * 		.fieldsGrouping(&quot;2&quot;, new Fields(&quot;word&quot;));
- * builder.setBolt(&quot;4&quot;, new TestGlobalCount()).globalGrouping(&quot;1&quot;);
- * 
- * Map conf = new HashMap();
- * conf.put(Config.TOPOLOGY_WORKERS, 4);
- * conf.put(Config.TOPOLOGY_DEBUG, true);
- * 
- * LocalCluster cluster = new LocalCluster();
- * cluster.submitTopology(&quot;mytopology&quot;, conf, builder.createTopology());
- * Utils.sleep(10000);
- * cluster.shutdown();
- * </pre>
- * 
- * <p>
- * The pattern for TopologyBuilder is to map component ids to components using
- * the setSpout and setBolt methods. Those methods return objects that are then
- * used to declare the inputs for that component.
- * </p>
- */
-public class TopologyBuilder {
-	private Map<String, IRichBolt> _bolts = new HashMap<String, IRichBolt>();
-	private Map<String, IRichSpout> _spouts = new HashMap<String, IRichSpout>();
-	private Map<String, ComponentCommon> _commons = new HashMap<String, ComponentCommon>();
-
-	// private Map<String, Map<GlobalStreamId, Grouping>> _inputs = new
-	// HashMap<String, Map<GlobalStreamId, Grouping>>();
-
-	private Map<String, StateSpoutSpec> _stateSpouts = new HashMap<String, StateSpoutSpec>();
-
-	public StormTopology createTopology() {
-		Map<String, Bolt> boltSpecs = new HashMap<String, Bolt>();
-		Map<String, SpoutSpec> spoutSpecs = new HashMap<String, SpoutSpec>();
-		for (String boltId : _bolts.keySet()) {
-			IRichBolt bolt = _bolts.get(boltId);
-			ComponentCommon common = getComponentCommon(boltId, bolt);
-			boltSpecs.put(
-					boltId,
-					new Bolt(ComponentObject.serialized_java(Utils
-							.serialize(bolt)), common));
-		}
-		for (String spoutId : _spouts.keySet()) {
-			IRichSpout spout = _spouts.get(spoutId);
-			ComponentCommon common = getComponentCommon(spoutId, spout);
-			spoutSpecs.put(
-					spoutId,
-					new SpoutSpec(ComponentObject.serialized_java(Utils
-							.serialize(spout)), common));
-
-		}
-		return new StormTopology(spoutSpecs, boltSpecs,
-				new HashMap<String, StateSpoutSpec>());
-	}
-
-	/**
-	 * Define a new bolt in this topology with parallelism of just one thread.
-	 * 
-	 * @param id
-	 *            the id of this component. This id is referenced by other
-	 *            components that want to consume this bolt's outputs.
-	 * @param bolt
-	 *            the bolt
-	 * @return use the returned object to declare the inputs to this component
-	 */
-	public BoltDeclarer setBolt(String id, IRichBolt bolt) {
-		return setBolt(id, bolt, null);
-	}
-
-	/**
-	 * Define a new bolt in this topology with the specified amount of
-	 * parallelism.
-	 * 
-	 * @param id
-	 *            the id of this component. This id is referenced by other
-	 *            components that want to consume this bolt's outputs.
-	 * @param bolt
-	 *            the bolt
-	 * @param parallelism_hint
-	 *            the number of tasks that should be assigned to execute this
-	 *            bolt. Each task will run on a thread in a process somewhere
-	 *            around the cluster.
-	 * @return use the returned object to declare the inputs to this component
-	 */
-	public BoltDeclarer setBolt(String id, IRichBolt bolt,
-			Number parallelism_hint) {
-		validateUnusedId(id);
-		initCommon(id, bolt, parallelism_hint);
-		_bolts.put(id, bolt);
-		return new BoltGetter(id);
-	}
-
-	/**
-	 * Define a new bolt in this topology. This defines a basic bolt, which is a
-	 * simpler to use but more restricted kind of bolt. Basic bolts are intended
-	 * for non-aggregation processing and automate the anchoring/acking process
-	 * to achieve proper reliability in the topology.
-	 * 
-	 * @param id
-	 *            the id of this component. This id is referenced by other
-	 *            components that want to consume this bolt's outputs.
-	 * @param bolt
-	 *            the basic bolt
-	 * @return use the returned object to declare the inputs to this component
-	 */
-	public BoltDeclarer setBolt(String id, IBasicBolt bolt) {
-		return setBolt(id, bolt, null);
-	}
-
-	/**
-	 * Define a new bolt in this topology. This defines a basic bolt, which is a
-	 * simpler to use but more restricted kind of bolt. Basic bolts are intended
-	 * for non-aggregation processing and automate the anchoring/acking process
-	 * to achieve proper reliability in the topology.
-	 * 
-	 * @param id
-	 *            the id of this component. This id is referenced by other
-	 *            components that want to consume this bolt's outputs.
-	 * @param bolt
-	 *            the basic bolt
-	 * @param parallelism_hint
-	 *            the number of tasks that should be assigned to execute this
-	 *            bolt. Each task will run on a thread in a process somwehere
-	 *            around the cluster.
-	 * @return use the returned object to declare the inputs to this component
-	 */
-	public BoltDeclarer setBolt(String id, IBasicBolt bolt,
-			Number parallelism_hint) {
-		return setBolt(id, new BasicBoltExecutor(bolt), parallelism_hint);
-	}
-
-	/**
-	 * Define a new spout in this topology.
-	 * 
-	 * @param id
-	 *            the id of this component. This id is referenced by other
-	 *            components that want to consume this spout's outputs.
-	 * @param spout
-	 *            the spout
-	 */
-	public SpoutDeclarer setSpout(String id, IRichSpout spout) {
-		return setSpout(id, spout, null);
-	}
-
-	/**
-	 * Define a new spout in this topology with the specified parallelism. If
-	 * the spout declares itself as non-distributed, the parallelism_hint will
-	 * be ignored and only one task will be allocated to this component.
-	 * 
-	 * @param id
-	 *            the id of this component. This id is referenced by other
-	 *            components that want to consume this spout's outputs.
-	 * @param parallelism_hint
-	 *            the number of tasks that should be assigned to execute this
-	 *            spout. Each task will run on a thread in a process somwehere
-	 *            around the cluster.
-	 * @param spout
-	 *            the spout
-	 */
-	public SpoutDeclarer setSpout(String id, IRichSpout spout,
-			Number parallelism_hint) {
-		validateUnusedId(id);
-		initCommon(id, spout, parallelism_hint);
-		_spouts.put(id, spout);
-		return new SpoutGetter(id);
-	}
-
-	public void setStateSpout(String id, IRichStateSpout stateSpout) {
-		setStateSpout(id, stateSpout, null);
-	}
-
-	public void setStateSpout(String id, IRichStateSpout stateSpout,
-			Number parallelism_hint) {
-		validateUnusedId(id);
-		// TODO: finish
-	}
-
-	private void validateUnusedId(String id) {
-		if (_bolts.containsKey(id)) {
-			throw new IllegalArgumentException(
-					"Bolt has already been declared for id " + id);
-		}
-		if (_spouts.containsKey(id)) {
-			throw new IllegalArgumentException(
-					"Spout has already been declared for id " + id);
-		}
-		if (_stateSpouts.containsKey(id)) {
-			throw new IllegalArgumentException(
-					"State spout has already been declared for id " + id);
-		}
-	}
-
-	private ComponentCommon getComponentCommon(String id, IComponent component) {
-		ComponentCommon ret = new ComponentCommon(_commons.get(id));
-
-		OutputFieldsGetter getter = new OutputFieldsGetter();
-		component.declareOutputFields(getter);
-		ret.set_streams(getter.getFieldsDeclaration());
-		return ret;
-	}
-
-	private void initCommon(String id, IComponent component, Number parallelism) {
-		ComponentCommon common = new ComponentCommon();
-		common.set_inputs(new HashMap<GlobalStreamId, Grouping>());
-		if (parallelism != null)
-			common.set_parallelism_hint(parallelism.intValue());
-		else {
-		    common.set_parallelism_hint(Integer.valueOf(1));
-		}
-		Map conf = component.getComponentConfiguration();
-		if (conf != null)
-			common.set_json_conf(Utils.to_json(conf));
-		_commons.put(id, common);
-	}
-
-	protected class ConfigGetter<T extends ComponentConfigurationDeclarer>
-			extends BaseConfigurationDeclarer<T> {
-		String _id;
-
-		public ConfigGetter(String id) {
-			_id = id;
-		}
-
-		@Override
-		public T addConfigurations(Map conf) {
-			if (conf != null && conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) {
-				throw new IllegalArgumentException(
-						"Cannot set serializations for a component using fluent API");
-			}
-			String currConf = _commons.get(_id).get_json_conf();
-			_commons.get(_id).set_json_conf(
-					mergeIntoJson(parseJson(currConf), conf));
-			return (T) this;
-		}
-	}
-
-	protected class SpoutGetter extends ConfigGetter<SpoutDeclarer> implements
-			SpoutDeclarer {
-		public SpoutGetter(String id) {
-			super(id);
-		}
-	}
-
-	protected class BoltGetter extends ConfigGetter<BoltDeclarer> implements
-			BoltDeclarer {
-		private String _boltId;
-
-		public BoltGetter(String boltId) {
-			super(boltId);
-			_boltId = boltId;
-		}
-
-		public BoltDeclarer fieldsGrouping(String componentId, Fields fields) {
-			return fieldsGrouping(componentId, Utils.DEFAULT_STREAM_ID, fields);
-		}
-
-		public BoltDeclarer fieldsGrouping(String componentId, String streamId,
-				Fields fields) {
-			return grouping(componentId, streamId,
-					Grouping.fields(fields.toList()));
-		}
-
-		public BoltDeclarer globalGrouping(String componentId) {
-			return globalGrouping(componentId, Utils.DEFAULT_STREAM_ID);
-		}
-
-		public BoltDeclarer globalGrouping(String componentId, String streamId) {
-			return grouping(componentId, streamId,
-					Grouping.fields(new ArrayList<String>()));
-		}
-
-		public BoltDeclarer shuffleGrouping(String componentId) {
-			return shuffleGrouping(componentId, Utils.DEFAULT_STREAM_ID);
-		}
-
-		public BoltDeclarer shuffleGrouping(String componentId, String streamId) {
-			return grouping(componentId, streamId,
-					Grouping.shuffle(new NullStruct()));
-		}
-
-		public BoltDeclarer localOrShuffleGrouping(String componentId) {
-			return localOrShuffleGrouping(componentId, Utils.DEFAULT_STREAM_ID);
-		}
-
-		public BoltDeclarer localOrShuffleGrouping(String componentId,
-				String streamId) {
-			return grouping(componentId, streamId,
-					Grouping.local_or_shuffle(new NullStruct()));
-		}
-		
-		@Override
-		public BoltDeclarer localFirstGrouping(String componentId) {
-			return localFirstGrouping(componentId, Utils.DEFAULT_STREAM_ID);
-		}
-
-		@Override
-		public BoltDeclarer localFirstGrouping(String componentId,
-				String streamId) {
-			return grouping(componentId, streamId,
-					Grouping.localFirst(new NullStruct()));
-		}
-
-		public BoltDeclarer noneGrouping(String componentId) {
-			return noneGrouping(componentId, Utils.DEFAULT_STREAM_ID);
-		}
-
-		public BoltDeclarer noneGrouping(String componentId, String streamId) {
-			return grouping(componentId, streamId,
-					Grouping.none(new NullStruct()));
-		}
-
-		public BoltDeclarer allGrouping(String componentId) {
-			return allGrouping(componentId, Utils.DEFAULT_STREAM_ID);
-		}
-
-		public BoltDeclarer allGrouping(String componentId, String streamId) {
-			return grouping(componentId, streamId,
-					Grouping.all(new NullStruct()));
-		}
-
-		public BoltDeclarer directGrouping(String componentId) {
-			return directGrouping(componentId, Utils.DEFAULT_STREAM_ID);
-		}
-
-		public BoltDeclarer directGrouping(String componentId, String streamId) {
-			return grouping(componentId, streamId,
-					Grouping.direct(new NullStruct()));
-		}
-
-		private BoltDeclarer grouping(String componentId, String streamId,
-				Grouping grouping) {
-			_commons.get(_boltId).put_to_inputs(
-					new GlobalStreamId(componentId, streamId), grouping);
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer customGrouping(String componentId,
-				CustomStreamGrouping grouping) {
-			return customGrouping(componentId, Utils.DEFAULT_STREAM_ID,
-					grouping);
-		}
-
-		@Override
-		public BoltDeclarer customGrouping(String componentId, String streamId,
-				CustomStreamGrouping grouping) {
-			return grouping(componentId, streamId,
-					Grouping.custom_serialized(Utils.serialize(grouping)));
-		}
-
-		@Override
-		public BoltDeclarer grouping(GlobalStreamId id, Grouping grouping) {
-			return grouping(id.get_componentId(), id.get_streamId(), grouping);
-		}
-
-		
-	}
-
-	private static Map parseJson(String json) {
-		if (json == null)
-			return new HashMap();
-		else
-			return (Map) Utils.from_json(json);
-	}
-
-	private static String mergeIntoJson(Map into, Map newMap) {
-		Map res = new HashMap(into);
-		if (newMap != null)
-			res.putAll(newMap);
-		return Utils.to_json(res);
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/base/BaseBasicBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/base/BaseBasicBolt.java b/jstorm-client/src/main/java/backtype/storm/topology/base/BaseBasicBolt.java
deleted file mode 100644
index 2e18207..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/base/BaseBasicBolt.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package backtype.storm.topology.base;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IBasicBolt;
-import java.util.Map;
-
-public abstract class BaseBasicBolt extends BaseComponent implements IBasicBolt {
-
-	@Override
-	public void prepare(Map stormConf, TopologyContext context) {
-	}
-
-	@Override
-	public void cleanup() {
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/base/BaseBatchBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/base/BaseBatchBolt.java b/jstorm-client/src/main/java/backtype/storm/topology/base/BaseBatchBolt.java
deleted file mode 100644
index 9171392..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/base/BaseBatchBolt.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package backtype.storm.topology.base;
-
-import backtype.storm.coordination.IBatchBolt;
-import java.util.Map;
-
-public abstract class BaseBatchBolt<T> extends BaseComponent implements
-		IBatchBolt<T> {
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/base/BaseComponent.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/base/BaseComponent.java b/jstorm-client/src/main/java/backtype/storm/topology/base/BaseComponent.java
deleted file mode 100644
index 1b1449a..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/base/BaseComponent.java
+++ /dev/null
@@ -1,11 +0,0 @@
-package backtype.storm.topology.base;
-
-import backtype.storm.topology.IComponent;
-import java.util.Map;
-
-public abstract class BaseComponent implements IComponent {
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return null;
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/base/BaseOpaquePartitionedTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/base/BaseOpaquePartitionedTransactionalSpout.java b/jstorm-client/src/main/java/backtype/storm/topology/base/BaseOpaquePartitionedTransactionalSpout.java
deleted file mode 100644
index baf0c65..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/base/BaseOpaquePartitionedTransactionalSpout.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package backtype.storm.topology.base;
-
-import backtype.storm.transactional.partitioned.IOpaquePartitionedTransactionalSpout;
-
-public abstract class BaseOpaquePartitionedTransactionalSpout<T> extends
-		BaseComponent implements IOpaquePartitionedTransactionalSpout<T> {
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/base/BasePartitionedTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/base/BasePartitionedTransactionalSpout.java b/jstorm-client/src/main/java/backtype/storm/topology/base/BasePartitionedTransactionalSpout.java
deleted file mode 100644
index 2a9f298..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/base/BasePartitionedTransactionalSpout.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package backtype.storm.topology.base;
-
-import backtype.storm.transactional.partitioned.IPartitionedTransactionalSpout;
-import java.util.Map;
-
-public abstract class BasePartitionedTransactionalSpout<T> extends
-		BaseComponent implements IPartitionedTransactionalSpout<T> {
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/base/BaseRichBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/base/BaseRichBolt.java b/jstorm-client/src/main/java/backtype/storm/topology/base/BaseRichBolt.java
deleted file mode 100644
index e6a7592..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/base/BaseRichBolt.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package backtype.storm.topology.base;
-
-import backtype.storm.topology.IRichBolt;
-
-public abstract class BaseRichBolt extends BaseComponent implements IRichBolt {
-	@Override
-	public void cleanup() {
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/base/BaseRichSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/base/BaseRichSpout.java b/jstorm-client/src/main/java/backtype/storm/topology/base/BaseRichSpout.java
deleted file mode 100644
index 9c0d733..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/base/BaseRichSpout.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * To change this template, choose Tools | Templates
- * and open the template in the editor.
- */
-package backtype.storm.topology.base;
-
-import backtype.storm.topology.IRichSpout;
-
-/**
- * 
- * @author nathan
- */
-public abstract class BaseRichSpout extends BaseComponent implements IRichSpout {
-	@Override
-	public void close() {
-	}
-
-	@Override
-	public void activate() {
-	}
-
-	@Override
-	public void deactivate() {
-	}
-
-	@Override
-	public void ack(Object msgId) {
-	}
-
-	@Override
-	public void fail(Object msgId) {
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/base/BaseTransactionalBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/base/BaseTransactionalBolt.java b/jstorm-client/src/main/java/backtype/storm/topology/base/BaseTransactionalBolt.java
deleted file mode 100644
index 77647c0..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/base/BaseTransactionalBolt.java
+++ /dev/null
@@ -1,8 +0,0 @@
-package backtype.storm.topology.base;
-
-import backtype.storm.transactional.TransactionAttempt;
-
-public abstract class BaseTransactionalBolt extends
-		BaseBatchBolt<TransactionAttempt> {
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/topology/base/BaseTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/topology/base/BaseTransactionalSpout.java b/jstorm-client/src/main/java/backtype/storm/topology/base/BaseTransactionalSpout.java
deleted file mode 100644
index 01945ea..0000000
--- a/jstorm-client/src/main/java/backtype/storm/topology/base/BaseTransactionalSpout.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package backtype.storm.topology.base;
-
-import backtype.storm.transactional.ITransactionalSpout;
-import java.util.Map;
-
-public abstract class BaseTransactionalSpout<T> extends BaseComponent implements
-		ITransactionalSpout<T> {
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/transactional/ICommitter.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/transactional/ICommitter.java b/jstorm-client/src/main/java/backtype/storm/transactional/ICommitter.java
deleted file mode 100644
index e764fb7..0000000
--- a/jstorm-client/src/main/java/backtype/storm/transactional/ICommitter.java
+++ /dev/null
@@ -1,9 +0,0 @@
-package backtype.storm.transactional;
-
-/**
- * This marks an IBatchBolt within a transactional topology as a committer. This
- * causes the finishBatch method to be called in order of the transactions.
- */
-public interface ICommitter {
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/transactional/ICommitterTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/transactional/ICommitterTransactionalSpout.java b/jstorm-client/src/main/java/backtype/storm/transactional/ICommitterTransactionalSpout.java
deleted file mode 100644
index 58b8e19..0000000
--- a/jstorm-client/src/main/java/backtype/storm/transactional/ICommitterTransactionalSpout.java
+++ /dev/null
@@ -1,13 +0,0 @@
-package backtype.storm.transactional;
-
-import backtype.storm.task.TopologyContext;
-import java.util.Map;
-
-public interface ICommitterTransactionalSpout<X> extends ITransactionalSpout<X> {
-	public interface Emitter extends ITransactionalSpout.Emitter {
-		void commit(TransactionAttempt attempt);
-	}
-
-	@Override
-	public Emitter getEmitter(Map conf, TopologyContext context);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/transactional/ITransactionalSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/transactional/ITransactionalSpout.java b/jstorm-client/src/main/java/backtype/storm/transactional/ITransactionalSpout.java
deleted file mode 100644
index 13399e1..0000000
--- a/jstorm-client/src/main/java/backtype/storm/transactional/ITransactionalSpout.java
+++ /dev/null
@@ -1,88 +0,0 @@
-package backtype.storm.transactional;
-
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IComponent;
-import java.math.BigInteger;
-import java.util.Map;
-
-public interface ITransactionalSpout<T> extends IComponent {
-	public interface Coordinator<X> {
-		/**
-		 * Create metadata for this particular transaction id which has never
-		 * been emitted before. The metadata should contain whatever is
-		 * necessary to be able to replay the exact batch for the transaction at
-		 * a later point.
-		 * 
-		 * The metadata is stored in Zookeeper.
-		 * 
-		 * Storm uses the Kryo serializations configured in the component
-		 * configuration for this spout to serialize and deserialize the
-		 * metadata.
-		 * 
-		 * @param txid
-		 *            The id of the transaction.
-		 * @param prevMetadata
-		 *            The metadata of the previous transaction
-		 * @return the metadata for this new transaction
-		 */
-		X initializeTransaction(BigInteger txid, X prevMetadata);
-
-		/**
-		 * Returns true if its ok to emit start a new transaction, false
-		 * otherwise (will skip this transaction).
-		 * 
-		 * You should sleep here if you want a delay between asking for the next
-		 * transaction (this will be called repeatedly in a loop).
-		 */
-		boolean isReady();
-
-		/**
-		 * Release any resources from this coordinator.
-		 */
-		void close();
-	}
-
-	public interface Emitter<X> {
-		/**
-		 * Emit a batch for the specified transaction attempt and metadata for
-		 * the transaction. The metadata was created by the Coordinator in the
-		 * initializeTranaction method. This method must always emit the same
-		 * batch of tuples across all tasks for the same transaction id.
-		 * 
-		 * The first field of all emitted tuples must contain the provided
-		 * TransactionAttempt.
-		 * 
-		 */
-		void emitBatch(TransactionAttempt tx, X coordinatorMeta,
-				BatchOutputCollector collector);
-
-		/**
-		 * Any state for transactions prior to the provided transaction id can
-		 * be safely cleaned up, so this method should clean up that state.
-		 */
-		void cleanupBefore(BigInteger txid);
-
-		/**
-		 * Release any resources held by this emitter.
-		 */
-		void close();
-	}
-
-	/**
-	 * The coordinator for a TransactionalSpout runs in a single thread and
-	 * indicates when batches of tuples should be emitted and when transactions
-	 * should commit. The Coordinator that you provide in a TransactionalSpout
-	 * provides metadata for each transaction so that the transactions can be
-	 * replayed.
-	 */
-	Coordinator<T> getCoordinator(Map conf, TopologyContext context);
-
-	/**
-	 * The emitter for a TransactionalSpout runs as many tasks across the
-	 * cluster. Emitters are responsible for emitting batches of tuples for a
-	 * transaction and must ensure that the same batch of tuples is always
-	 * emitted for the same transaction id.
-	 */
-	Emitter<T> getEmitter(Map conf, TopologyContext context);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/transactional/TransactionAttempt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/transactional/TransactionAttempt.java b/jstorm-client/src/main/java/backtype/storm/transactional/TransactionAttempt.java
deleted file mode 100644
index 2d02de1..0000000
--- a/jstorm-client/src/main/java/backtype/storm/transactional/TransactionAttempt.java
+++ /dev/null
@@ -1,44 +0,0 @@
-package backtype.storm.transactional;
-
-import java.math.BigInteger;
-
-public class TransactionAttempt {
-	BigInteger _txid;
-	long _attemptId;
-
-	// for kryo compatibility
-	public TransactionAttempt() {
-
-	}
-
-	public TransactionAttempt(BigInteger txid, long attemptId) {
-		_txid = txid;
-		_attemptId = attemptId;
-	}
-
-	public BigInteger getTransactionId() {
-		return _txid;
-	}
-
-	public long getAttemptId() {
-		return _attemptId;
-	}
-
-	@Override
-	public int hashCode() {
-		return _txid.hashCode();
-	}
-
-	@Override
-	public boolean equals(Object o) {
-		if (!(o instanceof TransactionAttempt))
-			return false;
-		TransactionAttempt other = (TransactionAttempt) o;
-		return _txid.equals(other._txid) && _attemptId == other._attemptId;
-	}
-
-	@Override
-	public String toString() {
-		return "" + _txid + ":" + _attemptId;
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/transactional/TransactionalSpoutBatchExecutor.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/transactional/TransactionalSpoutBatchExecutor.java b/jstorm-client/src/main/java/backtype/storm/transactional/TransactionalSpoutBatchExecutor.java
deleted file mode 100644
index daea107..0000000
--- a/jstorm-client/src/main/java/backtype/storm/transactional/TransactionalSpoutBatchExecutor.java
+++ /dev/null
@@ -1,84 +0,0 @@
-package backtype.storm.transactional;
-
-import backtype.storm.coordination.BatchOutputCollectorImpl;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.FailedException;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-import java.math.BigInteger;
-import java.util.Map;
-import java.util.TreeMap;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TransactionalSpoutBatchExecutor implements IRichBolt {
-	public static Logger LOG = LoggerFactory
-			.getLogger(TransactionalSpoutBatchExecutor.class);
-
-	BatchOutputCollectorImpl _collector;
-	ITransactionalSpout _spout;
-	ITransactionalSpout.Emitter _emitter;
-
-	TreeMap<BigInteger, TransactionAttempt> _activeTransactions = new TreeMap<BigInteger, TransactionAttempt>();
-
-	public TransactionalSpoutBatchExecutor(ITransactionalSpout spout) {
-		_spout = spout;
-	}
-
-	@Override
-	public void prepare(Map conf, TopologyContext context,
-			OutputCollector collector) {
-		_collector = new BatchOutputCollectorImpl(collector);
-		_emitter = _spout.getEmitter(conf, context);
-	}
-
-	@Override
-	public void execute(Tuple input) {
-		TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
-		try {
-			if (input.getSourceStreamId().equals(
-					TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID)) {
-				if (attempt.equals(_activeTransactions.get(attempt
-						.getTransactionId()))) {
-					((ICommitterTransactionalSpout.Emitter) _emitter)
-							.commit(attempt);
-					_activeTransactions.remove(attempt.getTransactionId());
-					_collector.ack(input);
-				} else {
-					_collector.fail(input);
-				}
-			} else {
-				_emitter.emitBatch(attempt, input.getValue(1), _collector);
-				_activeTransactions.put(attempt.getTransactionId(), attempt);
-				_collector.ack(input);
-				BigInteger committed = (BigInteger) input.getValue(2);
-				if (committed != null) {
-					// valid to delete before what's been committed since
-					// those batches will never be accessed again
-					_activeTransactions.headMap(committed).clear();
-					_emitter.cleanupBefore(committed);
-				}
-			}
-		} catch (FailedException e) {
-			LOG.warn("Failed to emit batch for transaction", e);
-			_collector.fail(input);
-		}
-	}
-
-	@Override
-	public void cleanup() {
-		_emitter.close();
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		_spout.declareOutputFields(declarer);
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return _spout.getComponentConfiguration();
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/transactional/TransactionalSpoutCoordinator.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/transactional/TransactionalSpoutCoordinator.java b/jstorm-client/src/main/java/backtype/storm/transactional/TransactionalSpoutCoordinator.java
deleted file mode 100644
index 4810903..0000000
--- a/jstorm-client/src/main/java/backtype/storm/transactional/TransactionalSpoutCoordinator.java
+++ /dev/null
@@ -1,220 +0,0 @@
-package backtype.storm.transactional;
-
-import backtype.storm.Config;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.FailedException;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.transactional.state.RotatingTransactionalState;
-import backtype.storm.transactional.state.TransactionalState;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.Utils;
-import java.math.BigInteger;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.Random;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TransactionalSpoutCoordinator extends BaseRichSpout {
-	public static final Logger LOG = LoggerFactory
-			.getLogger(TransactionalSpoutCoordinator.class);
-
-	public static final BigInteger INIT_TXID = BigInteger.ONE;
-
-	public static final String TRANSACTION_BATCH_STREAM_ID = TransactionalSpoutCoordinator.class
-			.getName() + "/batch";
-	public static final String TRANSACTION_COMMIT_STREAM_ID = TransactionalSpoutCoordinator.class
-			.getName() + "/commit";
-
-	private static final String CURRENT_TX = "currtx";
-	private static final String META_DIR = "meta";
-
-	private ITransactionalSpout _spout;
-	private ITransactionalSpout.Coordinator _coordinator;
-	private TransactionalState _state;
-	private RotatingTransactionalState _coordinatorState;
-
-	TreeMap<BigInteger, TransactionStatus> _activeTx = new TreeMap<BigInteger, TransactionStatus>();
-
-	private SpoutOutputCollector _collector;
-	private Random _rand;
-	BigInteger _currTransaction;
-	int _maxTransactionActive;
-	StateInitializer _initializer;
-
-	public TransactionalSpoutCoordinator(ITransactionalSpout spout) {
-		_spout = spout;
-	}
-
-	public ITransactionalSpout getSpout() {
-		return _spout;
-	}
-
-	@Override
-	public void open(Map conf, TopologyContext context,
-			SpoutOutputCollector collector) {
-		_rand = new Random(Utils.secureRandomLong());
-		_state = TransactionalState.newCoordinatorState(conf,
-				(String) conf.get(Config.TOPOLOGY_TRANSACTIONAL_ID),
-				_spout.getComponentConfiguration());
-		_coordinatorState = new RotatingTransactionalState(_state, META_DIR,
-				true);
-		_collector = collector;
-		_coordinator = _spout.getCoordinator(conf, context);
-		_currTransaction = getStoredCurrTransaction(_state);
-		Object active = conf.get(Config.TOPOLOGY_MAX_SPOUT_PENDING);
-		if (active == null) {
-			_maxTransactionActive = 1;
-		} else {
-			_maxTransactionActive = Utils.getInt(active);
-		}
-		_initializer = new StateInitializer();
-	}
-
-	@Override
-	public void close() {
-		_state.close();
-	}
-
-	@Override
-	public void nextTuple() {
-		sync();
-	}
-
-	@Override
-	public void ack(Object msgId) {
-		TransactionAttempt tx = (TransactionAttempt) msgId;
-		TransactionStatus status = _activeTx.get(tx.getTransactionId());
-		if (status != null && tx.equals(status.attempt)) {
-			if (status.status == AttemptStatus.PROCESSING) {
-				status.status = AttemptStatus.PROCESSED;
-			} else if (status.status == AttemptStatus.COMMITTING) {
-				_activeTx.remove(tx.getTransactionId());
-				_coordinatorState.cleanupBefore(tx.getTransactionId());
-				_currTransaction = nextTransactionId(tx.getTransactionId());
-				_state.setData(CURRENT_TX, _currTransaction);
-			}
-			sync();
-		}
-	}
-
-	@Override
-	public void fail(Object msgId) {
-		TransactionAttempt tx = (TransactionAttempt) msgId;
-		TransactionStatus stored = _activeTx.remove(tx.getTransactionId());
-		if (stored != null && tx.equals(stored.attempt)) {
-			_activeTx.tailMap(tx.getTransactionId()).clear();
-			sync();
-		}
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		// in partitioned example, in case an emitter task receives a later
-		// transaction than it's emitted so far,
-		// when it sees the earlier txid it should know to emit nothing
-		declarer.declareStream(TRANSACTION_BATCH_STREAM_ID, new Fields("tx",
-				"tx-meta", "committed-txid"));
-		declarer.declareStream(TRANSACTION_COMMIT_STREAM_ID, new Fields("tx"));
-	}
-
-	private void sync() {
-		// note that sometimes the tuples active may be less than
-		// max_spout_pending, e.g.
-		// max_spout_pending = 3
-		// tx 1, 2, 3 active, tx 2 is acked. there won't be a commit for tx 2
-		// (because tx 1 isn't committed yet),
-		// and there won't be a batch for tx 4 because there's max_spout_pending
-		// tx active
-		TransactionStatus maybeCommit = _activeTx.get(_currTransaction);
-		if (maybeCommit != null
-				&& maybeCommit.status == AttemptStatus.PROCESSED) {
-			maybeCommit.status = AttemptStatus.COMMITTING;
-			_collector.emit(TRANSACTION_COMMIT_STREAM_ID, new Values(
-					maybeCommit.attempt), maybeCommit.attempt);
-		}
-
-		try {
-			if (_activeTx.size() < _maxTransactionActive) {
-				BigInteger curr = _currTransaction;
-				for (int i = 0; i < _maxTransactionActive; i++) {
-					if ((_coordinatorState.hasCache(curr) || _coordinator
-							.isReady()) && !_activeTx.containsKey(curr)) {
-						TransactionAttempt attempt = new TransactionAttempt(
-								curr, _rand.nextLong());
-						Object state = _coordinatorState.getState(curr,
-								_initializer);
-						_activeTx.put(curr, new TransactionStatus(attempt));
-						_collector
-								.emit(TRANSACTION_BATCH_STREAM_ID,
-										new Values(
-												attempt,
-												state,
-												previousTransactionId(_currTransaction)),
-										attempt);
-					}
-					curr = nextTransactionId(curr);
-				}
-			}
-		} catch (FailedException e) {
-			LOG.warn("Failed to get metadata for a transaction", e);
-		}
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		Config ret = new Config();
-		ret.setMaxTaskParallelism(1);
-		return ret;
-	}
-
-	private static enum AttemptStatus {
-		PROCESSING, PROCESSED, COMMITTING
-	}
-
-	private static class TransactionStatus {
-		TransactionAttempt attempt;
-		AttemptStatus status;
-
-		public TransactionStatus(TransactionAttempt attempt) {
-			this.attempt = attempt;
-			this.status = AttemptStatus.PROCESSING;
-		}
-
-		@Override
-		public String toString() {
-			return attempt.toString() + " <" + status.toString() + ">";
-		}
-	}
-
-	private BigInteger nextTransactionId(BigInteger id) {
-		return id.add(BigInteger.ONE);
-	}
-
-	private BigInteger previousTransactionId(BigInteger id) {
-		if (id.equals(INIT_TXID)) {
-			return null;
-		} else {
-			return id.subtract(BigInteger.ONE);
-		}
-	}
-
-	private BigInteger getStoredCurrTransaction(TransactionalState state) {
-		BigInteger ret = (BigInteger) state.getData(CURRENT_TX);
-		if (ret == null)
-			return INIT_TXID;
-		else
-			return ret;
-	}
-
-	private class StateInitializer implements
-			RotatingTransactionalState.StateInitializer {
-		@Override
-		public Object init(BigInteger txid, Object lastState) {
-			return _coordinator.initializeTransaction(txid, lastState);
-		}
-	}
-}