You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/11/05 21:41:03 UTC

[24/60] [abbrv] [partial] storm git commit: Release 2.0.4-SNAPSHOT

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/transactional/TransactionalTopologyBuilder.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/transactional/TransactionalTopologyBuilder.java b/jstorm-client/src/main/java/backtype/storm/transactional/TransactionalTopologyBuilder.java
deleted file mode 100644
index b4e437b..0000000
--- a/jstorm-client/src/main/java/backtype/storm/transactional/TransactionalTopologyBuilder.java
+++ /dev/null
@@ -1,566 +0,0 @@
-package backtype.storm.transactional;
-
-import backtype.storm.coordination.IBatchBolt;
-import backtype.storm.coordination.BatchBoltExecutor;
-import backtype.storm.Config;
-import backtype.storm.Constants;
-import backtype.storm.coordination.CoordinatedBolt;
-import backtype.storm.coordination.CoordinatedBolt.IdStreamSpec;
-import backtype.storm.coordination.CoordinatedBolt.SourceArgs;
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.generated.Grouping;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.topology.BaseConfigurationDeclarer;
-import backtype.storm.topology.BasicBoltExecutor;
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.IBasicBolt;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.InputDeclarer;
-import backtype.storm.topology.SpoutDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.transactional.partitioned.IOpaquePartitionedTransactionalSpout;
-import backtype.storm.transactional.partitioned.IPartitionedTransactionalSpout;
-import backtype.storm.transactional.partitioned.OpaquePartitionedTransactionalSpoutExecutor;
-import backtype.storm.transactional.partitioned.PartitionedTransactionalSpoutExecutor;
-import backtype.storm.tuple.Fields;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Trident subsumes the functionality provided by transactional topologies, so
- * this class is deprecated.
- * 
- */
-@Deprecated
-public class TransactionalTopologyBuilder {
-	String _id;
-	String _spoutId;
-	ITransactionalSpout _spout;
-	Map<String, Component> _bolts = new HashMap<String, Component>();
-	Integer _spoutParallelism;
-	List<Map> _spoutConfs = new ArrayList();
-
-	// id is used to store the state of this transactionalspout in zookeeper
-	// it would be very dangerous to have 2 topologies active with the same id
-	// in the same cluster
-	public TransactionalTopologyBuilder(String id, String spoutId,
-			ITransactionalSpout spout, Number spoutParallelism) {
-		_id = id;
-		_spoutId = spoutId;
-		_spout = spout;
-		_spoutParallelism = (spoutParallelism == null) ? null
-				: spoutParallelism.intValue();
-	}
-
-	public TransactionalTopologyBuilder(String id, String spoutId,
-			ITransactionalSpout spout) {
-		this(id, spoutId, spout, null);
-	}
-
-	public TransactionalTopologyBuilder(String id, String spoutId,
-			IPartitionedTransactionalSpout spout, Number spoutParallelism) {
-		this(id, spoutId, new PartitionedTransactionalSpoutExecutor(spout),
-				spoutParallelism);
-	}
-
-	public TransactionalTopologyBuilder(String id, String spoutId,
-			IPartitionedTransactionalSpout spout) {
-		this(id, spoutId, spout, null);
-	}
-
-	public TransactionalTopologyBuilder(String id, String spoutId,
-			IOpaquePartitionedTransactionalSpout spout, Number spoutParallelism) {
-		this(id, spoutId,
-				new OpaquePartitionedTransactionalSpoutExecutor(spout),
-				spoutParallelism);
-	}
-
-	public TransactionalTopologyBuilder(String id, String spoutId,
-			IOpaquePartitionedTransactionalSpout spout) {
-		this(id, spoutId, spout, null);
-	}
-
-	public SpoutDeclarer getSpoutDeclarer() {
-		return new SpoutDeclarerImpl();
-	}
-
-	public BoltDeclarer setBolt(String id, IBatchBolt bolt) {
-		return setBolt(id, bolt, null);
-	}
-
-	public BoltDeclarer setBolt(String id, IBatchBolt bolt, Number parallelism) {
-		return setBolt(id, new BatchBoltExecutor(bolt), parallelism,
-				bolt instanceof ICommitter);
-	}
-
-	public BoltDeclarer setCommitterBolt(String id, IBatchBolt bolt) {
-		return setCommitterBolt(id, bolt, null);
-	}
-
-	public BoltDeclarer setCommitterBolt(String id, IBatchBolt bolt,
-			Number parallelism) {
-		return setBolt(id, new BatchBoltExecutor(bolt), parallelism, true);
-	}
-
-	public BoltDeclarer setBolt(String id, IBasicBolt bolt) {
-		return setBolt(id, bolt, null);
-	}
-
-	public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism) {
-		return setBolt(id, new BasicBoltExecutor(bolt), parallelism, false);
-	}
-
-	private BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism,
-			boolean committer) {
-		Integer p = null;
-		if (parallelism != null)
-			p = parallelism.intValue();
-		Component component = new Component(bolt, p, committer);
-		_bolts.put(id, component);
-		return new BoltDeclarerImpl(component);
-	}
-
-	public TopologyBuilder buildTopologyBuilder() {
-		String coordinator = _spoutId + "/coordinator";
-		TopologyBuilder builder = new TopologyBuilder();
-		SpoutDeclarer declarer = builder.setSpout(coordinator,
-				new TransactionalSpoutCoordinator(_spout));
-		for (Map conf : _spoutConfs) {
-			declarer.addConfigurations(conf);
-		}
-		declarer.addConfiguration(Config.TOPOLOGY_TRANSACTIONAL_ID, _id);
-
-		BoltDeclarer emitterDeclarer = builder
-				.setBolt(
-						_spoutId,
-						new CoordinatedBolt(
-								new TransactionalSpoutBatchExecutor(_spout),
-								null, null), _spoutParallelism)
-				.allGrouping(
-						coordinator,
-						TransactionalSpoutCoordinator.TRANSACTION_BATCH_STREAM_ID)
-				.addConfiguration(Config.TOPOLOGY_TRANSACTIONAL_ID, _id);
-		if (_spout instanceof ICommitterTransactionalSpout) {
-			emitterDeclarer.allGrouping(coordinator,
-					TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID);
-		}
-		for (String id : _bolts.keySet()) {
-			Component component = _bolts.get(id);
-			Map<String, SourceArgs> coordinatedArgs = new HashMap<String, SourceArgs>();
-			// get all source component
-			for (String c : componentBoltSubscriptions(component)) {
-				coordinatedArgs.put(c, SourceArgs.all());
-			}
-
-			IdStreamSpec idSpec = null;
-			if (component.committer) {
-				idSpec = IdStreamSpec
-						.makeDetectSpec(
-								coordinator,
-								TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID);
-			}
-			BoltDeclarer input = builder.setBolt(id, new CoordinatedBolt(
-					component.bolt, coordinatedArgs, idSpec),
-					component.parallelism);
-			for (Map conf : component.componentConfs) {
-				input.addConfigurations(conf);
-			}
-			for (String c : componentBoltSubscriptions(component)) {
-				input.directGrouping(c, Constants.COORDINATED_STREAM_ID);
-			}
-			for (InputDeclaration d : component.declarations) {
-				d.declare(input);
-			}
-			if (component.committer) {
-				input.allGrouping(
-						coordinator,
-						TransactionalSpoutCoordinator.TRANSACTION_COMMIT_STREAM_ID);
-			}
-		}
-		return builder;
-	}
-
-	public StormTopology buildTopology() {
-		return buildTopologyBuilder().createTopology();
-	}
-
-	private Set<String> componentBoltSubscriptions(Component component) {
-		Set<String> ret = new HashSet<String>();
-		for (InputDeclaration d : component.declarations) {
-			ret.add(d.getComponent());
-		}
-		return ret;
-	}
-
-	private static class Component {
-		public IRichBolt bolt;
-		public Integer parallelism;
-		public List<InputDeclaration> declarations = new ArrayList<InputDeclaration>();
-		public List<Map> componentConfs = new ArrayList<Map>();
-		public boolean committer;
-
-		public Component(IRichBolt bolt, Integer parallelism, boolean committer) {
-			this.bolt = bolt;
-			this.parallelism = parallelism;
-			this.committer = committer;
-		}
-	}
-
-	private static interface InputDeclaration {
-		void declare(InputDeclarer declarer);
-
-		String getComponent();
-	}
-
-	private class SpoutDeclarerImpl extends
-			BaseConfigurationDeclarer<SpoutDeclarer> implements SpoutDeclarer {
-		@Override
-		public SpoutDeclarer addConfigurations(Map conf) {
-			_spoutConfs.add(conf);
-			return this;
-		}
-	}
-
-	private class BoltDeclarerImpl extends
-			BaseConfigurationDeclarer<BoltDeclarer> implements BoltDeclarer {
-		Component _component;
-
-		public BoltDeclarerImpl(Component component) {
-			_component = component;
-		}
-
-		@Override
-		public BoltDeclarer fieldsGrouping(final String component,
-				final Fields fields) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.fieldsGrouping(component, fields);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer fieldsGrouping(final String component,
-				final String streamId, final Fields fields) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.fieldsGrouping(component, streamId, fields);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer globalGrouping(final String component) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.globalGrouping(component);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer globalGrouping(final String component,
-				final String streamId) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.globalGrouping(component, streamId);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer shuffleGrouping(final String component) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.shuffleGrouping(component);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer shuffleGrouping(final String component,
-				final String streamId) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.shuffleGrouping(component, streamId);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer localOrShuffleGrouping(final String component) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.localOrShuffleGrouping(component);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer localOrShuffleGrouping(final String component,
-				final String streamId) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.localOrShuffleGrouping(component, streamId);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-		
-		@Override
-		public BoltDeclarer localFirstGrouping(final String component) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.localFirstGrouping(component);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer localFirstGrouping(final String component,
-				final String streamId) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.localFirstGrouping(component, streamId);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer noneGrouping(final String component) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.noneGrouping(component);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer noneGrouping(final String component,
-				final String streamId) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.noneGrouping(component, streamId);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer allGrouping(final String component) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.allGrouping(component);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer allGrouping(final String component,
-				final String streamId) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.allGrouping(component, streamId);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer directGrouping(final String component) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.directGrouping(component);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer directGrouping(final String component,
-				final String streamId) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.directGrouping(component, streamId);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer customGrouping(final String component,
-				final CustomStreamGrouping grouping) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.customGrouping(component, grouping);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer customGrouping(final String component,
-				final String streamId, final CustomStreamGrouping grouping) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.customGrouping(component, streamId, grouping);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer grouping(final GlobalStreamId stream,
-				final Grouping grouping) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.grouping(stream, grouping);
-				}
-
-				@Override
-				public String getComponent() {
-					return stream.get_componentId();
-				}
-			});
-			return this;
-		}
-
-		private void addDeclaration(InputDeclaration declaration) {
-			_component.declarations.add(declaration);
-		}
-
-		@Override
-		public BoltDeclarer addConfigurations(Map conf) {
-			_component.componentConfs.add(conf);
-			return this;
-		}
-
-		
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.java b/jstorm-client/src/main/java/backtype/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.java
deleted file mode 100644
index 65c0772..0000000
--- a/jstorm-client/src/main/java/backtype/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.java
+++ /dev/null
@@ -1,46 +0,0 @@
-package backtype.storm.transactional.partitioned;
-
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IComponent;
-import backtype.storm.transactional.TransactionAttempt;
-import java.util.Map;
-
-/**
- * This defines a transactional spout which does *not* necessarily replay the
- * same batch every time it emits a batch for a transaction id.
- */
-public interface IOpaquePartitionedTransactionalSpout<T> extends IComponent {
-	public interface Coordinator {
-		/**
-		 * Returns true if its ok to emit start a new transaction, false
-		 * otherwise (will skip this transaction).
-		 * 
-		 * You should sleep here if you want a delay between asking for the next
-		 * transaction (this will be called repeatedly in a loop).
-		 */
-		boolean isReady();
-
-		void close();
-	}
-
-	public interface Emitter<X> {
-		/**
-		 * Emit a batch of tuples for a partition/transaction.
-		 * 
-		 * Return the metadata describing this batch that will be used as
-		 * lastPartitionMeta for defining the parameters of the next batch.
-		 */
-		X emitPartitionBatch(TransactionAttempt tx,
-				BatchOutputCollector collector, int partition,
-				X lastPartitionMeta);
-
-		int numPartitions();
-
-		void close();
-	}
-
-	Emitter<T> getEmitter(Map conf, TopologyContext context);
-
-	Coordinator getCoordinator(Map conf, TopologyContext context);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/transactional/partitioned/IPartitionedTransactionalSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/transactional/partitioned/IPartitionedTransactionalSpout.java b/jstorm-client/src/main/java/backtype/storm/transactional/partitioned/IPartitionedTransactionalSpout.java
deleted file mode 100644
index 31e4c41..0000000
--- a/jstorm-client/src/main/java/backtype/storm/transactional/partitioned/IPartitionedTransactionalSpout.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package backtype.storm.transactional.partitioned;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IComponent;
-import backtype.storm.transactional.TransactionAttempt;
-import backtype.storm.coordination.BatchOutputCollector;
-import java.util.Map;
-
-/**
- * This interface defines a transactional spout that reads its tuples from a
- * partitioned set of brokers. It automates the storing of metadata for each
- * partition to ensure that the same batch is always emitted for the same
- * transaction id. The partition metadata is stored in Zookeeper.
- */
-public interface IPartitionedTransactionalSpout<T> extends IComponent {
-	public interface Coordinator {
-		/**
-		 * Return the number of partitions currently in the source of data. The
-		 * idea is is that if a new partition is added and a prior transaction
-		 * is replayed, it doesn't emit tuples for the new partition because it
-		 * knows how many partitions were in that transaction.
-		 */
-		int numPartitions();
-
-		/**
-		 * Returns true if its ok to emit start a new transaction, false
-		 * otherwise (will skip this transaction).
-		 * 
-		 * You should sleep here if you want a delay between asking for the next
-		 * transaction (this will be called repeatedly in a loop).
-		 */
-		boolean isReady();
-
-		void close();
-	}
-
-	public interface Emitter<X> {
-		/**
-		 * Emit a batch of tuples for a partition/transaction that's never been
-		 * emitted before. Return the metadata that can be used to reconstruct
-		 * this partition/batch in the future.
-		 */
-		X emitPartitionBatchNew(TransactionAttempt tx,
-				BatchOutputCollector collector, int partition,
-				X lastPartitionMeta);
-
-		/**
-		 * Emit a batch of tuples for a partition/transaction that has been
-		 * emitted before, using the metadata created when it was first emitted.
-		 */
-		void emitPartitionBatch(TransactionAttempt tx,
-				BatchOutputCollector collector, int partition, X partitionMeta);
-
-		void close();
-	}
-
-	Coordinator getCoordinator(Map conf, TopologyContext context);
-
-	Emitter<T> getEmitter(Map conf, TopologyContext context);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java b/jstorm-client/src/main/java/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java
deleted file mode 100644
index 4bc877f..0000000
--- a/jstorm-client/src/main/java/backtype/storm/transactional/partitioned/OpaquePartitionedTransactionalSpoutExecutor.java
+++ /dev/null
@@ -1,153 +0,0 @@
-package backtype.storm.transactional.partitioned;
-
-import backtype.storm.Config;
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.transactional.ICommitterTransactionalSpout;
-import backtype.storm.transactional.ITransactionalSpout;
-import backtype.storm.transactional.TransactionAttempt;
-import backtype.storm.transactional.state.RotatingTransactionalState;
-import backtype.storm.transactional.state.TransactionalState;
-import java.math.BigInteger;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-
-public class OpaquePartitionedTransactionalSpoutExecutor implements
-		ICommitterTransactionalSpout<Object> {
-	IOpaquePartitionedTransactionalSpout _spout;
-
-	public class Coordinator implements ITransactionalSpout.Coordinator<Object> {
-		IOpaquePartitionedTransactionalSpout.Coordinator _coordinator;
-
-		public Coordinator(Map conf, TopologyContext context) {
-			_coordinator = _spout.getCoordinator(conf, context);
-		}
-
-		@Override
-		public Object initializeTransaction(BigInteger txid, Object prevMetadata) {
-			return null;
-		}
-
-		@Override
-		public boolean isReady() {
-			return _coordinator.isReady();
-		}
-
-		@Override
-		public void close() {
-			_coordinator.close();
-		}
-	}
-
-	public class Emitter implements ICommitterTransactionalSpout.Emitter {
-		IOpaquePartitionedTransactionalSpout.Emitter _emitter;
-		TransactionalState _state;
-		TreeMap<BigInteger, Map<Integer, Object>> _cachedMetas = new TreeMap<BigInteger, Map<Integer, Object>>();
-		Map<Integer, RotatingTransactionalState> _partitionStates = new HashMap<Integer, RotatingTransactionalState>();
-		int _index;
-		int _numTasks;
-
-		public Emitter(Map conf, TopologyContext context) {
-			_emitter = _spout.getEmitter(conf, context);
-			_index = context.getThisTaskIndex();
-			_numTasks = context.getComponentTasks(context.getThisComponentId())
-					.size();
-			_state = TransactionalState.newUserState(conf,
-					(String) conf.get(Config.TOPOLOGY_TRANSACTIONAL_ID),
-					getComponentConfiguration());
-			List<String> existingPartitions = _state.list("");
-			for (String p : existingPartitions) {
-				int partition = Integer.parseInt(p);
-				if ((partition - _index) % _numTasks == 0) {
-					_partitionStates.put(partition,
-							new RotatingTransactionalState(_state, p));
-				}
-			}
-		}
-
-		@Override
-		public void emitBatch(TransactionAttempt tx, Object coordinatorMeta,
-				BatchOutputCollector collector) {
-			Map<Integer, Object> metas = new HashMap<Integer, Object>();
-			_cachedMetas.put(tx.getTransactionId(), metas);
-			int partitions = _emitter.numPartitions();
-			Entry<BigInteger, Map<Integer, Object>> entry = _cachedMetas
-					.lowerEntry(tx.getTransactionId());
-			Map<Integer, Object> prevCached;
-			if (entry != null) {
-				prevCached = entry.getValue();
-			} else {
-				prevCached = new HashMap<Integer, Object>();
-			}
-
-			for (int i = _index; i < partitions; i += _numTasks) {
-				RotatingTransactionalState state = _partitionStates.get(i);
-				if (state == null) {
-					state = new RotatingTransactionalState(_state, "" + i);
-					_partitionStates.put(i, state);
-				}
-				state.removeState(tx.getTransactionId());
-				Object lastMeta = prevCached.get(i);
-				if (lastMeta == null)
-					lastMeta = state.getLastState();
-				Object meta = _emitter.emitPartitionBatch(tx, collector, i,
-						lastMeta);
-				metas.put(i, meta);
-			}
-		}
-
-		@Override
-		public void cleanupBefore(BigInteger txid) {
-			for (RotatingTransactionalState state : _partitionStates.values()) {
-				state.cleanupBefore(txid);
-			}
-		}
-
-		@Override
-		public void commit(TransactionAttempt attempt) {
-			BigInteger txid = attempt.getTransactionId();
-			Map<Integer, Object> metas = _cachedMetas.remove(txid);
-			for (Integer partition : metas.keySet()) {
-				Object meta = metas.get(partition);
-				_partitionStates.get(partition).overrideState(txid, meta);
-			}
-		}
-
-		@Override
-		public void close() {
-			_emitter.close();
-		}
-	}
-
-	public OpaquePartitionedTransactionalSpoutExecutor(
-			IOpaquePartitionedTransactionalSpout spout) {
-		_spout = spout;
-	}
-
-	@Override
-	public ITransactionalSpout.Coordinator<Object> getCoordinator(Map conf,
-			TopologyContext context) {
-		return new Coordinator(conf, context);
-	}
-
-	@Override
-	public ICommitterTransactionalSpout.Emitter getEmitter(Map conf,
-			TopologyContext context) {
-		return new Emitter(conf, context);
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		_spout.declareOutputFields(declarer);
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return _spout.getComponentConfiguration();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java b/jstorm-client/src/main/java/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java
deleted file mode 100644
index 51bb34e..0000000
--- a/jstorm-client/src/main/java/backtype/storm/transactional/partitioned/PartitionedTransactionalSpoutExecutor.java
+++ /dev/null
@@ -1,136 +0,0 @@
-package backtype.storm.transactional.partitioned;
-
-import backtype.storm.Config;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.transactional.ITransactionalSpout;
-import backtype.storm.transactional.TransactionAttempt;
-import backtype.storm.coordination.BatchOutputCollector;
-import backtype.storm.transactional.state.RotatingTransactionalState;
-import backtype.storm.transactional.state.TransactionalState;
-import java.math.BigInteger;
-import java.util.HashMap;
-import java.util.Map;
-
-public class PartitionedTransactionalSpoutExecutor implements
-		ITransactionalSpout<Integer> {
-	IPartitionedTransactionalSpout _spout;
-
-	public PartitionedTransactionalSpoutExecutor(
-			IPartitionedTransactionalSpout spout) {
-		_spout = spout;
-	}
-
-	public IPartitionedTransactionalSpout getPartitionedSpout() {
-		return _spout;
-	}
-
-	class Coordinator implements ITransactionalSpout.Coordinator<Integer> {
-		private IPartitionedTransactionalSpout.Coordinator _coordinator;
-
-		public Coordinator(Map conf, TopologyContext context) {
-			_coordinator = _spout.getCoordinator(conf, context);
-		}
-
-		@Override
-		public Integer initializeTransaction(BigInteger txid,
-				Integer prevMetadata) {
-			return _coordinator.numPartitions();
-		}
-
-		@Override
-		public boolean isReady() {
-			return _coordinator.isReady();
-		}
-
-		@Override
-		public void close() {
-			_coordinator.close();
-		}
-	}
-
-	class Emitter implements ITransactionalSpout.Emitter<Integer> {
-		private IPartitionedTransactionalSpout.Emitter _emitter;
-		private TransactionalState _state;
-		private Map<Integer, RotatingTransactionalState> _partitionStates = new HashMap<Integer, RotatingTransactionalState>();
-		private int _index;
-		private int _numTasks;
-
-		public Emitter(Map conf, TopologyContext context) {
-			_emitter = _spout.getEmitter(conf, context);
-			_state = TransactionalState.newUserState(conf,
-					(String) conf.get(Config.TOPOLOGY_TRANSACTIONAL_ID),
-					getComponentConfiguration());
-			_index = context.getThisTaskIndex();
-			_numTasks = context.getComponentTasks(context.getThisComponentId())
-					.size();
-		}
-
-		@Override
-		public void emitBatch(final TransactionAttempt tx,
-				final Integer partitions, final BatchOutputCollector collector) {
-			for (int i = _index; i < partitions; i += _numTasks) {
-				if (!_partitionStates.containsKey(i)) {
-					_partitionStates.put(i, new RotatingTransactionalState(
-							_state, "" + i));
-				}
-				RotatingTransactionalState state = _partitionStates.get(i);
-				final int partition = i;
-				Object meta = state.getStateOrCreate(tx.getTransactionId(),
-						new RotatingTransactionalState.StateInitializer() {
-							@Override
-							public Object init(BigInteger txid, Object lastState) {
-								return _emitter.emitPartitionBatchNew(tx,
-										collector, partition, lastState);
-							}
-						});
-				// it's null if one of:
-				// a) a later transaction batch was emitted before this, so we
-				// should skip this batch
-				// b) if didn't exist and was created (in which case the
-				// StateInitializer was invoked and
-				// it was emitted
-				if (meta != null) {
-					_emitter.emitPartitionBatch(tx, collector, partition, meta);
-				}
-			}
-
-		}
-
-		@Override
-		public void cleanupBefore(BigInteger txid) {
-			for (RotatingTransactionalState state : _partitionStates.values()) {
-				state.cleanupBefore(txid);
-			}
-		}
-
-		@Override
-		public void close() {
-			_state.close();
-			_emitter.close();
-		}
-	}
-
-	@Override
-	public ITransactionalSpout.Coordinator getCoordinator(Map conf,
-			TopologyContext context) {
-		return new Coordinator(conf, context);
-	}
-
-	@Override
-	public ITransactionalSpout.Emitter getEmitter(Map conf,
-			TopologyContext context) {
-		return new Emitter(conf, context);
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		_spout.declareOutputFields(declarer);
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return _spout.getComponentConfiguration();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/transactional/state/RotatingTransactionalState.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/transactional/state/RotatingTransactionalState.java b/jstorm-client/src/main/java/backtype/storm/transactional/state/RotatingTransactionalState.java
deleted file mode 100644
index 2ee9f85..0000000
--- a/jstorm-client/src/main/java/backtype/storm/transactional/state/RotatingTransactionalState.java
+++ /dev/null
@@ -1,143 +0,0 @@
-package backtype.storm.transactional.state;
-
-import backtype.storm.transactional.TransactionalSpoutCoordinator;
-
-import java.math.BigInteger;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-
-/**
- * A map from txid to a value. Automatically deletes txids that have been
- * committed.
- */
-public class RotatingTransactionalState {
-	public static interface StateInitializer {
-		Object init(BigInteger txid, Object lastState);
-	}
-
-	private TransactionalState _state;
-	private String _subdir;
-	private boolean _strictOrder;
-
-	private TreeMap<BigInteger, Object> _curr = new TreeMap<BigInteger, Object>();
-
-	public RotatingTransactionalState(TransactionalState state, String subdir,
-			boolean strictOrder) {
-		_state = state;
-		_subdir = subdir;
-		_strictOrder = strictOrder;
-		state.mkdir(subdir);
-		sync();
-	}
-
-	public RotatingTransactionalState(TransactionalState state, String subdir) {
-		this(state, subdir, false);
-	}
-
-	public Object getLastState() {
-		if (_curr.isEmpty())
-			return null;
-		else
-			return _curr.lastEntry().getValue();
-	}
-
-	public void overrideState(BigInteger txid, Object state) {
-		_state.setData(txPath(txid), state);
-		_curr.put(txid, state);
-	}
-
-	public void removeState(BigInteger txid) {
-		if (_curr.containsKey(txid)) {
-			_curr.remove(txid);
-			_state.delete(txPath(txid));
-		}
-	}
-
-	public Object getState(BigInteger txid, StateInitializer init) {
-		if (!_curr.containsKey(txid)) {
-			SortedMap<BigInteger, Object> prevMap = _curr.headMap(txid);
-			SortedMap<BigInteger, Object> afterMap = _curr.tailMap(txid);
-
-			BigInteger prev = null;
-			if (!prevMap.isEmpty())
-				prev = prevMap.lastKey();
-
-			if (_strictOrder) {
-				if (prev == null
-						&& !txid.equals(TransactionalSpoutCoordinator.INIT_TXID)) {
-					throw new IllegalStateException(
-							"Trying to initialize transaction for which there should be a previous state");
-				}
-				if (prev != null && !prev.equals(txid.subtract(BigInteger.ONE))) {
-					throw new IllegalStateException(
-							"Expecting previous txid state to be the previous transaction");
-				}
-				if (!afterMap.isEmpty()) {
-					throw new IllegalStateException(
-							"Expecting tx state to be initialized in strict order but there are txids after that have state");
-				}
-			}
-
-			Object data;
-			if (afterMap.isEmpty()) {
-				Object prevData;
-				if (prev != null) {
-					prevData = _curr.get(prev);
-				} else {
-					prevData = null;
-				}
-				data = init.init(txid, prevData);
-			} else {
-				data = null;
-			}
-			_curr.put(txid, data);
-			_state.setData(txPath(txid), data);
-		}
-		return _curr.get(txid);
-	}
-
-	public boolean hasCache(BigInteger txid) {
-		return _curr.containsKey(txid);
-	}
-
-	/**
-	 * Returns null if it was created, the value otherwise.
-	 */
-	public Object getStateOrCreate(BigInteger txid, StateInitializer init) {
-		if (_curr.containsKey(txid)) {
-			return _curr.get(txid);
-		} else {
-			getState(txid, init);
-			return null;
-		}
-	}
-
-	public void cleanupBefore(BigInteger txid) {
-		Set<BigInteger> toDelete = new HashSet<BigInteger>();
-		toDelete.addAll(_curr.headMap(txid).keySet());
-		for (BigInteger tx : toDelete) {
-			_curr.remove(tx);
-			_state.delete(txPath(tx));
-		}
-	}
-
-	private void sync() {
-		List<String> txids = _state.list(_subdir);
-		for (String txid_s : txids) {
-			Object data = _state.getData(txPath(txid_s));
-			_curr.put(new BigInteger(txid_s), data);
-		}
-	}
-
-	private String txPath(BigInteger tx) {
-		return txPath(tx.toString());
-	}
-
-	private String txPath(String tx) {
-		return _subdir + "/" + tx;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/transactional/state/TransactionalState.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/transactional/state/TransactionalState.java b/jstorm-client/src/main/java/backtype/storm/transactional/state/TransactionalState.java
deleted file mode 100644
index 11b8359..0000000
--- a/jstorm-client/src/main/java/backtype/storm/transactional/state/TransactionalState.java
+++ /dev/null
@@ -1,132 +0,0 @@
-package backtype.storm.transactional.state;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.zookeeper.CreateMode;
-import org.apache.zookeeper.KeeperException;
-
-import backtype.storm.Config;
-import backtype.storm.serialization.KryoValuesDeserializer;
-import backtype.storm.serialization.KryoValuesSerializer;
-import backtype.storm.utils.Utils;
-
-public class TransactionalState {
-	CuratorFramework _curator;
-	KryoValuesSerializer _ser;
-	KryoValuesDeserializer _des;
-
-	public static TransactionalState newUserState(Map conf, String id,
-			Map componentConf) {
-		return new TransactionalState(conf, id, componentConf, "user");
-	}
-
-	public static TransactionalState newCoordinatorState(Map conf, String id,
-			Map componentConf) {
-		return new TransactionalState(conf, id, componentConf, "coordinator");
-	}
-
-	protected TransactionalState(Map conf, String id, Map componentConf,
-			String subroot) {
-		try {
-			conf = new HashMap(conf);
-			// ensure that the serialization registrations are consistent with
-			// the declarations in this spout
-			if (componentConf != null) {
-				conf.put(Config.TOPOLOGY_KRYO_REGISTER,
-						componentConf.get(Config.TOPOLOGY_KRYO_REGISTER));
-			}
-			String rootDir = conf.get(Config.TRANSACTIONAL_ZOOKEEPER_ROOT)
-					+ "/" + id + "/" + subroot;
-			List<String> servers = (List<String>) getWithBackup(conf,
-					Config.TRANSACTIONAL_ZOOKEEPER_SERVERS,
-					Config.STORM_ZOOKEEPER_SERVERS);
-			Object port = getWithBackup(conf,
-					Config.TRANSACTIONAL_ZOOKEEPER_PORT,
-					Config.STORM_ZOOKEEPER_PORT);
-			CuratorFramework initter = Utils.newCuratorStarted(conf, servers,
-					port);
-			try {
-				initter.create().creatingParentsIfNeeded().forPath(rootDir);
-			} catch (KeeperException.NodeExistsException e) {
-
-			}
-
-			initter.close();
-
-			_curator = Utils.newCuratorStarted(conf, servers, port, rootDir);
-			_ser = new KryoValuesSerializer(conf);
-			_des = new KryoValuesDeserializer(conf);
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	public void setData(String path, Object obj) {
-		path = "/" + path;
-		byte[] ser = _ser.serializeObject(obj);
-		try {
-			if (_curator.checkExists().forPath(path) != null) {
-				_curator.setData().forPath(path, ser);
-			} else {
-				_curator.create().creatingParentsIfNeeded()
-						.withMode(CreateMode.PERSISTENT).forPath(path, ser);
-			}
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	public void delete(String path) {
-		path = "/" + path;
-		try {
-			_curator.delete().forPath(path);
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	public List<String> list(String path) {
-		path = "/" + path;
-		try {
-			if (_curator.checkExists().forPath(path) == null) {
-				return new ArrayList<String>();
-			} else {
-				return _curator.getChildren().forPath(path);
-			}
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	public void mkdir(String path) {
-		setData(path, 7);
-	}
-
-	public Object getData(String path) {
-		path = "/" + path;
-		try {
-			if (_curator.checkExists().forPath(path) != null) {
-				return _des.deserializeObject(_curator.getData().forPath(path));
-			} else {
-				return null;
-			}
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	public void close() {
-		_curator.close();
-	}
-
-	private Object getWithBackup(Map amap, Object primary, Object backup) {
-		Object ret = amap.get(primary);
-		if (ret == null)
-			return amap.get(backup);
-		return ret;
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/tuple/Fields.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/tuple/Fields.java b/jstorm-client/src/main/java/backtype/storm/tuple/Fields.java
deleted file mode 100644
index dc9b8bf..0000000
--- a/jstorm-client/src/main/java/backtype/storm/tuple/Fields.java
+++ /dev/null
@@ -1,82 +0,0 @@
-package backtype.storm.tuple;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.io.Serializable;
-
-public class Fields implements Iterable<String>, Serializable {
-	private List<String> _fields;
-	private Map<String, Integer> _index = new HashMap<String, Integer>();
-
-	public Fields(String... fields) {
-		this(Arrays.asList(fields));
-	}
-
-	public Fields(List<String> fields) {
-		_fields = new ArrayList<String>(fields.size());
-		for (String field : fields) {
-			if (_fields.contains(field))
-				throw new IllegalArgumentException(String.format(
-						"duplicate field '%s'", field));
-			_fields.add(field);
-		}
-		index();
-	}
-
-	public List<Object> select(Fields selector, List<Object> tuple) {
-		List<Object> ret = new ArrayList<Object>(selector.size());
-		for (String s : selector) {
-			ret.add(tuple.get(_index.get(s)));
-		}
-		return ret;
-	}
-
-	public List<String> toList() {
-		return new ArrayList<String>(_fields);
-	}
-
-	public int size() {
-		return _fields.size();
-	}
-
-	public String get(int index) {
-		return _fields.get(index);
-	}
-
-	public Iterator<String> iterator() {
-		return _fields.iterator();
-	}
-
-	/**
-	 * Returns the position of the specified field.
-	 */
-	public int fieldIndex(String field) {
-		Integer ret = _index.get(field);
-		if (ret == null) {
-			throw new IllegalArgumentException(field + " does not exist");
-		}
-		return ret;
-	}
-
-	/**
-	 * Returns true if this contains the specified name of the field.
-	 */
-	public boolean contains(String field) {
-		return _index.containsKey(field);
-	}
-
-	private void index() {
-		for (int i = 0; i < _fields.size(); i++) {
-			_index.put(_fields.get(i), i);
-		}
-	}
-
-	@Override
-	public String toString() {
-		return _fields.toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/tuple/ITuple.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/tuple/ITuple.java b/jstorm-client/src/main/java/backtype/storm/tuple/ITuple.java
deleted file mode 100644
index b00279d..0000000
--- a/jstorm-client/src/main/java/backtype/storm/tuple/ITuple.java
+++ /dev/null
@@ -1,119 +0,0 @@
-package backtype.storm.tuple;
-
-import java.util.List;
-
-public interface ITuple {
-
-    /**
-     * Returns the number of fields in this tuple.
-     */
-    public int size();
-
-    /**
-     * Returns true if this tuple contains the specified name of the field.
-     */
-    public boolean contains(String field);
-
-    /**
-     * Gets the names of the fields in this tuple.
-     */
-    public Fields getFields();
-
-    /**
-     * Returns the position of the specified field in this tuple.
-     */
-    public int fieldIndex(String field);
-
-    /**
-     * Returns a subset of the tuple based on the fields selector.
-     */
-    public List<Object> select(Fields selector);
-
-    /**
-     * Gets the field at position i in the tuple. Returns object since tuples are dynamically typed.
-     */
-    public Object getValue(int i);
-
-    /**
-     * Returns the String at position i in the tuple. If that field is not a String,
-     * you will get a runtime error.
-     */
-    public String getString(int i);
-
-    /**
-     * Returns the Integer at position i in the tuple. If that field is not an Integer,
-     * you will get a runtime error.
-     */
-    public Integer getInteger(int i);
-
-    /**
-     * Returns the Long at position i in the tuple. If that field is not a Long,
-     * you will get a runtime error.
-     */
-    public Long getLong(int i);
-
-    /**
-     * Returns the Boolean at position i in the tuple. If that field is not a Boolean,
-     * you will get a runtime error.
-     */
-    public Boolean getBoolean(int i);
-
-    /**
-     * Returns the Short at position i in the tuple. If that field is not a Short,
-     * you will get a runtime error.
-     */
-    public Short getShort(int i);
-
-    /**
-     * Returns the Byte at position i in the tuple. If that field is not a Byte,
-     * you will get a runtime error.
-     */
-    public Byte getByte(int i);
-
-    /**
-     * Returns the Double at position i in the tuple. If that field is not a Double,
-     * you will get a runtime error.
-     */
-    public Double getDouble(int i);
-
-    /**
-     * Returns the Float at position i in the tuple. If that field is not a Float,
-     * you will get a runtime error.
-     */
-    public Float getFloat(int i);
-
-    /**
-     * Returns the byte array at position i in the tuple. If that field is not a byte array,
-     * you will get a runtime error.
-     */
-    public byte[] getBinary(int i);
-
-
-    public Object getValueByField(String field);
-
-    public String getStringByField(String field);
-
-    public Integer getIntegerByField(String field);
-
-    public Long getLongByField(String field);
-
-    public Boolean getBooleanByField(String field);
-
-    public Short getShortByField(String field);
-
-    public Byte getByteByField(String field);
-
-    public Double getDoubleByField(String field);
-
-    public Float getFloatByField(String field);
-
-    public byte[] getBinaryByField(String field);
-
-    /**
-     * Gets all the values in this tuple.
-     */
-    public List<Object> getValues();
-
-
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/tuple/MessageId.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/tuple/MessageId.java b/jstorm-client/src/main/java/backtype/storm/tuple/MessageId.java
deleted file mode 100644
index b1bd68a..0000000
--- a/jstorm-client/src/main/java/backtype/storm/tuple/MessageId.java
+++ /dev/null
@@ -1,86 +0,0 @@
-package backtype.storm.tuple;
-
-import backtype.storm.utils.Utils;
-import com.esotericsoftware.kryo.io.Input;
-import com.esotericsoftware.kryo.io.Output;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Random;
-import java.util.Set;
-
-public class MessageId {
-	private Map<Long, Long> _anchorsToIds;
-
-	@Deprecated
-	public static long generateId() {
-		return Utils.secureRandomLong();
-	}
-
-	public static long generateId(Random rand) {
-		return rand.nextLong();
-	}
-
-	public static MessageId makeUnanchored() {
-		return makeId(new HashMap<Long, Long>());
-	}
-
-	public static MessageId makeId(Map<Long, Long> anchorsToIds) {
-		return new MessageId(anchorsToIds);
-	}
-
-	public static MessageId makeRootId(long id, long val) {
-		Map<Long, Long> anchorsToIds = new HashMap<Long, Long>();
-		anchorsToIds.put(id, val);
-		return new MessageId(anchorsToIds);
-	}
-
-	protected MessageId(Map<Long, Long> anchorsToIds) {
-		_anchorsToIds = anchorsToIds;
-	}
-
-	public Map<Long, Long> getAnchorsToIds() {
-		return _anchorsToIds;
-	}
-
-	public Set<Long> getAnchors() {
-		return _anchorsToIds.keySet();
-	}
-
-	@Override
-	public int hashCode() {
-		return _anchorsToIds.hashCode();
-	}
-
-	@Override
-	public boolean equals(Object other) {
-		if (other instanceof MessageId) {
-			return _anchorsToIds.equals(((MessageId) other)._anchorsToIds);
-		} else {
-			return false;
-		}
-	}
-
-	@Override
-	public String toString() {
-		return _anchorsToIds.toString();
-	}
-
-	public void serialize(Output out) throws IOException {
-		out.writeInt(_anchorsToIds.size(), true);
-		for (Entry<Long, Long> anchorToId : _anchorsToIds.entrySet()) {
-			out.writeLong(anchorToId.getKey());
-			out.writeLong(anchorToId.getValue());
-		}
-	}
-
-	public static MessageId deserialize(Input in) throws IOException {
-		int numAnchors = in.readInt(true);
-		Map<Long, Long> anchorsToIds = new HashMap<Long, Long>();
-		for (int i = 0; i < numAnchors; i++) {
-			anchorsToIds.put(in.readLong(), in.readLong());
-		}
-		return new MessageId(anchorsToIds);
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/tuple/Tuple.java b/jstorm-client/src/main/java/backtype/storm/tuple/Tuple.java
deleted file mode 100644
index f170cd2..0000000
--- a/jstorm-client/src/main/java/backtype/storm/tuple/Tuple.java
+++ /dev/null
@@ -1,43 +0,0 @@
-package backtype.storm.tuple;
-
-import backtype.storm.generated.GlobalStreamId;
-import java.util.List;
-
-/**
- * The tuple is the main data structure in Storm. A tuple is a named list of values, 
- * where each value can be any type. Tuples are dynamically typed -- the types of the fields 
- * do not need to be declared. Tuples have helper methods like getInteger and getString 
- * to get field values without having to cast the result.
- * 
- * Storm needs to know how to serialize all the values in a tuple. By default, Storm 
- * knows how to serialize the primitive types, strings, and byte arrays. If you want to 
- * use another type, you'll need to implement and register a serializer for that type.
- * See {@link http://github.com/nathanmarz/storm/wiki/Serialization} for more info.
- */
-public interface Tuple extends ITuple{
-
-    /**
-     * Returns the global stream id (component + stream) of this tuple.
-     */
-    public GlobalStreamId getSourceGlobalStreamid();
-    
-    /**
-     * Gets the id of the component that created this tuple.
-     */
-    public String getSourceComponent();
-    
-    /**
-     * Gets the id of the task that created this tuple.
-     */
-    public int getSourceTask();
-    
-    /**
-     * Gets the id of the stream that this tuple was emitted to.
-     */
-    public String getSourceStreamId();
-    
-    /**
-     * Gets the message id that associated with this tuple.
-     */
-    public MessageId getMessageId();
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/tuple/TupleExt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/tuple/TupleExt.java b/jstorm-client/src/main/java/backtype/storm/tuple/TupleExt.java
deleted file mode 100644
index 7307342..0000000
--- a/jstorm-client/src/main/java/backtype/storm/tuple/TupleExt.java
+++ /dev/null
@@ -1,12 +0,0 @@
-package backtype.storm.tuple;
-
-public interface TupleExt extends Tuple {
-	/**
-	 * Get Target TaskId
-	 * 
-	 * @return
-	 */
-	int getTargetTaskId();
-
-	void setTargetTaskId(int targetTaskId);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/tuple/TupleImpl.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/tuple/TupleImpl.java b/jstorm-client/src/main/java/backtype/storm/tuple/TupleImpl.java
deleted file mode 100644
index 2f47f6e..0000000
--- a/jstorm-client/src/main/java/backtype/storm/tuple/TupleImpl.java
+++ /dev/null
@@ -1,342 +0,0 @@
-package backtype.storm.tuple;
-
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.task.GeneralTopologyContext;
-import backtype.storm.utils.IndifferentAccessMap;
-import clojure.lang.ASeq;
-import clojure.lang.Counted;
-import clojure.lang.IMeta;
-import clojure.lang.IPersistentMap;
-import clojure.lang.ISeq;
-import clojure.lang.Indexed;
-import clojure.lang.Keyword;
-import clojure.lang.MapEntry;
-import clojure.lang.Obj;
-import clojure.lang.PersistentArrayMap;
-import clojure.lang.Seqable;
-import clojure.lang.Symbol;
-import java.util.List;
-
-public class TupleImpl extends IndifferentAccessMap implements Seqable,
-		Indexed, IMeta, Tuple {
-	private List<Object> values;
-	private int taskId;
-	private String streamId;
-	private GeneralTopologyContext context;
-	private MessageId id;
-	private IPersistentMap _meta = null;
-
-	public TupleImpl(GeneralTopologyContext context, List<Object> values,
-			int taskId, String streamId, MessageId id) {
-		this.values = values;
-		this.taskId = taskId;
-		this.streamId = streamId;
-		this.id = id;
-		this.context = context;
-
-		String componentId = context.getComponentId(taskId);
-		Fields schema = context.getComponentOutputFields(componentId, streamId);
-		if (values.size() != schema.size()) {
-			throw new IllegalArgumentException(
-					"Tuple created with wrong number of fields. " + "Expected "
-							+ schema.size() + " fields but got "
-							+ values.size() + " fields");
-		}
-	}
-
-	public TupleImpl(GeneralTopologyContext context, List<Object> values,
-			int taskId, String streamId) {
-		this(context, values, taskId, streamId, MessageId.makeUnanchored());
-	}
-
-	Long _processSampleStartTime = null;
-	Long _executeSampleStartTime = null;
-
-	public void setProcessSampleStartTime(long ms) {
-		_processSampleStartTime = ms;
-	}
-
-	public Long getProcessSampleStartTime() {
-		return _processSampleStartTime;
-	}
-
-	public void setExecuteSampleStartTime(long ms) {
-		_executeSampleStartTime = ms;
-	}
-
-	public Long getExecuteSampleStartTime() {
-		return _executeSampleStartTime;
-	}
-
-	long _outAckVal = 0;
-
-	public void updateAckVal(long val) {
-		_outAckVal = _outAckVal ^ val;
-	}
-
-	public long getAckVal() {
-		return _outAckVal;
-	}
-
-	public int size() {
-		return values.size();
-	}
-
-	public int fieldIndex(String field) {
-		return getFields().fieldIndex(field);
-	}
-
-	public boolean contains(String field) {
-		return getFields().contains(field);
-	}
-
-	public Object getValue(int i) {
-		return values.get(i);
-	}
-
-	public String getString(int i) {
-		return (String) values.get(i);
-	}
-
-	public Integer getInteger(int i) {
-		return (Integer) values.get(i);
-	}
-
-	public Long getLong(int i) {
-		return (Long) values.get(i);
-	}
-
-	public Boolean getBoolean(int i) {
-		return (Boolean) values.get(i);
-	}
-
-	public Short getShort(int i) {
-		return (Short) values.get(i);
-	}
-
-	public Byte getByte(int i) {
-		return (Byte) values.get(i);
-	}
-
-	public Double getDouble(int i) {
-		return (Double) values.get(i);
-	}
-
-	public Float getFloat(int i) {
-		return (Float) values.get(i);
-	}
-
-	public byte[] getBinary(int i) {
-		return (byte[]) values.get(i);
-	}
-
-	public Object getValueByField(String field) {
-		return values.get(fieldIndex(field));
-	}
-
-	public String getStringByField(String field) {
-		return (String) values.get(fieldIndex(field));
-	}
-
-	public Integer getIntegerByField(String field) {
-		return (Integer) values.get(fieldIndex(field));
-	}
-
-	public Long getLongByField(String field) {
-		return (Long) values.get(fieldIndex(field));
-	}
-
-	public Boolean getBooleanByField(String field) {
-		return (Boolean) values.get(fieldIndex(field));
-	}
-
-	public Short getShortByField(String field) {
-		return (Short) values.get(fieldIndex(field));
-	}
-
-	public Byte getByteByField(String field) {
-		return (Byte) values.get(fieldIndex(field));
-	}
-
-	public Double getDoubleByField(String field) {
-		return (Double) values.get(fieldIndex(field));
-	}
-
-	public Float getFloatByField(String field) {
-		return (Float) values.get(fieldIndex(field));
-	}
-
-	public byte[] getBinaryByField(String field) {
-		return (byte[]) values.get(fieldIndex(field));
-	}
-
-	public List<Object> getValues() {
-		return values;
-	}
-
-	public Fields getFields() {
-		return context.getComponentOutputFields(getSourceComponent(),
-				getSourceStreamId());
-	}
-
-	public List<Object> select(Fields selector) {
-		return getFields().select(selector, values);
-	}
-
-	public GlobalStreamId getSourceGlobalStreamid() {
-		return new GlobalStreamId(getSourceComponent(), streamId);
-	}
-
-	public String getSourceComponent() {
-		return context.getComponentId(taskId);
-	}
-
-	public int getSourceTask() {
-		return taskId;
-	}
-
-	public String getSourceStreamId() {
-		return streamId;
-	}
-
-	public MessageId getMessageId() {
-		return id;
-	}
-
-	@Override
-	public String toString() {
-		return "source: " + getSourceComponent() + ":" + taskId + ", stream: "
-				+ streamId + ", id: " + id.toString() + ", "
-				+ values.toString();
-	}
-
-	@Override
-	public boolean equals(Object other) {
-		return this == other;
-	}
-
-	@Override
-	public int hashCode() {
-		return System.identityHashCode(this);
-	}
-
-	private final Keyword makeKeyword(String name) {
-		return Keyword.intern(Symbol.create(name));
-	}
-
-	/* ILookup */
-	@Override
-	public Object valAt(Object o) {
-		try {
-			if (o instanceof Keyword) {
-				return getValueByField(((Keyword) o).getName());
-			} else if (o instanceof String) {
-				return getValueByField((String) o);
-			}
-		} catch (IllegalArgumentException e) {
-		}
-		return null;
-	}
-
-	/* Seqable */
-	public ISeq seq() {
-		if (values.size() > 0) {
-			return new Seq(getFields().toList(), values, 0);
-		}
-		return null;
-	}
-
-	static class Seq extends ASeq implements Counted {
-		final List<String> fields;
-		final List<Object> values;
-		final int i;
-
-		Seq(List<String> fields, List<Object> values, int i) {
-			this.fields = fields;
-			this.values = values;
-			assert i >= 0;
-			this.i = i;
-		}
-
-		public Seq(IPersistentMap meta, List<String> fields,
-				List<Object> values, int i) {
-			super(meta);
-			this.fields = fields;
-			this.values = values;
-			assert i >= 0;
-			this.i = i;
-		}
-
-		public Object first() {
-			return new MapEntry(fields.get(i), values.get(i));
-		}
-
-		public ISeq next() {
-			if (i + 1 < fields.size()) {
-				return new Seq(fields, values, i + 1);
-			}
-			return null;
-		}
-
-		public int count() {
-			assert fields.size() - i >= 0 : "index out of bounds";
-			// i being the position in the fields of this seq, the remainder of
-			// the seq is the size
-			return fields.size() - i;
-		}
-
-		public Obj withMeta(IPersistentMap meta) {
-			return new Seq(meta, fields, values, i);
-		}
-	}
-
-	/* Indexed */
-	public Object nth(int i) {
-		if (i < values.size()) {
-			return values.get(i);
-		} else {
-			return null;
-		}
-	}
-
-	public Object nth(int i, Object notfound) {
-		Object ret = nth(i);
-		if (ret == null)
-			ret = notfound;
-		return ret;
-	}
-
-	/* Counted */
-	public int count() {
-		return values.size();
-	}
-
-	/* IMeta */
-	public IPersistentMap meta() {
-		if (_meta == null) {
-			_meta = new PersistentArrayMap(new Object[] {
-					makeKeyword("stream"), getSourceStreamId(),
-					makeKeyword("component"), getSourceComponent(),
-					makeKeyword("task"), getSourceTask() });
-		}
-		return _meta;
-	}
-
-	private PersistentArrayMap toMap() {
-		Object array[] = new Object[values.size() * 2];
-		List<String> fields = getFields().toList();
-		for (int i = 0; i < values.size(); i++) {
-			array[i * 2] = fields.get(i);
-			array[(i * 2) + 1] = values.get(i);
-		}
-		return new PersistentArrayMap(array);
-	}
-
-	public IPersistentMap getMap() {
-		if (_map == null) {
-			setMap(toMap());
-		}
-		return _map;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/tuple/TupleImplExt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/tuple/TupleImplExt.java b/jstorm-client/src/main/java/backtype/storm/tuple/TupleImplExt.java
deleted file mode 100644
index 5d4b487..0000000
--- a/jstorm-client/src/main/java/backtype/storm/tuple/TupleImplExt.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package backtype.storm.tuple;
-
-import java.util.List;
-
-import backtype.storm.task.GeneralTopologyContext;
-
-public class TupleImplExt extends TupleImpl implements TupleExt {
-
-	protected int targetTaskId;
-
-	public TupleImplExt(GeneralTopologyContext context, List<Object> values,
-			int taskId, String streamId) {
-		super(context, values, taskId, streamId);
-	}
-
-	public TupleImplExt(GeneralTopologyContext context, List<Object> values,
-			int taskId, String streamId, MessageId id) {
-		super(context, values, taskId, streamId, id);
-	}
-
-	@Override
-	public int getTargetTaskId() {
-		return targetTaskId;
-	}
-
-	@Override
-	public void setTargetTaskId(int targetTaskId) {
-		this.targetTaskId = targetTaskId;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/tuple/Values.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/tuple/Values.java b/jstorm-client/src/main/java/backtype/storm/tuple/Values.java
deleted file mode 100644
index d374f67..0000000
--- a/jstorm-client/src/main/java/backtype/storm/tuple/Values.java
+++ /dev/null
@@ -1,20 +0,0 @@
-package backtype.storm.tuple;
-
-import java.util.ArrayList;
-
-/**
- * A convenience class for making tuple values using new Values("field1", 2, 3)
- * syntax.
- */
-public class Values extends ArrayList<Object> {
-	public Values() {
-
-	}
-
-	public Values(Object... vals) {
-		super(vals.length);
-		for (Object o : vals) {
-			add(o);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/utils/BufferFileInputStream.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/utils/BufferFileInputStream.java b/jstorm-client/src/main/java/backtype/storm/utils/BufferFileInputStream.java
deleted file mode 100644
index c3e1a20..0000000
--- a/jstorm-client/src/main/java/backtype/storm/utils/BufferFileInputStream.java
+++ /dev/null
@@ -1,37 +0,0 @@
-package backtype.storm.utils;
-
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Arrays;
-
-public class BufferFileInputStream {
-	byte[] buffer;
-	FileInputStream stream;
-
-	public BufferFileInputStream(String file, int bufferSize)
-			throws FileNotFoundException {
-		stream = new FileInputStream(file);
-		buffer = new byte[bufferSize];
-	}
-
-	public BufferFileInputStream(String file) throws FileNotFoundException {
-		this(file, 15 * 1024);
-	}
-
-	public byte[] read() throws IOException {
-		int length = stream.read(buffer);
-		if (length == -1) {
-			close();
-			return new byte[0];
-		} else if (length == buffer.length) {
-			return buffer;
-		} else {
-			return Arrays.copyOf(buffer, length);
-		}
-	}
-
-	public void close() throws IOException {
-		stream.close();
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/utils/CRC32OutputStream.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/utils/CRC32OutputStream.java b/jstorm-client/src/main/java/backtype/storm/utils/CRC32OutputStream.java
deleted file mode 100644
index 46265b0..0000000
--- a/jstorm-client/src/main/java/backtype/storm/utils/CRC32OutputStream.java
+++ /dev/null
@@ -1,27 +0,0 @@
-package backtype.storm.utils;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.zip.CRC32;
-
-public class CRC32OutputStream extends OutputStream {
-	private CRC32 hasher;
-
-	public CRC32OutputStream() {
-		hasher = new CRC32();
-	}
-
-	public long getValue() {
-		return hasher.getValue();
-	}
-
-	@Override
-	public void write(int i) throws IOException {
-		hasher.update(i);
-	}
-
-	@Override
-	public void write(byte[] bytes, int start, int end) throws IOException {
-		hasher.update(bytes, start, end);
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/utils/ClojureTimerTask.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/utils/ClojureTimerTask.java b/jstorm-client/src/main/java/backtype/storm/utils/ClojureTimerTask.java
deleted file mode 100644
index b9094e2..0000000
--- a/jstorm-client/src/main/java/backtype/storm/utils/ClojureTimerTask.java
+++ /dev/null
@@ -1,18 +0,0 @@
-package backtype.storm.utils;
-
-import clojure.lang.IFn;
-import java.util.TimerTask;
-
-public class ClojureTimerTask extends TimerTask {
-	IFn _afn;
-
-	public ClojureTimerTask(IFn afn) {
-		super();
-		_afn = afn;
-	}
-
-	@Override
-	public void run() {
-		_afn.run();
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/utils/Container.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/utils/Container.java b/jstorm-client/src/main/java/backtype/storm/utils/Container.java
deleted file mode 100644
index b8a6f12..0000000
--- a/jstorm-client/src/main/java/backtype/storm/utils/Container.java
+++ /dev/null
@@ -1,7 +0,0 @@
-package backtype.storm.utils;
-
-import java.io.Serializable;
-
-public class Container implements Serializable {
-	public Object object;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/utils/DRPCClient.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/utils/DRPCClient.java b/jstorm-client/src/main/java/backtype/storm/utils/DRPCClient.java
deleted file mode 100644
index 975d7d8..0000000
--- a/jstorm-client/src/main/java/backtype/storm/utils/DRPCClient.java
+++ /dev/null
@@ -1,70 +0,0 @@
-package backtype.storm.utils;
-
-import org.apache.thrift7.TException;
-import org.apache.thrift7.protocol.TBinaryProtocol;
-import org.apache.thrift7.transport.TFramedTransport;
-import org.apache.thrift7.transport.TSocket;
-import org.apache.thrift7.transport.TTransport;
-
-import backtype.storm.generated.DRPCExecutionException;
-import backtype.storm.generated.DistributedRPC;
-
-public class DRPCClient implements DistributedRPC.Iface {
-	private TTransport conn;
-	private DistributedRPC.Client client;
-	private String host;
-	private int port;
-	private Integer timeout;
-
-	public DRPCClient(String host, int port, Integer timeout) {
-		try {
-			this.host = host;
-			this.port = port;
-			this.timeout = timeout;
-			connect();
-		} catch (TException e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	public DRPCClient(String host, int port) {
-		this(host, port, null);
-	}
-
-	private void connect() throws TException {
-		TSocket socket = new TSocket(host, port);
-		if (timeout != null) {
-			socket.setTimeout(timeout);
-		}
-		conn = new TFramedTransport(socket);
-		client = new DistributedRPC.Client(new TBinaryProtocol(conn));
-		conn.open();
-	}
-
-	public String getHost() {
-		return host;
-	}
-
-	public int getPort() {
-		return port;
-	}
-
-	public String execute(String func, String args) throws TException,
-			DRPCExecutionException {
-		try {
-			if (client == null)
-				connect();
-			return client.execute(func, args);
-		} catch (TException e) {
-			client = null;
-			throw e;
-		} catch (DRPCExecutionException e) {
-			client = null;
-			throw e;
-		}
-	}
-
-	public void close() {
-		conn.close();
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/utils/DisruptorQueue.java b/jstorm-client/src/main/java/backtype/storm/utils/DisruptorQueue.java
deleted file mode 100644
index 8d9e861..0000000
--- a/jstorm-client/src/main/java/backtype/storm/utils/DisruptorQueue.java
+++ /dev/null
@@ -1,71 +0,0 @@
-package backtype.storm.utils;
-
-import backtype.storm.metric.api.IStatefulObject;
-
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.InsufficientCapacityException;
-import com.lmax.disruptor.WaitStrategy;
-import com.lmax.disruptor.dsl.ProducerType;
-
-/**
- * 
- * A single consumer queue that uses the LMAX Disruptor. They key to the
- * performance is the ability to catch up to the producer by processing tuples
- * in batches.
- */
-public abstract class DisruptorQueue implements IStatefulObject {
-	public static void setUseSleep(boolean useSleep) {
-		DisruptorQueueImpl.setUseSleep(useSleep);
-	}
-
-	private static boolean CAPACITY_LIMITED = false;
-
-	public static void setLimited(boolean limited) {
-		CAPACITY_LIMITED = limited;
-	}
-
-	public static DisruptorQueue mkInstance(String queueName,
-			ProducerType producerType, int bufferSize, WaitStrategy wait) {
-		if (CAPACITY_LIMITED == true) {
-			return new DisruptorQueueImpl(queueName, producerType, bufferSize,
-					wait);
-		} else {
-			return new DisruptorWrapBlockingQueue(queueName, producerType,
-					bufferSize, wait);
-		}
-	}
-
-	public abstract String getName();
-
-	
-
-	public abstract void haltWithInterrupt();
-
-	public abstract Object poll();
-
-	public abstract Object take();
-
-	public abstract void consumeBatch(EventHandler<Object> handler);
-	
-	public abstract void consumeBatchWhenAvailable(EventHandler<Object> handler);
-
-	public abstract void publish(Object obj);
-
-	public abstract void publish(Object obj, boolean block)
-			throws InsufficientCapacityException;
-
-	public abstract void consumerStarted();
-
-	public abstract void clear();
-
-	public abstract long population();
-
-	public abstract long capacity();
-
-	public abstract long writePos();
-
-	public abstract long readPos();
-
-	public abstract float pctFull();
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/utils/DisruptorQueueImpl.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/utils/DisruptorQueueImpl.java b/jstorm-client/src/main/java/backtype/storm/utils/DisruptorQueueImpl.java
deleted file mode 100644
index 0c334b5..0000000
--- a/jstorm-client/src/main/java/backtype/storm/utils/DisruptorQueueImpl.java
+++ /dev/null
@@ -1,298 +0,0 @@
-package backtype.storm.utils;
-
-import java.util.HashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.apache.log4j.Logger;
-
-import backtype.storm.metric.api.IStatefulObject;
-import backtype.storm.utils.disruptor.AbstractSequencerExt;
-import backtype.storm.utils.disruptor.RingBuffer;
-
-import com.lmax.disruptor.AlertException;
-import com.lmax.disruptor.EventFactory;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.InsufficientCapacityException;
-import com.lmax.disruptor.Sequence;
-import com.lmax.disruptor.SequenceBarrier;
-import com.lmax.disruptor.TimeoutException;
-import com.lmax.disruptor.WaitStrategy;
-import com.lmax.disruptor.dsl.ProducerType;
-
-/**
- * 
- * A single consumer queue that uses the LMAX Disruptor. They key to the
- * performance is the ability to catch up to the producer by processing tuples
- * in batches.
- */
-public class DisruptorQueueImpl extends DisruptorQueue {
-	private static final Logger LOG = Logger.getLogger(DisruptorQueueImpl.class);
-	static boolean useSleep = true;
-	public static void setUseSleep(boolean useSleep) {
-		AbstractSequencerExt.setWaitSleep(useSleep);
-	}
-	
-	private static final Object FLUSH_CACHE = new Object();
-	private static final Object INTERRUPT = new Object();
-	private static final String PREFIX = "disruptor-";
-
-	private final String _queueName;
-	private final RingBuffer<MutableObject> _buffer;
-	private final Sequence _consumer;
-	private final SequenceBarrier _barrier;
-
-	// TODO: consider having a threadlocal cache of this variable to speed up
-	// reads?
-	volatile boolean consumerStartedFlag = false;
-
-	private final HashMap<String, Object> state = new HashMap<String, Object>(4);
-	private final ConcurrentLinkedQueue<Object> _cache = new ConcurrentLinkedQueue<Object>();
-	private final ReentrantReadWriteLock cacheLock = new ReentrantReadWriteLock();
-	private final Lock readLock = cacheLock.readLock();
-	private final Lock writeLock = cacheLock.writeLock();
-
-	public DisruptorQueueImpl(String queueName, ProducerType producerType,
-			int bufferSize, WaitStrategy wait) {
-		this._queueName = PREFIX + queueName;
-		_buffer = RingBuffer.create(producerType, new ObjectEventFactory(),
-				bufferSize, wait);
-		_consumer = new Sequence();
-		_barrier = _buffer.newBarrier();
-		_buffer.addGatingSequences(_consumer);
-		if (producerType == ProducerType.SINGLE) {
-			consumerStartedFlag = true;
-		} else {
-			// make sure we flush the pending messages in cache first
-			if (bufferSize < 2) {
-				throw new RuntimeException("QueueSize must >= 2");
-			}
-			try {
-				publishDirect(FLUSH_CACHE, true);
-			} catch (InsufficientCapacityException e) {
-				throw new RuntimeException("This code should be unreachable!",
-						e);
-			}
-		}
-	}
-
-	public String getName() {
-		return _queueName;
-	}
-
-	public void consumeBatch(EventHandler<Object> handler) {
-		consumeBatchToCursor(_barrier.getCursor(), handler);
-	}
-
-	public void haltWithInterrupt() {
-		publish(INTERRUPT);
-	}
-
-	public Object poll() {
-		// @@@
-		// should use _cache.isEmpty, but it is slow
-		// I will change the logic later
-		if (consumerStartedFlag == false) {
-			return _cache.poll();
-		}
-
-		final long nextSequence = _consumer.get() + 1;
-		if (nextSequence <= _barrier.getCursor()) {
-			MutableObject mo = _buffer.get(nextSequence);
-			_consumer.set(nextSequence);
-			Object ret = mo.o;
-			mo.setObject(null);
-			return ret;
-		}
-		return null;
-	}
-
-	public Object take() {
-		// @@@
-		// should use _cache.isEmpty, but it is slow
-		// I will change the logic later
-		if (consumerStartedFlag == false) {
-			return _cache.poll();
-		}
-
-		final long nextSequence = _consumer.get() + 1;
-		// final long availableSequence;
-		try {
-			_barrier.waitFor(nextSequence);
-		} catch (AlertException e) {
-			LOG.error(e.getCause(), e);
-			throw new RuntimeException(e);
-		} catch (InterruptedException e) {
-			LOG.error("InterruptedException " + e.getCause());
-			// throw new RuntimeException(e);
-			return null;
-		} catch (TimeoutException e) {
-			LOG.error(e.getCause(), e);
-			return null;
-		}
-		MutableObject mo = _buffer.get(nextSequence);
-		_consumer.set(nextSequence);
-		Object ret = mo.o;
-		mo.setObject(null);
-		return ret;
-	}
-
-	public void consumeBatchWhenAvailable(EventHandler<Object> handler) {
-		try {
-			final long nextSequence = _consumer.get() + 1;
-			final long availableSequence = _barrier.waitFor(nextSequence);
-			if (availableSequence >= nextSequence) {
-				consumeBatchToCursor(availableSequence, handler);
-			}
-		} catch (AlertException e) {
-			LOG.error(e.getCause(), e);
-			throw new RuntimeException(e);
-		} catch (InterruptedException e) {
-			LOG.error("InterruptedException " + e.getCause());
-			return;
-		}catch (TimeoutException e) {
-			LOG.error(e.getCause(), e);
-			return ;
-		}
-	}
-
-	public void consumeBatchToCursor(long cursor, EventHandler<Object> handler){
-		for (long curr = _consumer.get() + 1; curr <= cursor; curr++) {
-			try {
-				MutableObject mo = _buffer.get(curr);
-				Object o = mo.o;
-				mo.setObject(null);
-				if (o == FLUSH_CACHE) {
-					Object c = null;
-					while (true) {
-						c = _cache.poll();
-						if (c == null)
-							break;
-						else
-							handler.onEvent(c, curr, true);
-					}
-				} else if (o == INTERRUPT) {
-					throw new InterruptedException(
-							"Disruptor processing interrupted");
-				} else {
-					handler.onEvent(o, curr, curr == cursor);
-				}
-			} catch (InterruptedException e) {
-				// throw new RuntimeException(e);
-				LOG.error(e.getCause());
-				return;
-			} catch (Exception e) {
-				LOG.error(e.getCause(), e);
-				throw new RuntimeException(e);
-			}
-		}
-		// TODO: only set this if the consumer cursor has changed?
-		_consumer.set(cursor);
-	}
-
-	/*
-	 * Caches until consumerStarted is called, upon which the cache is flushed
-	 * to the consumer
-	 */
-	public void publish(Object obj) {
-		try {
-			publish(obj, true);
-		} catch (InsufficientCapacityException ex) {
-			throw new RuntimeException("This code should be unreachable!");
-		}
-	}
-
-	public void tryPublish(Object obj) throws InsufficientCapacityException {
-		publish(obj, false);
-	}
-
-	public void publish(Object obj, boolean block)
-			throws InsufficientCapacityException {
-
-		boolean publishNow = consumerStartedFlag;
-
-		if (!publishNow) {
-			readLock.lock();
-			try {
-				publishNow = consumerStartedFlag;
-				if (!publishNow) {
-					_cache.add(obj);
-				}
-			} finally {
-				readLock.unlock();
-			}
-		}
-
-		if (publishNow) {
-			publishDirect(obj, block);
-		}
-	}
-
-	protected void publishDirect(Object obj, boolean block)
-			throws InsufficientCapacityException {
-		final long id;
-		if (block) {
-			id = _buffer.next();
-		} else {
-			id = _buffer.tryNext(1);
-		}
-		final MutableObject m = _buffer.get(id);
-		m.setObject(obj);
-		_buffer.publish(id);
-	}
-
-	public void consumerStarted() {
-
-		writeLock.lock();
-		consumerStartedFlag = true;
-		
-		writeLock.unlock();
-	}
-
-	public void clear() {
-		while (population() != 0L) {
-			poll();
-		}
-	}
-
-	public long population() {
-		return (writePos() - readPos());
-	}
-
-	public long capacity() {
-		return _buffer.getBufferSize();
-	}
-
-	public long writePos() {
-		return _buffer.getCursor();
-	}
-
-	public long readPos() {
-		return _consumer.get();
-	}
-
-	public float pctFull() {
-		return (1.0F * population() / capacity());
-	}
-
-	@Override
-	public Object getState() {
-		// get readPos then writePos so it's never an under-estimate
-		long rp = readPos();
-		long wp = writePos();
-		state.put("capacity", capacity());
-		state.put("population", wp - rp);
-		state.put("write_pos", wp);
-		state.put("read_pos", rp);
-		return state;
-	}
-
-	public static class ObjectEventFactory implements
-			EventFactory<MutableObject> {
-		@Override
-		public MutableObject newInstance() {
-			return new MutableObject();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/utils/DisruptorWrapBlockingQueue.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/utils/DisruptorWrapBlockingQueue.java b/jstorm-client/src/main/java/backtype/storm/utils/DisruptorWrapBlockingQueue.java
deleted file mode 100644
index a701f39..0000000
--- a/jstorm-client/src/main/java/backtype/storm/utils/DisruptorWrapBlockingQueue.java
+++ /dev/null
@@ -1,192 +0,0 @@
-package backtype.storm.utils;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.log4j.Logger;
-
-import backtype.storm.metric.api.IStatefulObject;
-
-import com.lmax.disruptor.EventFactory;
-import com.lmax.disruptor.EventHandler;
-import com.lmax.disruptor.InsufficientCapacityException;
-import com.lmax.disruptor.WaitStrategy;
-import com.lmax.disruptor.dsl.ProducerType;
-
-/**
- * 
- * A single consumer queue that uses the LMAX Disruptor. They key to the
- * performance is the ability to catch up to the producer by processing tuples
- * in batches.
- */
-public class DisruptorWrapBlockingQueue extends DisruptorQueue {
-	private static final Logger LOG = Logger
-			.getLogger(DisruptorWrapBlockingQueue.class);
-
-	private static final long QUEUE_CAPACITY = 512;
-	private LinkedBlockingDeque<Object> queue;
-
-	private String queueName;
-
-	public DisruptorWrapBlockingQueue(String queueName,
-			ProducerType producerType, int bufferSize, WaitStrategy wait) {
-		this.queueName = queueName;
-		queue = new LinkedBlockingDeque<Object>();
-	}
-
-	public String getName() {
-		return queueName;
-	}
-
-	// poll method
-	public void consumeBatch(EventHandler<Object> handler) {
-		consumeBatchToCursor(0, handler);
-	}
-
-	public void haltWithInterrupt() {
-	}
-
-	public Object poll() {
-		return queue.poll();
-	}
-
-	public Object take() {
-		try {
-			return queue.take();
-		} catch (InterruptedException e) {
-			return null;
-		}
-	}
-	
-	public void drainQueue(Object object, EventHandler<Object> handler) {
-		while (object != null) {
-			try {
-				handler.onEvent(object, 0, false);
-				object = queue.poll();
-			} catch (InterruptedException e) {
-				LOG.warn("Occur interrupt error, " + object);
-				break;
-			} catch (Exception e) {
-				throw new RuntimeException(e);
-			}
-		}
-	}
-
-	public void consumeBatchWhenAvailable(EventHandler<Object> handler) {
-		Object object = queue.poll();
-		if (object == null) {
-			try {
-				object = queue.take();
-			} catch (InterruptedException e) {
-				LOG.warn("Occur interrupt error, " + object);
-			} catch (Exception e) {
-				throw new RuntimeException(e);
-			}
-		}
-
-		drainQueue(object, handler);
-
-	}
-
-	public void consumeBatchToCursor(long cursor, EventHandler<Object> handler) {
-		Object object = queue.poll();
-		drainQueue(object, handler);
-	}
-
-	/*
-	 * Caches until consumerStarted is called, upon which the cache is flushed
-	 * to the consumer
-	 */
-	public void publish(Object obj) {
-		boolean isSuccess = queue.offer(obj);
-		while (isSuccess == false) {
-			try {
-				Thread.sleep(1);
-			} catch (InterruptedException e) {
-			}
-			isSuccess = queue.offer(obj);
-		}
-
-	}
-
-	public void tryPublish(Object obj) throws InsufficientCapacityException {
-		boolean isSuccess = queue.offer(obj);
-		if (isSuccess == false) {
-			throw InsufficientCapacityException.INSTANCE;
-		}
-
-	}
-
-	public void publish(Object obj, boolean block)
-			throws InsufficientCapacityException {
-		if (block == true) {
-			publish(obj);
-		} else {
-			tryPublish(obj);
-		}
-	}
-
-	public void consumerStarted() {
-	}
-
-	private void flushCache() {
-	}
-
-	public void clear() {
-		queue.clear();
-	}
-
-	public long population() {
-		return queue.size();
-	}
-
-	public long capacity() {
-		long used = queue.size();
-		if (used < QUEUE_CAPACITY) {
-			return QUEUE_CAPACITY;
-		} else {
-			return used;
-		}
-	}
-
-	public long writePos() {
-		return 0;
-	}
-
-	public long readPos() {
-		return queue.size();
-	}
-
-	public float pctFull() {
-		long used = queue.size();
-		if (used < QUEUE_CAPACITY) {
-			return (1.0F * used / QUEUE_CAPACITY);
-		} else {
-			return 1.0f;
-		}
-	}
-
-	@Override
-	public Object getState() {
-		Map state = new HashMap<String, Object>();
-		// get readPos then writePos so it's never an under-estimate
-		long rp = readPos();
-		long wp = writePos();
-		state.put("capacity", capacity());
-		state.put("population", wp - rp);
-		state.put("write_pos", wp);
-		state.put("read_pos", rp);
-		return state;
-	}
-
-	public static class ObjectEventFactory implements
-			EventFactory<MutableObject> {
-		@Override
-		public MutableObject newInstance() {
-			return new MutableObject();
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/utils/IndifferentAccessMap.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/utils/IndifferentAccessMap.java b/jstorm-client/src/main/java/backtype/storm/utils/IndifferentAccessMap.java
deleted file mode 100644
index 74c4a63..0000000
--- a/jstorm-client/src/main/java/backtype/storm/utils/IndifferentAccessMap.java
+++ /dev/null
@@ -1,169 +0,0 @@
-package backtype.storm.utils;
-
-import clojure.lang.ILookup;
-import clojure.lang.ISeq;
-import clojure.lang.AFn;
-import clojure.lang.IPersistentMap;
-import clojure.lang.PersistentArrayMap;
-import clojure.lang.IMapEntry;
-import clojure.lang.IPersistentCollection;
-import clojure.lang.Keyword;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Collection;
-import java.util.Set;
-
-public class IndifferentAccessMap extends AFn implements ILookup,
-		IPersistentMap, Map {
-
-	protected IPersistentMap _map;
-
-	protected IndifferentAccessMap() {
-	}
-
-	public IndifferentAccessMap(IPersistentMap map) {
-		setMap(map);
-	}
-
-	public IPersistentMap getMap() {
-		return _map;
-	}
-
-	public IPersistentMap setMap(IPersistentMap map) {
-		_map = map;
-		return _map;
-	}
-
-	public int size() {
-		return ((Map) getMap()).size();
-	}
-
-	public int count() {
-		return size();
-	}
-
-	public ISeq seq() {
-		return getMap().seq();
-	}
-
-	@Override
-	public Object valAt(Object o) {
-		if (o instanceof Keyword) {
-			return valAt(((Keyword) o).getName());
-		}
-		return getMap().valAt(o);
-	}
-
-	@Override
-	public Object valAt(Object o, Object def) {
-		Object ret = valAt(o);
-		if (ret == null)
-			ret = def;
-		return ret;
-	}
-
-	/* IFn */
-	@Override
-	public Object invoke(Object o) {
-		return valAt(o);
-	}
-
-	@Override
-	public Object invoke(Object o, Object notfound) {
-		return valAt(o, notfound);
-	}
-
-	/* IPersistentMap */
-	/* Naive implementation, but it might be good enough */
-	public IPersistentMap assoc(Object k, Object v) {
-		if (k instanceof Keyword)
-			return assoc(((Keyword) k).getName(), v);
-
-		return new IndifferentAccessMap(getMap().assoc(k, v));
-	}
-
-	public IPersistentMap assocEx(Object k, Object v) {
-		if (k instanceof Keyword)
-			return assocEx(((Keyword) k).getName(), v);
-
-		return new IndifferentAccessMap(getMap().assocEx(k, v));
-	}
-
-	public IPersistentMap without(Object k) {
-		if (k instanceof Keyword)
-			return without(((Keyword) k).getName());
-
-		return new IndifferentAccessMap(getMap().without(k));
-	}
-
-	public boolean containsKey(Object k) {
-		if (k instanceof Keyword)
-			return containsKey(((Keyword) k).getName());
-		return getMap().containsKey(k);
-	}
-
-	public IMapEntry entryAt(Object k) {
-		if (k instanceof Keyword)
-			return entryAt(((Keyword) k).getName());
-
-		return getMap().entryAt(k);
-	}
-
-	public IPersistentCollection cons(Object o) {
-		return getMap().cons(o);
-	}
-
-	public IPersistentCollection empty() {
-		return new IndifferentAccessMap(PersistentArrayMap.EMPTY);
-	}
-
-	public boolean equiv(Object o) {
-		return getMap().equiv(o);
-	}
-
-	public Iterator iterator() {
-		return getMap().iterator();
-	}
-
-	/* Map */
-	public boolean containsValue(Object v) {
-		return ((Map) getMap()).containsValue(v);
-	}
-
-	public Set entrySet() {
-		return ((Map) getMap()).entrySet();
-	}
-
-	public Object get(Object k) {
-		return valAt(k);
-	}
-
-	public boolean isEmpty() {
-		return ((Map) getMap()).isEmpty();
-	}
-
-	public Set keySet() {
-		return ((Map) getMap()).keySet();
-	}
-
-	public Collection values() {
-		return ((Map) getMap()).values();
-	}
-
-	/* Not implemented */
-	public void clear() {
-		throw new UnsupportedOperationException();
-	}
-
-	public Object put(Object k, Object v) {
-		throw new UnsupportedOperationException();
-	}
-
-	public void putAll(Map m) {
-		throw new UnsupportedOperationException();
-	}
-
-	public Object remove(Object k) {
-		throw new UnsupportedOperationException();
-	}
-}