Posted to commits@storm.apache.org by pt...@apache.org on 2015/11/05 21:41:23 UTC

[44/60] [abbrv] [partial] storm git commit: Release 2.0.4-SNAPSHOT

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/StormSubmitter.java b/jstorm-client/src/main/java/backtype/storm/StormSubmitter.java
deleted file mode 100644
index 25a9b4c..0000000
--- a/jstorm-client/src/main/java/backtype/storm/StormSubmitter.java
+++ /dev/null
@@ -1,375 +0,0 @@
-package backtype.storm;
-
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.thrift7.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.ClusterSummary;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.Nimbus;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.SubmitOptions;
-import backtype.storm.generated.TopologyAssignException;
-import backtype.storm.generated.TopologySummary;
-import backtype.storm.utils.BufferFileInputStream;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-/**
- * Use this class to submit topologies to run on the Storm cluster. You should
- * run your program with the "storm jar" command from the command-line, and then
- * use this class to submit your topologies.
- */
-public class StormSubmitter {
-	public static Logger LOG = LoggerFactory.getLogger(StormSubmitter.class);
-
-	private static Nimbus.Iface localNimbus = null;
-
-	public static void setLocalNimbus(Nimbus.Iface localNimbusHandler) {
-		StormSubmitter.localNimbus = localNimbusHandler;
-	}
-
-	/**
-	 * Submits a topology to run on the cluster. A topology runs forever or
-	 * until explicitly killed.
-	 * 
-	 * @param name
-	 *            the name of the topology.
-	 * @param stormConf
-	 *            the topology-specific configuration. See {@link Config}.
-	 * @param topology
-	 *            the processing to execute.
-	 * @throws AlreadyAliveException
-	 *             if a topology with this name is already running
-	 * @throws InvalidTopologyException
-	 *             if an invalid topology was submitted
-	 */
-	public static void submitTopology(String name, Map stormConf,
-			StormTopology topology) throws AlreadyAliveException,
-			InvalidTopologyException {
-		submitTopology(name, stormConf, topology, null);
-	}
-
-	public static void submitTopology(String name, Map stormConf,
-			StormTopology topology, SubmitOptions opts, List<File> jarFiles)
-			throws AlreadyAliveException, InvalidTopologyException {
-		if (jarFiles == null) {
-			jarFiles = new ArrayList<File>();
-		}
-		Map<String, String> jars = new HashMap<String, String>(jarFiles.size());
-		List<String> names = new ArrayList<String>(jarFiles.size());
-		
-		for (File f : jarFiles) {
-			if (!f.exists()) {
-				LOG.info(f.getName() + " does not exist: "
-						+ f.getAbsolutePath());
-				continue;
-			}
-			jars.put(f.getName(), f.getAbsolutePath());
-			names.add(f.getName());
-		}
-		LOG.info("Files: " + names + " will be loaded");
-		stormConf.put(GenericOptionsParser.TOPOLOGY_LIB_PATH, jars);
-		stormConf.put(GenericOptionsParser.TOPOLOGY_LIB_NAME, names);
-		submitTopology(name, stormConf, topology, opts);
-	}
-
-	public static void submitTopology(String name, Map stormConf,
-			StormTopology topology, SubmitOptions opts,
-			ProgressListener listener) throws AlreadyAliveException,
-			InvalidTopologyException {
-		submitTopology(name, stormConf, topology, opts);
-	}
-
-	/**
-	 * Submits a topology to run on the cluster. A topology runs forever or
-	 * until explicitly killed.
-	 * 
-	 * @param name
-	 *            the name of the topology.
-	 * @param stormConf
-	 *            the topology-specific configuration. See {@link Config}.
-	 * @param topology
-	 *            the processing to execute.
-	 * @param opts
-	 *            options used to manipulate how the topology is started
-	 * @throws AlreadyAliveException
-	 *             if a topology with this name is already running
-	 * @throws InvalidTopologyException
-	 *             if an invalid topology was submitted
-	 */
-	public static void submitTopology(String name, Map stormConf,
-			StormTopology topology, SubmitOptions opts)
-			throws AlreadyAliveException, InvalidTopologyException {
-		if (!Utils.isValidConf(stormConf)) {
-			throw new IllegalArgumentException(
-					"Storm conf is not valid. Must be json-serializable");
-		}
-		stormConf = new HashMap(stormConf);
-		stormConf.putAll(Utils.readCommandLineOpts());
-		Map conf = Utils.readStormConfig();
-		conf.putAll(stormConf);
-		putUserInfo(conf, stormConf);
-		try {
-			String serConf = Utils.to_json(stormConf);
-			if (localNimbus != null) {
-				LOG.info("Submitting topology " + name + " in local mode");
-				localNimbus.submitTopology(name, null, serConf, topology);
-			} else {
-				NimbusClient client = NimbusClient.getConfiguredClient(conf);
-				if (topologyNameExists(conf, name)) {
-					throw new RuntimeException("Topology with name `" + name
-							+ "` already exists on cluster");
-				}
-				
-				submitJar(conf);
-				try {
-					LOG.info("Submitting topology " + name
-							+ " in distributed mode with conf " + serConf);
-					if (opts != null) {
-						client.getClient().submitTopologyWithOpts(name, path,
-								serConf, topology, opts);
-					} else {
-						// this is for backwards compatibility
-						client.getClient().submitTopology(name, path, serConf,
-								topology);
-					}
-				} finally {
-					client.close();
-				}
-			}
-			LOG.info("Finished submitting topology: " + name);
-		} catch (InvalidTopologyException e) {
-			LOG.warn("Topology submission exception", e);
-			throw e;
-		} catch (AlreadyAliveException e) {
-			LOG.warn("Topology already alive exception", e);
-			throw e;
-		} catch (TopologyAssignException e) {
-			LOG.warn("Failed to assign " + e.get_msg(), e);
-			throw new RuntimeException(e);
-		} catch (TException e) {
-			LOG.warn("Failed to assign ", e);
-			throw new RuntimeException(e);
-		}
-	}
-
-	/**
-	 * Submits a topology to run on the cluster with a progress bar. A topology
-	 * runs forever or until explicitly killed.
-	 * 
-	 * @param name
-	 *            the name of the topology.
-	 * @param stormConf
-	 *            the topology-specific configuration. See {@link Config}.
-	 * @param topology
-	 *            the processing to execute.
-	 * @throws AlreadyAliveException
-	 *             if a topology with this name is already running
-	 * @throws InvalidTopologyException
-	 *             if an invalid topology was submitted
-	 */
-	public static void submitTopologyWithProgressBar(String name,
-			Map stormConf, StormTopology topology)
-			throws AlreadyAliveException, InvalidTopologyException {
-		submitTopologyWithProgressBar(name, stormConf, topology, null);
-	}
-
-	/**
-	 * Submits a topology to run on the cluster with a progress bar. A topology
-	 * runs forever or until explicitly killed.
-	 * 
-	 * @param name
-	 *            the name of the topology.
-	 * @param stormConf
-	 *            the topology-specific configuration. See {@link Config}.
-	 * @param topology
-	 *            the processing to execute.
-	 * @param opts
-	 *            options used to manipulate how the topology is started
-	 * @throws AlreadyAliveException
-	 *             if a topology with this name is already running
-	 * @throws InvalidTopologyException
-	 *             if an invalid topology was submitted
-	 */
-	public static void submitTopologyWithProgressBar(String name,
-			Map stormConf, StormTopology topology, SubmitOptions opts)
-			throws AlreadyAliveException, InvalidTopologyException {
-
-		// JStorm does not render a progress bar, so simply delegate to
-		// submitTopology.
-		submitTopology(name, stormConf, topology, opts);
-	}
-
-	private static boolean topologyNameExists(Map conf, String name) {
-		NimbusClient client = NimbusClient.getConfiguredClient(conf);
-		try {
-			ClusterSummary summary = client.getClient().getClusterInfo();
-			for (TopologySummary s : summary.get_topologies()) {
-				if (s.get_name().equals(name)) {
-					return true;
-				}
-			}
-			return false;
-
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		} finally {
-			client.close();
-		}
-	}
-
-	private static String submittedJar = null;
-	private static String path = null;
-
-	private static void submitJar(Map conf) {
-		if (submittedJar == null) {
-			NimbusClient client = NimbusClient.getConfiguredClient(conf);
-			try {
-				LOG.info("Jar not uploaded to master yet. Submitting jar...");
-				String localJar = System.getProperty("storm.jar");
-				path = client.getClient().beginFileUpload();
-				String[] pathCache = path.split("/");
-				String uploadLocation = path + "/stormjar-"
-						+ pathCache[pathCache.length - 1] + ".jar";
-				List<String> lib = (List<String>) conf
-						.get(GenericOptionsParser.TOPOLOGY_LIB_NAME);
-				Map<String, String> libPath = (Map<String, String>) conf
-						.get(GenericOptionsParser.TOPOLOGY_LIB_PATH);
-				if (lib != null && lib.size() != 0) {
-					for (String libName : lib) {
-						String jarPath = path  + "/lib/" + libName;
-						client.getClient().beginLibUpload(jarPath);
-						submitJar(conf, libPath.get(libName), jarPath, client);
-					}
-					
-				} else {
-					if (localJar == null) {
-						// no lib jars and no client jar
-						throw new RuntimeException("No client app jar, please upload it");
-					}
-				}
-				
-				if (localJar != null) {
-					submittedJar = submitJar(conf, localJar,
-							uploadLocation, client);
-				} else {
-					// lib jars only, no client jar
-					client.getClient().finishFileUpload(uploadLocation);
-				}
-			} catch (Exception e) {
-				throw new RuntimeException(e);
-			} finally {
-				client.close();
-			}
-		} else {
-			LOG.info("Jar already uploaded to master. Not submitting jar.");
-		}
-	}
-
-	public static String submitJar(Map conf, String localJar,
-			String uploadLocation, NimbusClient client) {
-		if (localJar == null) {
-			throw new RuntimeException(
-					"Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
-		}
-
-		try {
-
-			LOG.info("Uploading topology jar " + localJar
-					+ " to assigned location: " + uploadLocation);
-			int bufferSize = 512 * 1024;
-			Object maxBufSizeObject = conf
-					.get(Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE);
-			if (maxBufSizeObject != null) {
-				bufferSize = Utils.getInt(maxBufSizeObject) / 2;
-			}
-
-			BufferFileInputStream is = new BufferFileInputStream(localJar,
-					bufferSize);
-			while (true) {
-				byte[] toSubmit = is.read();
-				if (toSubmit.length == 0)
-					break;
-				client.getClient().uploadChunk(uploadLocation,
-						ByteBuffer.wrap(toSubmit));
-			}
-			client.getClient().finishFileUpload(uploadLocation);
-			LOG.info("Successfully uploaded topology jar to assigned location: "
-					+ uploadLocation);
-			return uploadLocation;
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-	}
-
-	private static void putUserInfo(Map conf, Map stormConf) {
-		stormConf.put("user.group", conf.get("user.group"));
-		stormConf.put("user.name", conf.get("user.name"));
-		stormConf.put("user.password", conf.get("user.password"));
-	}
-
-	/**
-	 * Interface used to track the progress of a file upload
-	 */
-	public interface ProgressListener {
-		/**
-		 * Called before the file is uploaded
-		 * 
-		 * @param srcFile
-		 *            - jar file to be uploaded
-		 * @param targetFile
-		 *            - destination file
-		 * @param totalBytes
-		 *            - total number of bytes of the file
-		 */
-		public void onStart(String srcFile, String targetFile, long totalBytes);
-
-		/**
-		 * Called whenever a chunk of bytes has been uploaded
-		 * 
-		 * @param srcFile
-		 *            - jar file to be uploaded
-		 * @param targetFile
-		 *            - destination file
-		 * @param bytesUploaded
-		 *            - number of bytes transferred so far
-		 * @param totalBytes
-		 *            - total number of bytes of the file
-		 */
-		public void onProgress(String srcFile, String targetFile,
-				long bytesUploaded, long totalBytes);
-
-		/**
-		 * Called when the file upload completes
-		 * 
-		 * @param srcFile
-		 *            - jar file to be uploaded
-		 * @param targetFile
-		 *            - destination file
-		 * @param totalBytes
-		 *            - total number of bytes of the file
-		 */
-		public void onCompleted(String srcFile, String targetFile,
-				long totalBytes);
-	}
-}
\ No newline at end of file
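
For reference, a minimal sketch of the submission flow this class documents
(MySpout and MyBolt are hypothetical stand-ins for real components; run the
program via the "storm jar" command so the storm.jar property is set):

    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.topology.TopologyBuilder;

    public class WordCountApp {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("spout", new MySpout());                        // hypothetical spout
            builder.setBolt("count", new MyBolt()).shuffleGrouping("spout"); // hypothetical bolt

            Config conf = new Config();
            conf.setNumWorkers(2);

            // Uploads the jar named by the storm.jar system property, then
            // submits the topology to Nimbus; it runs until explicitly killed.
            StormSubmitter.submitTopology("word-count", conf, builder.createTopology());
        }
    }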

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/Tool.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/Tool.java b/jstorm-client/src/main/java/backtype/storm/Tool.java
deleted file mode 100644
index 5fa7ccc..0000000
--- a/jstorm-client/src/main/java/backtype/storm/Tool.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package backtype.storm;
-
-/**
- * A tool abstract class that supports handling of generic
- * command-line options.
- *
- * <p>Here is how a typical <code>Tool</code> is implemented:</p>
- * <p><blockquote><pre>
- *     public class TopologyApp extends Tool {
- *         {@literal @}Override
- *         public int run(String[] args) throws Exception {
- *             // Config processed by ToolRunner
- *             Config conf = getConf();
- *
- *             // Other setups go here
- *             String name = "topology";
- *             StormTopology topology = buildTopology(args);
- *             StormSubmitter.submitTopology(name, conf, topology);
- *             return 0;
- *         }
- *
- *         StormTopology buildTopology(String[] args) { ... }
- *
- *         public static void main(String[] args) throws Exception {
- *             // Use ToolRunner to handle generic command-line options
- *             ToolRunner.run(new TopologyApp(), args);
- *         }
- *     }
- * </pre></blockquote></p>
- *
- * @see GenericOptionsParser
- * @see ToolRunner
- */
-
-public abstract class Tool {
-    Config config;
-
-    public abstract int run(String[] args) throws Exception;
-
-    public Config getConf() {
-        return config;
-    }
-
-    public void setConf(Config config) {
-        this.config = config;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/ToolRunner.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/ToolRunner.java b/jstorm-client/src/main/java/backtype/storm/ToolRunner.java
deleted file mode 100644
index 30940da..0000000
--- a/jstorm-client/src/main/java/backtype/storm/ToolRunner.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package backtype.storm;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.cli.ParseException;
-
-import backtype.storm.utils.Utils;
-
-/**
- * A utility to help run {@link Tool}s
- *
- * <p><code>ToolRunner</code> can be used to run classes extending the
- * <code>Tool</code> abstract class.  It works in conjunction with
- * {@link GenericOptionsParser} to parse the <a
- * href="{@docRoot}/backtype/storm/GenericOptionsParser.html#GenericOptions">
- * generic storm command line arguments</a> and modifies the
- * <code>Config</code> of the <code>Tool</code>.  The
- * application-specific options are passed along without being
- * modified.
- *
- * @see Tool
- * @see GenericOptionsParser
- */
-
-public class ToolRunner {
-    static final Logger LOG = LoggerFactory.getLogger(ToolRunner.class);
-
-    public static void run(Tool tool, String[] args) {
-        run(tool.getConf(), tool, args);
-    }
-
-    public static void run(Config conf, Tool tool, String[] args) {
-        try {
-            if (conf == null) {
-                conf = new Config();
-                conf.putAll(Utils.readStormConfig());
-            }
-
-            GenericOptionsParser parser = new GenericOptionsParser(conf, args);
-            tool.setConf(conf);
-
-            System.exit(tool.run(parser.getRemainingArgs()));
-        } catch (ParseException e) {
-            LOG.error("Error parsing generic options: {}", e.getMessage());
-            GenericOptionsParser.printGenericCommandUsage(System.err);
-            System.exit(2);
-        } catch (Exception e) {
-            LOG.error("Error running tool", e);
-            System.exit(1);
-        }
-    }
-}
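
A minimal sketch of the Config-accepting overload (MyTool is a hypothetical
Tool subclass; passing a null conf makes ToolRunner read the storm config itself):

    import backtype.storm.Config;
    import backtype.storm.ToolRunner;

    public class Launcher {
        public static void main(String[] args) throws Exception {
            Config conf = new Config();
            conf.setDebug(true); // pre-seed options before generic args are parsed
            // Parses generic options out of args, sets conf on the tool, and
            // calls System.exit with the tool's return code.
            ToolRunner.run(conf, new MyTool(), args);
        }
    }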

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/clojure/ClojureBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/clojure/ClojureBolt.java b/jstorm-client/src/main/java/backtype/storm/clojure/ClojureBolt.java
deleted file mode 100644
index bd3873f..0000000
--- a/jstorm-client/src/main/java/backtype/storm/clojure/ClojureBolt.java
+++ /dev/null
@@ -1,108 +0,0 @@
-package backtype.storm.clojure;
-
-import backtype.storm.coordination.CoordinatedBolt.FinishedCallback;
-import backtype.storm.generated.StreamInfo;
-import backtype.storm.task.IBolt;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.Utils;
-import clojure.lang.IFn;
-import clojure.lang.PersistentArrayMap;
-import clojure.lang.Keyword;
-import clojure.lang.Symbol;
-import clojure.lang.RT;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class ClojureBolt implements IRichBolt, FinishedCallback {
-	Map<String, StreamInfo> _fields;
-	List<String> _fnSpec;
-	List<String> _confSpec;
-	List<Object> _params;
-
-	IBolt _bolt;
-
-	public ClojureBolt(List fnSpec, List confSpec, List<Object> params,
-			Map<String, StreamInfo> fields) {
-		_fnSpec = fnSpec;
-		_confSpec = confSpec;
-		_params = params;
-		_fields = fields;
-	}
-
-	@Override
-	public void prepare(final Map stormConf, final TopologyContext context,
-			final OutputCollector collector) {
-		IFn hof = Utils.loadClojureFn(_fnSpec.get(0), _fnSpec.get(1));
-		try {
-			IFn preparer = (IFn) hof.applyTo(RT.seq(_params));
-			final Map<Keyword, Object> collectorMap = new PersistentArrayMap(
-					new Object[] {
-							Keyword.intern(Symbol.create("output-collector")),
-							collector,
-							Keyword.intern(Symbol.create("context")), context });
-			List<Object> args = new ArrayList<Object>() {
-				{
-					add(stormConf);
-					add(context);
-					add(collectorMap);
-				}
-			};
-
-			_bolt = (IBolt) preparer.applyTo(RT.seq(args));
-			// the Clojure-generated bolt may not implement prepare
-			try {
-				_bolt.prepare(stormConf, context, collector);
-			} catch (AbstractMethodError ame) {
-				// optional lifecycle method not implemented; safe to ignore
-			}
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	@Override
-	public void execute(Tuple input) {
-		_bolt.execute(input);
-	}
-
-	@Override
-	public void cleanup() {
-		try {
-			_bolt.cleanup();
-		} catch (AbstractMethodError ame) {
-
-		}
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		for (String stream : _fields.keySet()) {
-			StreamInfo info = _fields.get(stream);
-			declarer.declareStream(stream, info.is_direct(),
-					new Fields(info.get_output_fields()));
-		}
-	}
-
-	@Override
-	public void finishedId(Object id) {
-		if (_bolt instanceof FinishedCallback) {
-			((FinishedCallback) _bolt).finishedId(id);
-		}
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		IFn hof = Utils.loadClojureFn(_confSpec.get(0), _confSpec.get(1));
-		try {
-			return (Map) hof.applyTo(RT.seq(_params));
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/clojure/ClojureSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/clojure/ClojureSpout.java b/jstorm-client/src/main/java/backtype/storm/clojure/ClojureSpout.java
deleted file mode 100644
index 3606252..0000000
--- a/jstorm-client/src/main/java/backtype/storm/clojure/ClojureSpout.java
+++ /dev/null
@@ -1,142 +0,0 @@
-package backtype.storm.clojure;
-
-import backtype.storm.generated.StreamInfo;
-import backtype.storm.spout.ISpout;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-import clojure.lang.IFn;
-import clojure.lang.PersistentArrayMap;
-import clojure.lang.Keyword;
-import clojure.lang.Symbol;
-import clojure.lang.RT;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class ClojureSpout implements IRichSpout {
-	Map<String, StreamInfo> _fields;
-	List<String> _fnSpec;
-	List<String> _confSpec;
-	List<Object> _params;
-
-	ISpout _spout;
-
-	public ClojureSpout(List fnSpec, List confSpec, List<Object> params,
-			Map<String, StreamInfo> fields) {
-		_fnSpec = fnSpec;
-		_confSpec = confSpec;
-		_params = params;
-		_fields = fields;
-	}
-
-	@Override
-	public void open(final Map conf, final TopologyContext context,
-			final SpoutOutputCollector collector) {
-		IFn hof = Utils.loadClojureFn(_fnSpec.get(0), _fnSpec.get(1));
-		try {
-			IFn preparer = (IFn) hof.applyTo(RT.seq(_params));
-			final Map<Keyword, Object> collectorMap = new PersistentArrayMap(
-					new Object[] {
-							Keyword.intern(Symbol.create("output-collector")),
-							collector,
-							Keyword.intern(Symbol.create("context")), context });
-			List<Object> args = new ArrayList<Object>() {
-				{
-					add(conf);
-					add(context);
-					add(collectorMap);
-				}
-			};
-
-			_spout = (ISpout) preparer.applyTo(RT.seq(args));
-			// the Clojure-generated spout may not implement open
-			try {
-				_spout.open(conf, context, collector);
-			} catch (AbstractMethodError ame) {
-				// optional lifecycle method not implemented; safe to ignore
-			}
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	@Override
-	public void close() {
-		try {
-			_spout.close();
-		} catch (AbstractMethodError ame) {
-
-		}
-	}
-
-	@Override
-	public void nextTuple() {
-		try {
-			_spout.nextTuple();
-		} catch (AbstractMethodError ame) {
-
-		}
-
-	}
-
-	@Override
-	public void ack(Object msgId) {
-		try {
-			_spout.ack(msgId);
-		} catch (AbstractMethodError ame) {
-
-		}
-
-	}
-
-	@Override
-	public void fail(Object msgId) {
-		try {
-			_spout.fail(msgId);
-		} catch (AbstractMethodError ame) {
-
-		}
-
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		for (String stream : _fields.keySet()) {
-			StreamInfo info = _fields.get(stream);
-			declarer.declareStream(stream, info.is_direct(),
-					new Fields(info.get_output_fields()));
-		}
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		IFn hof = Utils.loadClojureFn(_confSpec.get(0), _confSpec.get(1));
-		try {
-			return (Map) hof.applyTo(RT.seq(_params));
-		} catch (Exception e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	@Override
-	public void activate() {
-		try {
-			_spout.activate();
-		} catch (AbstractMethodError ame) {
-
-		}
-	}
-
-	@Override
-	public void deactivate() {
-		try {
-			_spout.deactivate();
-		} catch (AbstractMethodError ame) {
-
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/clojure/RichShellBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/clojure/RichShellBolt.java b/jstorm-client/src/main/java/backtype/storm/clojure/RichShellBolt.java
deleted file mode 100644
index 9289448..0000000
--- a/jstorm-client/src/main/java/backtype/storm/clojure/RichShellBolt.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package backtype.storm.clojure;
-
-import backtype.storm.generated.StreamInfo;
-import backtype.storm.task.ShellBolt;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import java.util.Map;
-
-public class RichShellBolt extends ShellBolt implements IRichBolt {
-	private Map<String, StreamInfo> _outputs;
-
-	public RichShellBolt(String[] command, Map<String, StreamInfo> outputs) {
-		super(command);
-		_outputs = outputs;
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		for (String stream : _outputs.keySet()) {
-			StreamInfo def = _outputs.get(stream);
-			if (def.is_direct()) {
-				declarer.declareStream(stream, true,
-						new Fields(def.get_output_fields()));
-			} else {
-				declarer.declareStream(stream,
-						new Fields(def.get_output_fields()));
-			}
-		}
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return null;
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/clojure/RichShellSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/clojure/RichShellSpout.java b/jstorm-client/src/main/java/backtype/storm/clojure/RichShellSpout.java
deleted file mode 100644
index 98763a5..0000000
--- a/jstorm-client/src/main/java/backtype/storm/clojure/RichShellSpout.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package backtype.storm.clojure;
-
-import backtype.storm.generated.StreamInfo;
-import backtype.storm.spout.ShellSpout;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import java.util.Map;
-
-public class RichShellSpout extends ShellSpout implements IRichSpout {
-	private Map<String, StreamInfo> _outputs;
-
-	public RichShellSpout(String[] command, Map<String, StreamInfo> outputs) {
-		super(command);
-		_outputs = outputs;
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		for (String stream : _outputs.keySet()) {
-			StreamInfo def = _outputs.get(stream);
-			if (def.is_direct()) {
-				declarer.declareStream(stream, true,
-						new Fields(def.get_output_fields()));
-			} else {
-				declarer.declareStream(stream,
-						new Fields(def.get_output_fields()));
-			}
-		}
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return null;
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/command/activate.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/command/activate.java b/jstorm-client/src/main/java/backtype/storm/command/activate.java
deleted file mode 100644
index ed8c33a..0000000
--- a/jstorm-client/src/main/java/backtype/storm/command/activate.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package backtype.storm.command;
-
-import java.security.InvalidParameterException;
-import java.util.Map;
-
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-/**
- * Activate topology
- * 
- * @author longda
- * 
- */
-public class activate {
-
-	/**
-	 * @param args
-	 */
-	public static void main(String[] args) {
-		if (args == null || args.length == 0) {
-			throw new InvalidParameterException("Should input topology name");
-		}
-
-		String topologyName = args[0];
-
-		NimbusClient client = null;
-		try {
-
-			Map conf = Utils.readStormConfig();
-			client = NimbusClient.getConfiguredClient(conf);
-
-			client.getClient().activate(topologyName);
-
-			System.out.println("Successfully submitted command activate "
-					+ topologyName);
-		} catch (Exception e) {
-			System.out.println(e.getMessage());
-			e.printStackTrace();
-			throw new RuntimeException(e);
-		} finally {
-			if (client != null) {
-				client.close();
-			}
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/command/config_value.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/command/config_value.java b/jstorm-client/src/main/java/backtype/storm/command/config_value.java
deleted file mode 100644
index 90a3ed0..0000000
--- a/jstorm-client/src/main/java/backtype/storm/command/config_value.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package backtype.storm.command;
-
-import java.security.InvalidParameterException;
-import java.util.Map;
-
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-/**
- * Get a configuration value by key
- * 
- * @author longda
- * 
- */
-public class config_value {
-
-	/**
-	 * @param args
-	 */
-	public static void main(String[] args) {
-		if (args == null || args.length == 0) {
-			throw new InvalidParameterException("Should input key name");
-		}
-
-		String key = args[0];
-
-		Map conf = Utils.readStormConfig();
-
-		System.out.print("VALUE: " + String.valueOf(conf.get(key)));
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/command/deactivate.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/command/deactivate.java b/jstorm-client/src/main/java/backtype/storm/command/deactivate.java
deleted file mode 100644
index 845f456..0000000
--- a/jstorm-client/src/main/java/backtype/storm/command/deactivate.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package backtype.storm.command;
-
-import java.security.InvalidParameterException;
-import java.util.Map;
-
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-/**
- * Deactivate topology
- * 
- * @author longda
- * 
- */
-public class deactivate {
-
-	/**
-	 * @param args
-	 */
-	public static void main(String[] args) {
-		if (args == null || args.length == 0) {
-			throw new InvalidParameterException("Should input topology name");
-		}
-
-		String topologyName = args[0];
-
-		NimbusClient client = null;
-		try {
-
-			Map conf = Utils.readStormConfig();
-			client = NimbusClient.getConfiguredClient(conf);
-
-			client.getClient().deactivate(topologyName);
-
-			System.out.println("Successfully submitted command deactivate "
-					+ topologyName);
-		} catch (Exception e) {
-			System.out.println(e.getMessage());
-			e.printStackTrace();
-			throw new RuntimeException(e);
-		} finally {
-			if (client != null) {
-				client.close();
-			}
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/command/kill_topology.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/command/kill_topology.java b/jstorm-client/src/main/java/backtype/storm/command/kill_topology.java
deleted file mode 100644
index 0c950cb..0000000
--- a/jstorm-client/src/main/java/backtype/storm/command/kill_topology.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package backtype.storm.command;
-
-import java.security.InvalidParameterException;
-import java.util.Map;
-
-import backtype.storm.generated.KillOptions;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-/**
- * Kill topology
- * 
- * @author longda
- * 
- */
-public class kill_topology {
-
-	/**
-	 * @param args
-	 */
-	public static void main(String[] args) {
-		if (args == null || args.length == 0) {
-			throw new InvalidParameterException("Should input topology name");
-		}
-
-		String topologyName = args[0];
-
-		NimbusClient client = null;
-		try {
-
-			Map conf = Utils.readStormConfig();
-			client = NimbusClient.getConfiguredClient(conf);
-
-			if (args.length == 1) {
-
-				client.getClient().killTopology(topologyName);
-			} else {
-				int delaySeconds = Integer.parseInt(args[1]);
-
-				KillOptions options = new KillOptions();
-				options.set_wait_secs(delaySeconds);
-
-				client.getClient().killTopologyWithOpts(topologyName, options);
-
-			}
-
-			System.out.println("Successfully submitted command kill "
-					+ topologyName);
-		} catch (Exception e) {
-			System.out.println(e.getMessage());
-			e.printStackTrace();
-			throw new RuntimeException(e);
-		} finally {
-			if (client != null) {
-				client.close();
-			}
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/command/list.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/command/list.java b/jstorm-client/src/main/java/backtype/storm/command/list.java
deleted file mode 100644
index b0e40bc..0000000
--- a/jstorm-client/src/main/java/backtype/storm/command/list.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package backtype.storm.command;
-
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-
-import backtype.storm.generated.ClusterSummary;
-import backtype.storm.generated.TopologyInfo;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-/**
- * List topology or cluster information
- * 
- * @author longda
- * 
- */
-public class list {
-
-	/**
-	 * @param args
-	 */
-	public static void main(String[] args) {
-
-		NimbusClient client = null;
-		try {
-
-			Map conf = Utils.readStormConfig();
-			client = NimbusClient.getConfiguredClient(conf);
-			
-			if (args.length > 0 && !StringUtils.isBlank(args[0])) {
-				String topologyName = args[0];
-				TopologyInfo info = client.getClient().getTopologyInfoByName(topologyName);
-				
-				System.out.println("Successfully got topology info \n"
-						+ Utils.toPrettyJsonString(info));
-			} else {
-				ClusterSummary clusterSummary = client.getClient().getClusterInfo();
-				
-				System.out.println("Successfully got cluster info \n"
-						+ Utils.toPrettyJsonString(clusterSummary));
-			}
-		} catch (Exception e) {
-			System.out.println(e.getMessage());
-			e.printStackTrace();
-			throw new RuntimeException(e);
-		} finally {
-			if (client != null) {
-				client.close();
-			}
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/command/metrics_monitor.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/command/metrics_monitor.java b/jstorm-client/src/main/java/backtype/storm/command/metrics_monitor.java
deleted file mode 100644
index bb339d4..0000000
--- a/jstorm-client/src/main/java/backtype/storm/command/metrics_monitor.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package backtype.storm.command;
-
-import java.util.Map;
-import java.security.InvalidParameterException;
-
-import backtype.storm.generated.MonitorOptions;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-/**
- * Monitor topology
- * 
- * @author Basti
- * 
- */
-public class metrics_monitor {
-
-	/**
-	 * @param args
-	 */
-	public static void main(String[] args) {
-		if (args == null || args.length <= 1) {
-			throw new InvalidParameterException("Should input topology name and enable flag");
-		}
-
-		String topologyName = args[0];
-
-		NimbusClient client = null;
-		try {
-
-			Map conf = Utils.readStormConfig();
-			client = NimbusClient.getConfiguredClient(conf);
-
-			boolean isEnable = Boolean.parseBoolean(args[1]);
-
-			MonitorOptions options = new MonitorOptions();
-			options.set_isEnable(isEnable);
-
-			client.getClient().metricMonitor(topologyName, options);
-
-			String str = (isEnable) ? "enable" : "disable";
-			System.out.println("Successfully submitted command to " + str
-					+ " the monitor of " + topologyName);
-		} catch (Exception e) {
-			System.out.println(e.getMessage());
-			e.printStackTrace();
-			throw new RuntimeException(e);
-		} finally {
-			if (client != null) {
-				client.close();
-			}
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/command/rebalance.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/command/rebalance.java b/jstorm-client/src/main/java/backtype/storm/command/rebalance.java
deleted file mode 100644
index 72c3f26..0000000
--- a/jstorm-client/src/main/java/backtype/storm/command/rebalance.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package backtype.storm.command;
-
-import java.security.InvalidParameterException;
-import java.util.Map;
-
-import backtype.storm.generated.RebalanceOptions;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-/**
- * Rebalance topology
- * 
- * @author longda
- * 
- */
-public class rebalance {
-
-	/**
-	 * @param args
-	 */
-	public static void main(String[] args) {
-		if (args == null || args.length == 0) {
-			throw new InvalidParameterException("Should input topology name");
-		}
-
-		String topologyName = args[0];
-
-		NimbusClient client = null;
-		try {
-
-			Map conf = Utils.readStormConfig();
-			client = NimbusClient.getConfiguredClient(conf);
-
-			if (args.length == 1) {
-
-				client.getClient().rebalance(topologyName, null);
-			} else {
-				int delaySeconds = Integer.parseInt(args[1]);
-
-				RebalanceOptions options = new RebalanceOptions();
-				options.set_wait_secs(delaySeconds);
-
-				client.getClient().rebalance(topologyName, options);
-			}
-
-			System.out.println("Successfully submitted command rebalance "
-					+ topologyName);
-		} catch (Exception e) {
-			System.out.println(e.getMessage());
-			e.printStackTrace();
-			throw new RuntimeException(e);
-		} finally {
-			if (client != null) {
-				client.close();
-			}
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/command/restart.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/command/restart.java b/jstorm-client/src/main/java/backtype/storm/command/restart.java
deleted file mode 100644
index 1eb577d..0000000
--- a/jstorm-client/src/main/java/backtype/storm/command/restart.java
+++ /dev/null
@@ -1,120 +0,0 @@
-package backtype.storm.command;
-
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.InputStream;
-import java.security.InvalidParameterException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import org.yaml.snakeyaml.Yaml;
-
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-/**
- * Restart topology
- * 
- * @author basti
- * 
- */
-public class restart {	
-	private static Map LoadProperty(String prop) {
-		Map ret = new HashMap<Object, Object>();
-		Properties properties = new Properties();
-
-		try {
-			InputStream stream = new FileInputStream(prop);
-			properties.load(stream);
-			if (properties.size() == 0) {
-				System.out.println("WARN: Config file is empty");
-				return null;
-			} else {
-				ret.putAll(properties);
-			}
-		} catch (FileNotFoundException e) {
-			System.out.println("No such file " + prop);
-			throw new RuntimeException(e);
-		} catch (Exception e1) {
-			e1.printStackTrace();
-			throw new RuntimeException(e1);
-		}
-		
-		return ret;
-	}
-
-	private static Map LoadYaml(String confPath) {
-		Map ret = new HashMap<Object, Object>();
-		Yaml yaml = new Yaml();
-
-		try {
-			InputStream stream = new FileInputStream(confPath);
-			ret = (Map) yaml.load(stream);
-			if (ret == null || ret.isEmpty()) {
-				System.out.println("WARN: Config file is empty");
-				return null;
-			}
-		} catch (FileNotFoundException e) {
-			System.out.println("No such file " + confPath);
-			throw new RuntimeException("No config file");
-		} catch (Exception e1) {
-			e1.printStackTrace();
-			throw new RuntimeException("Failed to read config file");
-		}
-
-		return ret;
-	}
-	
-	private static Map LoadConf(String arg) {
-		Map ret = null;
-		if (arg.endsWith("yaml")) {
-			ret = LoadYaml(arg);
-		} else {
-			ret = LoadProperty(arg);
-		}
-		return ret;
-	}
-
-	/**
-	 * @param args
-	 */
-	public static void main(String[] args) {
-		if (args == null || args.length == 0) {
-			throw new InvalidParameterException("Should input topology name");
-		}
-
-		String topologyName = args[0];
-
-		NimbusClient client = null;
-		try {
-			Map conf = Utils.readStormConfig();
-			client = NimbusClient.getConfiguredClient(conf);
-			
-			System.out.println("It will take 15 ~ 100 seconds to restart, please wait patiently\n");
-
-			if (args.length == 1) {
-				client.getClient().restart(topologyName, null);
-			} else {
-				Map loadConf = LoadConf(args[1]);
-				String jsonConf = Utils.to_json(loadConf);
-				System.out.println("New configuration:\n" + jsonConf);
-				
-				client.getClient().restart(topologyName, jsonConf);
-			}
-
-			System.out.println("Successfully submitted command restart "
-					+ topologyName);
-		} catch (Exception e) {
-			System.out.println(e.getMessage());
-			e.printStackTrace();
-			throw new RuntimeException(e);
-		} finally {
-			if (client != null) {
-				client.close();
-			}
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/coordination/BatchBoltExecutor.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/coordination/BatchBoltExecutor.java b/jstorm-client/src/main/java/backtype/storm/coordination/BatchBoltExecutor.java
deleted file mode 100644
index 5502975..0000000
--- a/jstorm-client/src/main/java/backtype/storm/coordination/BatchBoltExecutor.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package backtype.storm.coordination;
-
-import backtype.storm.coordination.CoordinatedBolt.FinishedCallback;
-import backtype.storm.coordination.CoordinatedBolt.TimeoutCallback;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.FailedException;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.Utils;
-import java.util.HashMap;
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BatchBoltExecutor implements IRichBolt, FinishedCallback,
-		TimeoutCallback {
-	public static Logger LOG = LoggerFactory.getLogger(BatchBoltExecutor.class);
-
-	byte[] _boltSer;
-	Map<Object, IBatchBolt> _openTransactions;
-	Map _conf;
-	TopologyContext _context;
-	BatchOutputCollectorImpl _collector;
-
-	public BatchBoltExecutor(IBatchBolt bolt) {
-		_boltSer = Utils.serialize(bolt);
-	}
-
-	@Override
-	public void prepare(Map conf, TopologyContext context,
-			OutputCollector collector) {
-		_conf = conf;
-		_context = context;
-		_collector = new BatchOutputCollectorImpl(collector);
-		_openTransactions = new HashMap<Object, IBatchBolt>();
-	}
-
-	@Override
-	public void execute(Tuple input) {
-		Object id = input.getValue(0);
-		IBatchBolt bolt = getBatchBolt(id);
-		try {
-			bolt.execute(input);
-			_collector.ack(input);
-		} catch (FailedException e) {
-			LOG.error("Failed to process tuple in batch", e);
-			_collector.fail(input);
-		}
-	}
-
-	@Override
-	public void cleanup() {
-	}
-
-	@Override
-	public void finishedId(Object id) {
-		IBatchBolt bolt = getBatchBolt(id);
-		_openTransactions.remove(id);
-		bolt.finishBatch();
-	}
-
-	@Override
-	public void timeoutId(Object attempt) {
-		_openTransactions.remove(attempt);
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		newTransactionalBolt().declareOutputFields(declarer);
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return newTransactionalBolt().getComponentConfiguration();
-	}
-
-	private IBatchBolt getBatchBolt(Object id) {
-		IBatchBolt bolt = _openTransactions.get(id);
-		if (bolt == null) {
-			bolt = newTransactionalBolt();
-			bolt.prepare(_conf, _context, _collector, id);
-			_openTransactions.put(id, bolt);
-		}
-		return bolt;
-	}
-
-	private IBatchBolt newTransactionalBolt() {
-		return (IBatchBolt) Utils.deserialize(_boltSer);
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/coordination/BatchOutputCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/coordination/BatchOutputCollector.java b/jstorm-client/src/main/java/backtype/storm/coordination/BatchOutputCollector.java
deleted file mode 100644
index e087f5f..0000000
--- a/jstorm-client/src/main/java/backtype/storm/coordination/BatchOutputCollector.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package backtype.storm.coordination;
-
-import backtype.storm.utils.Utils;
-import java.util.List;
-
-public abstract class BatchOutputCollector {
-
-	/**
-	 * Emits a tuple to the default output stream.
-	 */
-	public List<Integer> emit(List<Object> tuple) {
-		return emit(Utils.DEFAULT_STREAM_ID, tuple);
-	}
-
-	public abstract List<Integer> emit(String streamId, List<Object> tuple);
-
-	/**
-	 * Emits a tuple to the specified task on the default output stream. This
-	 * output stream must have been declared as a direct stream, and the
-	 * specified task must use a direct grouping on this stream to receive the
-	 * message.
-	 */
-	public void emitDirect(int taskId, List<Object> tuple) {
-		emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple);
-	}
-
-	public abstract void emitDirect(int taskId, String streamId,
-			List<Object> tuple);
-
-	public abstract void reportError(Throwable error);
-}
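
A minimal sketch of the direct-stream contract described in the emitDirect
javadoc above (component and stream names are illustrative):

    // Emitter's declareOutputFields: declare "results" as a direct stream.
    declarer.declareStream("results", true, new Fields("id", "value"));

    // Topology wiring: the consumer must subscribe with a direct grouping.
    builder.setBolt("consumer", new ConsumerBolt())
           .directGrouping("emitter", "results");

    // At emit time, target one specific task of the consumer.
    int taskId = context.getComponentTasks("consumer").get(0);
    collector.emitDirect(taskId, "results", new Values(id, value));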

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/coordination/BatchOutputCollectorImpl.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/coordination/BatchOutputCollectorImpl.java b/jstorm-client/src/main/java/backtype/storm/coordination/BatchOutputCollectorImpl.java
deleted file mode 100644
index 73eff73..0000000
--- a/jstorm-client/src/main/java/backtype/storm/coordination/BatchOutputCollectorImpl.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package backtype.storm.coordination;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.tuple.Tuple;
-import java.util.List;
-
-public class BatchOutputCollectorImpl extends BatchOutputCollector {
-	OutputCollector _collector;
-
-	public BatchOutputCollectorImpl(OutputCollector collector) {
-		_collector = collector;
-	}
-
-	@Override
-	public List<Integer> emit(String streamId, List<Object> tuple) {
-		return _collector.emit(streamId, tuple);
-	}
-
-	@Override
-	public void emitDirect(int taskId, String streamId, List<Object> tuple) {
-		_collector.emitDirect(taskId, streamId, tuple);
-	}
-
-	@Override
-	public void reportError(Throwable error) {
-		_collector.reportError(error);
-	}
-
-	public void ack(Tuple tup) {
-		_collector.ack(tup);
-	}
-
-	public void fail(Tuple tup) {
-		_collector.fail(tup);
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/coordination/BatchSubtopologyBuilder.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/coordination/BatchSubtopologyBuilder.java b/jstorm-client/src/main/java/backtype/storm/coordination/BatchSubtopologyBuilder.java
deleted file mode 100644
index eff05b4..0000000
--- a/jstorm-client/src/main/java/backtype/storm/coordination/BatchSubtopologyBuilder.java
+++ /dev/null
@@ -1,469 +0,0 @@
-package backtype.storm.coordination;
-
-import backtype.storm.Constants;
-import backtype.storm.coordination.CoordinatedBolt.SourceArgs;
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.generated.Grouping;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.topology.BaseConfigurationDeclarer;
-import backtype.storm.topology.BasicBoltExecutor;
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.IBasicBolt;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.InputDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-
-public class BatchSubtopologyBuilder {
-	Map<String, Component> _bolts = new HashMap<String, Component>();
-	Component _masterBolt;
-	String _masterId;
-
-	public BatchSubtopologyBuilder(String masterBoltId, IBasicBolt masterBolt,
-			Number boltParallelism) {
-		Integer p = boltParallelism == null ? null : boltParallelism.intValue();
-		_masterBolt = new Component(new BasicBoltExecutor(masterBolt), p);
-		_masterId = masterBoltId;
-	}
-
-	public BatchSubtopologyBuilder(String masterBoltId, IBasicBolt masterBolt) {
-		this(masterBoltId, masterBolt, null);
-	}
-
-	public BoltDeclarer getMasterDeclarer() {
-		return new BoltDeclarerImpl(_masterBolt);
-	}
-
-	public BoltDeclarer setBolt(String id, IBatchBolt bolt) {
-		return setBolt(id, bolt, null);
-	}
-
-	public BoltDeclarer setBolt(String id, IBatchBolt bolt, Number parallelism) {
-		return setBolt(id, new BatchBoltExecutor(bolt), parallelism);
-	}
-
-	public BoltDeclarer setBolt(String id, IBasicBolt bolt) {
-		return setBolt(id, bolt, null);
-	}
-
-	public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism) {
-		return setBolt(id, new BasicBoltExecutor(bolt), parallelism);
-	}
-
-	private BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism) {
-		Integer p = null;
-		if (parallelism != null)
-			p = parallelism.intValue();
-		Component component = new Component(bolt, p);
-		_bolts.put(id, component);
-		return new BoltDeclarerImpl(component);
-	}
-
-	public void extendTopology(TopologyBuilder builder) {
-		BoltDeclarer declarer = builder.setBolt(_masterId, new CoordinatedBolt(
-				_masterBolt.bolt), _masterBolt.parallelism);
-		for (InputDeclaration decl : _masterBolt.declarations) {
-			decl.declare(declarer);
-		}
-		for (Map conf : _masterBolt.componentConfs) {
-			declarer.addConfigurations(conf);
-		}
-		for (String id : _bolts.keySet()) {
-			Component component = _bolts.get(id);
-			Map<String, SourceArgs> coordinatedArgs = new HashMap<String, SourceArgs>();
-			for (String c : componentBoltSubscriptions(component)) {
-				SourceArgs source;
-				if (c.equals(_masterId)) {
-					source = SourceArgs.single();
-				} else {
-					source = SourceArgs.all();
-				}
-				coordinatedArgs.put(c, source);
-			}
-
-			BoltDeclarer input = builder.setBolt(id, new CoordinatedBolt(
-					component.bolt, coordinatedArgs, null),
-					component.parallelism);
-			for (Map conf : component.componentConfs) {
-				input.addConfigurations(conf);
-			}
-			for (String c : componentBoltSubscriptions(component)) {
-				input.directGrouping(c, Constants.COORDINATED_STREAM_ID);
-			}
-			for (InputDeclaration d : component.declarations) {
-				d.declare(input);
-			}
-		}
-	}
-
-	private Set<String> componentBoltSubscriptions(Component component) {
-		Set<String> ret = new HashSet<String>();
-		for (InputDeclaration d : component.declarations) {
-			ret.add(d.getComponent());
-		}
-		return ret;
-	}
-
-	private static class Component {
-		public IRichBolt bolt;
-		public Integer parallelism;
-		public List<InputDeclaration> declarations = new ArrayList<InputDeclaration>();
-		public List<Map> componentConfs = new ArrayList<Map>();
-
-		public Component(IRichBolt bolt, Integer parallelism) {
-			this.bolt = bolt;
-			this.parallelism = parallelism;
-		}
-	}
-
-	private static interface InputDeclaration {
-		void declare(InputDeclarer declarer);
-
-		String getComponent();
-	}
-
-	private class BoltDeclarerImpl extends
-			BaseConfigurationDeclarer<BoltDeclarer> implements BoltDeclarer {
-		Component _component;
-
-		public BoltDeclarerImpl(Component component) {
-			_component = component;
-		}
-
-		@Override
-		public BoltDeclarer fieldsGrouping(final String component,
-				final Fields fields) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.fieldsGrouping(component, fields);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer fieldsGrouping(final String component,
-				final String streamId, final Fields fields) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.fieldsGrouping(component, streamId, fields);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer globalGrouping(final String component) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.globalGrouping(component);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer globalGrouping(final String component,
-				final String streamId) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.globalGrouping(component, streamId);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer shuffleGrouping(final String component) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.shuffleGrouping(component);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer shuffleGrouping(final String component,
-				final String streamId) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.shuffleGrouping(component, streamId);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer localOrShuffleGrouping(final String component) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.localOrShuffleGrouping(component);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer localOrShuffleGrouping(final String component,
-				final String streamId) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.localOrShuffleGrouping(component, streamId);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-		
-		@Override
-		public BoltDeclarer localFirstGrouping(final String componentId) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.localFirstGrouping(componentId);
-				}
-
-				@Override
-				public String getComponent() {
-					return componentId;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer localFirstGrouping(final String component,
-				final String streamId) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.localFirstGrouping(component, streamId);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer noneGrouping(final String component) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.noneGrouping(component);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer noneGrouping(final String component,
-				final String streamId) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.noneGrouping(component, streamId);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer allGrouping(final String component) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.allGrouping(component);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer allGrouping(final String component,
-				final String streamId) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.allGrouping(component, streamId);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer directGrouping(final String component) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.directGrouping(component);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer directGrouping(final String component,
-				final String streamId) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.directGrouping(component, streamId);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer customGrouping(final String component,
-				final CustomStreamGrouping grouping) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.customGrouping(component, grouping);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer customGrouping(final String component,
-				final String streamId, final CustomStreamGrouping grouping) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.customGrouping(component, streamId, grouping);
-				}
-
-				@Override
-				public String getComponent() {
-					return component;
-				}
-			});
-			return this;
-		}
-
-		@Override
-		public BoltDeclarer grouping(final GlobalStreamId stream,
-				final Grouping grouping) {
-			addDeclaration(new InputDeclaration() {
-				@Override
-				public void declare(InputDeclarer declarer) {
-					declarer.grouping(stream, grouping);
-				}
-
-				@Override
-				public String getComponent() {
-					return stream.get_componentId();
-				}
-			});
-			return this;
-		}
-
-		private void addDeclaration(InputDeclaration declaration) {
-			_component.declarations.add(declaration);
-		}
-
-		@Override
-		public BoltDeclarer addConfigurations(Map conf) {
-			_component.componentConfs.add(conf);
-			return this;
-		}
-
-		
-	}
-}
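
For context on the pattern being removed above: each grouping method captures its arguments in an anonymous InputDeclaration and replays them against a real InputDeclarer when the topology is finally wired, so the builder can accept declarations before the underlying components exist. A minimal self-contained sketch of that deferred-declaration idea (the two interfaces here are simplified stand-ins, not the Storm types):

    import java.util.ArrayList;
    import java.util.List;

    class DeferredDeclarations {
        // Simplified stand-ins for the Storm interfaces used above.
        interface InputDeclarer {
            void shuffleGrouping(String component);
        }

        interface InputDeclaration {
            void declare(InputDeclarer declarer);
            String getComponent();
        }

        private final List<InputDeclaration> declarations =
                new ArrayList<InputDeclaration>();

        // Record the grouping now...
        DeferredDeclarations shuffleGrouping(final String component) {
            declarations.add(new InputDeclaration() {
                public void declare(InputDeclarer declarer) {
                    declarer.shuffleGrouping(component);
                }

                public String getComponent() {
                    return component;
                }
            });
            return this;
        }

        // ...and replay every recorded call once the real declarer exists.
        void replay(InputDeclarer declarer) {
            for (InputDeclaration declaration : declarations) {
                declaration.declare(declarer);
            }
        }
    }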

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/coordination/CoordinatedBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/coordination/CoordinatedBolt.java b/jstorm-client/src/main/java/backtype/storm/coordination/CoordinatedBolt.java
deleted file mode 100644
index 1e78932..0000000
--- a/jstorm-client/src/main/java/backtype/storm/coordination/CoordinatedBolt.java
+++ /dev/null
@@ -1,379 +0,0 @@
-package backtype.storm.coordination;
-
-import backtype.storm.topology.FailedException;
-import java.util.Map.Entry;
-import backtype.storm.tuple.Values;
-import backtype.storm.generated.GlobalStreamId;
-import java.util.Collection;
-import backtype.storm.Constants;
-import backtype.storm.generated.Grouping;
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.TimeCacheMap;
-import backtype.storm.utils.Utils;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import static backtype.storm.utils.Utils.get;
-
-/**
- * Coordination requires request ids to remain globally unique for a while, so
- * that retried requests are not confused with new ones.
- */
-public class CoordinatedBolt implements IRichBolt {
-	public static Logger LOG = LoggerFactory.getLogger(CoordinatedBolt.class);
-
-	public static interface FinishedCallback {
-		void finishedId(Object id);
-	}
-
-	public static interface TimeoutCallback {
-		void timeoutId(Object id);
-	}
-
-	public static class SourceArgs implements Serializable {
-		public boolean singleCount;
-
-		protected SourceArgs(boolean singleCount) {
-			this.singleCount = singleCount;
-		}
-
-		public static SourceArgs single() {
-			return new SourceArgs(true);
-		}
-
-		public static SourceArgs all() {
-			return new SourceArgs(false);
-		}
-
-		@Override
-		public String toString() {
-			return "<Single: " + singleCount + ">";
-		}
-	}
-
-	public class CoordinatedOutputCollector implements IOutputCollector {
-		IOutputCollector _delegate;
-
-		public CoordinatedOutputCollector(IOutputCollector delegate) {
-			_delegate = delegate;
-		}
-
-		public List<Integer> emit(String stream, Collection<Tuple> anchors,
-				List<Object> tuple) {
-			List<Integer> tasks = _delegate.emit(stream, anchors, tuple);
-			updateTaskCounts(tuple.get(0), tasks);
-			return tasks;
-		}
-
-		public void emitDirect(int task, String stream,
-				Collection<Tuple> anchors, List<Object> tuple) {
-			updateTaskCounts(tuple.get(0), Arrays.asList(task));
-			_delegate.emitDirect(task, stream, anchors, tuple);
-		}
-
-		public void ack(Tuple tuple) {
-			Object id = tuple.getValue(0);
-			synchronized (_tracked) {
-				TrackingInfo track = _tracked.get(id);
-				if (track != null)
-					track.receivedTuples++;
-			}
-			boolean failed = checkFinishId(tuple, TupleType.REGULAR);
-			if (failed) {
-				_delegate.fail(tuple);
-			} else {
-				_delegate.ack(tuple);
-			}
-		}
-
-		public void fail(Tuple tuple) {
-			Object id = tuple.getValue(0);
-			synchronized (_tracked) {
-				TrackingInfo track = _tracked.get(id);
-				if (track != null)
-					track.failed = true;
-			}
-			checkFinishId(tuple, TupleType.REGULAR);
-			_delegate.fail(tuple);
-		}
-
-		public void reportError(Throwable error) {
-			_delegate.reportError(error);
-		}
-
-		private void updateTaskCounts(Object id, List<Integer> tasks) {
-			synchronized (_tracked) {
-				TrackingInfo track = _tracked.get(id);
-				if (track != null) {
-					Map<Integer, Integer> taskEmittedTuples = track.taskEmittedTuples;
-					for (Integer task : tasks) {
-						int newCount = get(taskEmittedTuples, task, 0) + 1;
-						taskEmittedTuples.put(task, newCount);
-					}
-				}
-			}
-		}
-	}
-
-	private Map<String, SourceArgs> _sourceArgs;
-	private IdStreamSpec _idStreamSpec;
-	private IRichBolt _delegate;
-	private Integer _numSourceReports;
-	private List<Integer> _countOutTasks = new ArrayList<Integer>();
-	private OutputCollector _collector;
-	private TimeCacheMap<Object, TrackingInfo> _tracked;
-
-	public static class TrackingInfo {
-		int reportCount = 0;
-		int expectedTupleCount = 0;
-		int receivedTuples = 0;
-		boolean failed = false;
-		Map<Integer, Integer> taskEmittedTuples = new HashMap<Integer, Integer>();
-		boolean receivedId = false;
-		boolean finished = false;
-		List<Tuple> ackTuples = new ArrayList<Tuple>();
-
-		@Override
-		public String toString() {
-			return "reportCount: " + reportCount + "\n"
-					+ "expectedTupleCount: " + expectedTupleCount + "\n"
-					+ "receivedTuples: " + receivedTuples + "\n" + "failed: "
-					+ failed + "\n" + taskEmittedTuples.toString();
-		}
-	}
-
-	public static class IdStreamSpec implements Serializable {
-		GlobalStreamId _id;
-
-		public GlobalStreamId getGlobalStreamId() {
-			return _id;
-		}
-
-		public static IdStreamSpec makeDetectSpec(String component,
-				String stream) {
-			return new IdStreamSpec(component, stream);
-		}
-
-		protected IdStreamSpec(String component, String stream) {
-			_id = new GlobalStreamId(component, stream);
-		}
-	}
-
-	public CoordinatedBolt(IRichBolt delegate) {
-		this(delegate, null, null);
-	}
-
-	public CoordinatedBolt(IRichBolt delegate, String sourceComponent,
-			SourceArgs sourceArgs, IdStreamSpec idStreamSpec) {
-		this(delegate, singleSourceArgs(sourceComponent, sourceArgs),
-				idStreamSpec);
-	}
-
-	public CoordinatedBolt(IRichBolt delegate,
-			Map<String, SourceArgs> sourceArgs, IdStreamSpec idStreamSpec) {
-		_sourceArgs = sourceArgs;
-		if (_sourceArgs == null)
-			_sourceArgs = new HashMap<String, SourceArgs>();
-		_delegate = delegate;
-		_idStreamSpec = idStreamSpec;
-	}
-
-	public void prepare(Map config, TopologyContext context,
-			OutputCollector collector) {
-		TimeCacheMap.ExpiredCallback<Object, TrackingInfo> callback = null;
-		if (_delegate instanceof TimeoutCallback) {
-			callback = new TimeoutItems();
-		}
-		_tracked = new TimeCacheMap<Object, TrackingInfo>(
-				context.maxTopologyMessageTimeout(), callback);
-		_collector = collector;
-		_delegate.prepare(config, context, new OutputCollector(
-				new CoordinatedOutputCollector(collector)));
-		for (String component : Utils.get(context.getThisTargets(),
-				Constants.COORDINATED_STREAM_ID,
-				new HashMap<String, Grouping>()).keySet()) {
-			for (Integer task : context.getComponentTasks(component)) {
-				_countOutTasks.add(task);
-			}
-		}
-		if (!_sourceArgs.isEmpty()) {
-			_numSourceReports = 0;
-			for (Entry<String, SourceArgs> entry : _sourceArgs.entrySet()) {
-				if (entry.getValue().singleCount) {
-					_numSourceReports += 1;
-				} else {
-					_numSourceReports += context.getComponentTasks(
-							entry.getKey()).size();
-				}
-			}
-		}
-	}
-
-	private boolean checkFinishId(Tuple tup, TupleType type) {
-		Object id = tup.getValue(0);
-		boolean failed = false;
-
-		synchronized (_tracked) {
-			TrackingInfo track = _tracked.get(id);
-			try {
-				if (track != null) {
-					boolean delayed = false;
-					if (_idStreamSpec == null && type == TupleType.COORD
-							|| _idStreamSpec != null && type == TupleType.ID) {
-						track.ackTuples.add(tup);
-						delayed = true;
-					}
-					if (track.failed) {
-						failed = true;
-						for (Tuple t : track.ackTuples) {
-							_collector.fail(t);
-						}
-						_tracked.remove(id);
-					} else if (track.receivedId
-							&& (_sourceArgs.isEmpty() || track.reportCount == _numSourceReports
-									&& track.expectedTupleCount == track.receivedTuples)) {
-						if (_delegate instanceof FinishedCallback) {
-							((FinishedCallback) _delegate).finishedId(id);
-						}
-						if (!(_sourceArgs.isEmpty() || type != TupleType.REGULAR)) {
-							throw new IllegalStateException(
-									"Coordination condition met on a non-coordinating tuple. Should be impossible");
-						}
-						Iterator<Integer> outTasks = _countOutTasks.iterator();
-						while (outTasks.hasNext()) {
-							int task = outTasks.next();
-							int numTuples = get(track.taskEmittedTuples, task,
-									0);
-							_collector.emitDirect(task,
-									Constants.COORDINATED_STREAM_ID, tup,
-									new Values(id, numTuples));
-						}
-						for (Tuple t : track.ackTuples) {
-							_collector.ack(t);
-						}
-						track.finished = true;
-						_tracked.remove(id);
-					}
-					if (!delayed && type != TupleType.REGULAR) {
-						if (track.failed) {
-							_collector.fail(tup);
-						} else {
-							_collector.ack(tup);
-						}
-					}
-				} else {
-					if (type != TupleType.REGULAR)
-						_collector.fail(tup);
-				}
-			} catch (FailedException e) {
-				LOG.error("Failed to finish batch", e);
-				for (Tuple t : track.ackTuples) {
-					_collector.fail(t);
-				}
-				_tracked.remove(id);
-				failed = true;
-			}
-		}
-		return failed;
-	}
-
-	public void execute(Tuple tuple) {
-		Object id = tuple.getValue(0);
-		TrackingInfo track;
-		TupleType type = getTupleType(tuple);
-		synchronized (_tracked) {
-			track = _tracked.get(id);
-			if (track == null) {
-				track = new TrackingInfo();
-				if (_idStreamSpec == null)
-					track.receivedId = true;
-				_tracked.put(id, track);
-			}
-		}
-
-		if (type == TupleType.ID) {
-			synchronized (_tracked) {
-				track.receivedId = true;
-			}
-			checkFinishId(tuple, type);
-		} else if (type == TupleType.COORD) {
-			int count = (Integer) tuple.getValue(1);
-			synchronized (_tracked) {
-				track.reportCount++;
-				track.expectedTupleCount += count;
-			}
-			checkFinishId(tuple, type);
-		} else {
-			synchronized (_tracked) {
-				_delegate.execute(tuple);
-			}
-		}
-	}
-
-	public void cleanup() {
-		_delegate.cleanup();
-		_tracked.cleanup();
-	}
-
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		_delegate.declareOutputFields(declarer);
-		declarer.declareStream(Constants.COORDINATED_STREAM_ID, true,
-				new Fields("id", "count"));
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return _delegate.getComponentConfiguration();
-	}
-
-	private static Map<String, SourceArgs> singleSourceArgs(
-			String sourceComponent, SourceArgs sourceArgs) {
-		Map<String, SourceArgs> ret = new HashMap<String, SourceArgs>();
-		ret.put(sourceComponent, sourceArgs);
-		return ret;
-	}
-
-	private class TimeoutItems implements
-			TimeCacheMap.ExpiredCallback<Object, TrackingInfo> {
-		@Override
-		public void expire(Object id, TrackingInfo val) {
-			synchronized (_tracked) {
-				// the combination of the lock and the finished flag ensures
-				// that an id is never timed out once it has finished
-				val.failed = true;
-				if (!val.finished) {
-					((TimeoutCallback) _delegate).timeoutId(id);
-				}
-			}
-		}
-	}
-
-	private TupleType getTupleType(Tuple tuple) {
-		if (_idStreamSpec != null
-				&& tuple.getSourceGlobalStreamid().equals(_idStreamSpec._id)) {
-			return TupleType.ID;
-		} else if (!_sourceArgs.isEmpty()
-				&& tuple.getSourceStreamId().equals(
-						Constants.COORDINATED_STREAM_ID)) {
-			return TupleType.COORD;
-		} else {
-			return TupleType.REGULAR;
-		}
-	}
-
-	static enum TupleType {
-		REGULAR, ID, COORD
-	}
-}
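
CoordinatedBolt above wraps a delegate bolt and, when the delegate also implements the nested FinishedCallback or TimeoutCallback interfaces, notifies it when a request id completes or expires. A minimal sketch of such a delegate, assuming the interfaces just deleted (the per-id counting is illustrative):

    import java.util.HashMap;
    import java.util.Map;

    import backtype.storm.coordination.CoordinatedBolt;
    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class CountingDelegateBolt extends BaseRichBolt implements
            CoordinatedBolt.FinishedCallback, CoordinatedBolt.TimeoutCallback {
        private OutputCollector _collector;
        private Map<Object, Integer> _counts = new HashMap<Object, Integer>();

        public void prepare(Map conf, TopologyContext context,
                OutputCollector collector) {
            _collector = collector;
        }

        public void execute(Tuple tuple) {
            Object id = tuple.getValue(0); // field 0 carries the request id
            Integer count = _counts.get(id);
            _counts.put(id, count == null ? 1 : count + 1);
            _collector.ack(tuple);
        }

        // Called by CoordinatedBolt once every tuple for this id has arrived.
        public void finishedId(Object id) {
            Integer count = _counts.remove(id);
            _collector.emit(new Values(id, count == null ? 0 : count));
        }

        // Called by CoordinatedBolt if the id expires before completing.
        public void timeoutId(Object id) {
            _counts.remove(id);
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "count"));
        }
    }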

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/coordination/IBatchBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/coordination/IBatchBolt.java b/jstorm-client/src/main/java/backtype/storm/coordination/IBatchBolt.java
deleted file mode 100644
index f8a9386..0000000
--- a/jstorm-client/src/main/java/backtype/storm/coordination/IBatchBolt.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package backtype.storm.coordination;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IComponent;
-import backtype.storm.tuple.Tuple;
-import java.io.Serializable;
-import java.util.Map;
-
-public interface IBatchBolt<T> extends Serializable, IComponent {
-	void prepare(Map conf, TopologyContext context,
-			BatchOutputCollector collector, T id);
-
-	void execute(Tuple tuple);
-
-	void finishBatch();
-}
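
IBatchBolt above receives one collector and batch id per batch, sees every tuple of that batch through execute, and flushes in finishBatch. A minimal sketch of an implementation (the counting logic is illustrative):

    import java.util.Map;

    import backtype.storm.coordination.BatchOutputCollector;
    import backtype.storm.coordination.IBatchBolt;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class CountBatchBolt implements IBatchBolt<Object> {
        private BatchOutputCollector _collector;
        private Object _id;
        private int _count = 0;

        public void prepare(Map conf, TopologyContext context,
                BatchOutputCollector collector, Object id) {
            _collector = collector;
            _id = id;
        }

        public void execute(Tuple tuple) {
            _count++; // accumulate over the whole batch
        }

        public void finishBatch() {
            _collector.emit(new Values(_id, _count)); // one result per batch
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "count"));
        }

        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }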

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/daemon/Shutdownable.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/daemon/Shutdownable.java b/jstorm-client/src/main/java/backtype/storm/daemon/Shutdownable.java
deleted file mode 100644
index 1e61512..0000000
--- a/jstorm-client/src/main/java/backtype/storm/daemon/Shutdownable.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package backtype.storm.daemon;
-
-public interface Shutdownable {
-	public void shutdown();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/drpc/DRPCInvocationsClient.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/drpc/DRPCInvocationsClient.java b/jstorm-client/src/main/java/backtype/storm/drpc/DRPCInvocationsClient.java
deleted file mode 100644
index 756b8aa..0000000
--- a/jstorm-client/src/main/java/backtype/storm/drpc/DRPCInvocationsClient.java
+++ /dev/null
@@ -1,78 +0,0 @@
-package backtype.storm.drpc;
-
-import org.apache.thrift7.TException;
-import org.apache.thrift7.protocol.TBinaryProtocol;
-import org.apache.thrift7.transport.TFramedTransport;
-import org.apache.thrift7.transport.TSocket;
-import org.apache.thrift7.transport.TTransport;
-
-import backtype.storm.generated.DRPCRequest;
-import backtype.storm.generated.DistributedRPCInvocations;
-
-public class DRPCInvocationsClient implements DistributedRPCInvocations.Iface {
-	private TTransport conn;
-	private DistributedRPCInvocations.Client client;
-	private String host;
-	private int port;
-
-	public DRPCInvocationsClient(String host, int port) {
-		try {
-			this.host = host;
-			this.port = port;
-			connect();
-		} catch (TException e) {
-			throw new RuntimeException(e);
-		}
-	}
-
-	private void connect() throws TException {
-		conn = new TFramedTransport(new TSocket(host, port));
-		client = new DistributedRPCInvocations.Client(new TBinaryProtocol(conn));
-		conn.open();
-	}
-
-	public String getHost() {
-		return host;
-	}
-
-	public int getPort() {
-		return port;
-	}
-
-	public void result(String id, String result) throws TException {
-		try {
-			if (client == null)
-				connect();
-			client.result(id, result);
-		} catch (TException e) {
-			client = null;
-			throw e;
-		}
-	}
-
-	public DRPCRequest fetchRequest(String func) throws TException {
-		try {
-			if (client == null)
-				connect();
-			return client.fetchRequest(func);
-		} catch (TException e) {
-			client = null;
-			throw e;
-		}
-	}
-
-	public void failRequest(String id) throws TException {
-		try {
-			if (client == null)
-				connect();
-			client.failRequest(id);
-		} catch (TException e) {
-			client = null;
-			throw e;
-		}
-	}
-
-	public void close() {
-		conn.close();
-	}
-}
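
As the methods above suggest, a caller polls fetchRequest, then reports either result or failRequest for the request id; an empty request id means nothing is pending. A usage sketch (host, port, and the "exclamation" function are assumptions; 3773 is the conventional invocations port):

    import org.apache.thrift7.TException;

    import backtype.storm.drpc.DRPCInvocationsClient;
    import backtype.storm.generated.DRPCRequest;

    public class InvocationsClientExample {
        public static void main(String[] args) throws TException {
            DRPCInvocationsClient client =
                    new DRPCInvocationsClient("localhost", 3773);
            DRPCRequest req = client.fetchRequest("exclamation");
            if (req.get_request_id().length() > 0) { // empty id: no work pending
                try {
                    client.result(req.get_request_id(), req.get_func_args() + "!");
                } catch (TException e) {
                    client.failRequest(req.get_request_id());
                }
            }
            client.close();
        }
    }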

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/drpc/DRPCSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/drpc/DRPCSpout.java b/jstorm-client/src/main/java/backtype/storm/drpc/DRPCSpout.java
deleted file mode 100644
index a68b008..0000000
--- a/jstorm-client/src/main/java/backtype/storm/drpc/DRPCSpout.java
+++ /dev/null
@@ -1,162 +0,0 @@
-package backtype.storm.drpc;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.thrift7.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.Config;
-import backtype.storm.ILocalDRPC;
-import backtype.storm.generated.DRPCRequest;
-import backtype.storm.generated.DistributedRPCInvocations;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.ServiceRegistry;
-import backtype.storm.utils.Utils;
-
-public class DRPCSpout extends BaseRichSpout {
-	public static Logger LOG = LoggerFactory.getLogger(DRPCSpout.class);
-
-	SpoutOutputCollector _collector;
-	List<DRPCInvocationsClient> _clients = new ArrayList<DRPCInvocationsClient>();
-	String _function;
-	String _local_drpc_id = null;
-
-	private static class DRPCMessageId {
-		String id;
-		int index;
-
-		public DRPCMessageId(String id, int index) {
-			this.id = id;
-			this.index = index;
-		}
-	}
-
-	public DRPCSpout(String function) {
-		_function = function;
-	}
-
-	public DRPCSpout(String function, ILocalDRPC drpc) {
-		_function = function;
-		_local_drpc_id = drpc.getServiceId();
-	}
-
-	@Override
-	public void open(Map conf, TopologyContext context,
-			SpoutOutputCollector collector) {
-		_collector = collector;
-		if (_local_drpc_id == null) {
-			int numTasks = context.getComponentTasks(
-					context.getThisComponentId()).size();
-			int index = context.getThisTaskIndex();
-
-			int port = Utils.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT));
-			List<String> servers = (List<String>) conf.get(Config.DRPC_SERVERS);
-			if (servers == null || servers.isEmpty()) {
-				throw new RuntimeException(
-						"No DRPC servers configured for topology");
-			}
-			if (numTasks < servers.size()) {
-				for (String s : servers) {
-					_clients.add(new DRPCInvocationsClient(s, port));
-				}
-			} else {
-				int i = index % servers.size();
-				_clients.add(new DRPCInvocationsClient(servers.get(i), port));
-			}
-		}
-
-	}
-
-	@Override
-	public void close() {
-		for (DRPCInvocationsClient client : _clients) {
-			client.close();
-		}
-	}
-
-	@Override
-	public void nextTuple() {
-		boolean gotRequest = false;
-		if (_local_drpc_id == null) {
-			for (int i = 0; i < _clients.size(); i++) {
-				DRPCInvocationsClient client = _clients.get(i);
-				try {
-					DRPCRequest req = client.fetchRequest(_function);
-					if (req.get_request_id().length() > 0) {
-						Map returnInfo = new HashMap();
-						returnInfo.put("id", req.get_request_id());
-						returnInfo.put("host", client.getHost());
-						returnInfo.put("port", client.getPort());
-						gotRequest = true;
-						_collector.emit(new Values(req.get_func_args(),
-								Utils.to_json(returnInfo)),
-								new DRPCMessageId(req.get_request_id(), i));
-						break;
-					}
-				} catch (TException e) {
-					LOG.error("Failed to fetch DRPC request from DRPC server", e);
-				}
-			}
-		} else {
-			DistributedRPCInvocations.Iface drpc = (DistributedRPCInvocations.Iface) ServiceRegistry
-					.getService(_local_drpc_id);
-			if (drpc != null) { // drpc can be null during shutdown of the DRPC
-								// service while the topology is still up
-				try {
-					DRPCRequest req = drpc.fetchRequest(_function);
-					if (req.get_request_id().length() > 0) {
-						Map returnInfo = new HashMap();
-						returnInfo.put("id", req.get_request_id());
-						returnInfo.put("host", _local_drpc_id);
-						returnInfo.put("port", 0);
-						gotRequest = true;
-						_collector.emit(new Values(req.get_func_args(),
-								Utils.to_json(returnInfo)),
-								new DRPCMessageId(req.get_request_id(), 0));
-					}
-				} catch (TException e) {
-					throw new RuntimeException(e);
-				}
-			}
-		}
-		if (!gotRequest) {
-			Utils.sleep(1);
-		}
-	}
-
-	@Override
-	public void ack(Object msgId) {
-	}
-
-	@Override
-	public void fail(Object msgId) {
-		DRPCMessageId did = (DRPCMessageId) msgId;
-		DistributedRPCInvocations.Iface client;
-
-		if (_local_drpc_id == null) {
-			client = _clients.get(did.index);
-		} else {
-			client = (DistributedRPCInvocations.Iface) ServiceRegistry
-					.getService(_local_drpc_id);
-		}
-		try {
-			client.failRequest(did.id);
-		} catch (TException e) {
-			LOG.error("Failed to fail request", e);
-		}
-	}
-
-	@Override
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(new Fields("args", "return-info"));
-	}
-}
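
DRPCSpout emits ("args", "return-info") and leaves the actual work and the reply to downstream bolts. A sketch of the usual manual wiring with the companion ReturnResults bolt, patterned on the well-known exclamation example (the bolt body is illustrative; actually running this would additionally need a LocalCluster submit and a drpc.execute call):

    import backtype.storm.LocalDRPC;
    import backtype.storm.drpc.DRPCSpout;
    import backtype.storm.drpc.ReturnResults;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class ManualDRPCExample {
        // Appends "!" to the args and passes the return-info through.
        public static class ExclaimBolt extends BaseBasicBolt {
            public void execute(Tuple tuple, BasicOutputCollector collector) {
                collector.emit(new Values(tuple.getString(0) + "!",
                        tuple.getString(1)));
            }

            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("result", "return-info"));
            }
        }

        public static void main(String[] args) {
            LocalDRPC drpc = new LocalDRPC();
            TopologyBuilder builder = new TopologyBuilder();
            // The spout polls the local DRPC service for "exclamation" calls.
            builder.setSpout("drpc", new DRPCSpout("exclamation", drpc));
            builder.setBolt("exclaim", new ExclaimBolt(), 3)
                    .shuffleGrouping("drpc");
            // ReturnResults routes each result back via the return-info field.
            builder.setBolt("return", new ReturnResults(), 3)
                    .shuffleGrouping("exclaim");
        }
    }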

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/drpc/JoinResult.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/drpc/JoinResult.java b/jstorm-client/src/main/java/backtype/storm/drpc/JoinResult.java
deleted file mode 100644
index b6733a3..0000000
--- a/jstorm-client/src/main/java/backtype/storm/drpc/JoinResult.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package backtype.storm.drpc;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JoinResult extends BaseRichBolt {
-	public static Logger LOG = LoggerFactory.getLogger(JoinResult.class);
-
-	String returnComponent;
-	Map<Object, Tuple> returns = new HashMap<Object, Tuple>();
-	Map<Object, Tuple> results = new HashMap<Object, Tuple>();
-	OutputCollector _collector;
-
-	public JoinResult(String returnComponent) {
-		this.returnComponent = returnComponent;
-	}
-
-	public void prepare(Map map, TopologyContext context,
-			OutputCollector collector) {
-		_collector = collector;
-	}
-
-	public void execute(Tuple tuple) {
-		Object requestId = tuple.getValue(0);
-		if (tuple.getSourceComponent().equals(returnComponent)) {
-			returns.put(requestId, tuple);
-		} else {
-			results.put(requestId, tuple);
-		}
-
-		if (returns.containsKey(requestId) && results.containsKey(requestId)) {
-			Tuple result = results.remove(requestId);
-			Tuple returner = returns.remove(requestId);
-			LOG.debug(result.getValue(1).toString());
-			List<Tuple> anchors = new ArrayList<Tuple>();
-			anchors.add(result);
-			anchors.add(returner);
-			_collector.emit(anchors, new Values("" + result.getValue(1),
-					returner.getValue(1)));
-			_collector.ack(result);
-			_collector.ack(returner);
-		}
-	}
-
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		declarer.declare(new Fields("result", "return-info"));
-	}
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/drpc/KeyedFairBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/drpc/KeyedFairBolt.java b/jstorm-client/src/main/java/backtype/storm/drpc/KeyedFairBolt.java
deleted file mode 100644
index 287168f..0000000
--- a/jstorm-client/src/main/java/backtype/storm/drpc/KeyedFairBolt.java
+++ /dev/null
@@ -1,75 +0,0 @@
-package backtype.storm.drpc;
-
-import backtype.storm.coordination.CoordinatedBolt.FinishedCallback;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.BasicBoltExecutor;
-import backtype.storm.topology.IBasicBolt;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.KeyedRoundRobinQueue;
-import java.util.HashMap;
-import java.util.Map;
-
-public class KeyedFairBolt implements IRichBolt, FinishedCallback {
-	IRichBolt _delegate;
-	KeyedRoundRobinQueue<Tuple> _rrQueue;
-	Thread _executor;
-	FinishedCallback _callback;
-
-	public KeyedFairBolt(IRichBolt delegate) {
-		_delegate = delegate;
-	}
-
-	public KeyedFairBolt(IBasicBolt delegate) {
-		this(new BasicBoltExecutor(delegate));
-	}
-
-	public void prepare(Map stormConf, TopologyContext context,
-			OutputCollector collector) {
-		if (_delegate instanceof FinishedCallback) {
-			_callback = (FinishedCallback) _delegate;
-		}
-		_delegate.prepare(stormConf, context, collector);
-		_rrQueue = new KeyedRoundRobinQueue<Tuple>();
-		_executor = new Thread(new Runnable() {
-			public void run() {
-				try {
-					while (true) {
-						_delegate.execute(_rrQueue.take());
-					}
-				} catch (InterruptedException e) {
-					// interrupted by cleanup(); exit the consumer loop
-				}
-			}
-		});
-		_executor.setDaemon(true);
-		_executor.start();
-	}
-
-	public void execute(Tuple input) {
-		Object key = input.getValue(0);
-		_rrQueue.add(key, input);
-	}
-
-	public void cleanup() {
-		_executor.interrupt();
-		_delegate.cleanup();
-	}
-
-	public void declareOutputFields(OutputFieldsDeclarer declarer) {
-		_delegate.declareOutputFields(declarer);
-	}
-
-	public void finishedId(Object id) {
-		if (_callback != null) {
-			_callback.finishedId(id);
-		}
-	}
-
-	@Override
-	public Map<String, Object> getComponentConfiguration() {
-		return new HashMap<String, Object>();
-	}
-}
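
KeyedFairBolt above queues incoming tuples by their first field and lets a background thread feed them to the delegate round-robin across keys, so one busy request id cannot starve the rest. A sketch of how a bolt might be wrapped (SlowWorkBolt and the component names are hypothetical):

    import backtype.storm.drpc.KeyedFairBolt;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;

    public class KeyedFairExample {
        // Hypothetical worker; field 0 of its input is the request id.
        public static class SlowWorkBolt extends BaseBasicBolt {
            public void execute(Tuple tuple, BasicOutputCollector collector) {
                // ... expensive per-request work ...
            }

            public void declareOutputFields(OutputFieldsDeclarer declarer) {
            }
        }

        public static void main(String[] args) {
            TopologyBuilder builder = new TopologyBuilder();
            // The wrapper interleaves queued tuples across ids (field 0).
            builder.setBolt("work", new KeyedFairBolt(new SlowWorkBolt()), 8)
                    .fieldsGrouping("requests", new Fields("id"));
        }
    }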

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/drpc/LinearDRPCInputDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/drpc/LinearDRPCInputDeclarer.java b/jstorm-client/src/main/java/backtype/storm/drpc/LinearDRPCInputDeclarer.java
deleted file mode 100644
index 5277cff..0000000
--- a/jstorm-client/src/main/java/backtype/storm/drpc/LinearDRPCInputDeclarer.java
+++ /dev/null
@@ -1,42 +0,0 @@
-package backtype.storm.drpc;
-
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.topology.ComponentConfigurationDeclarer;
-import backtype.storm.tuple.Fields;
-
-public interface LinearDRPCInputDeclarer extends
-		ComponentConfigurationDeclarer<LinearDRPCInputDeclarer> {
-	public LinearDRPCInputDeclarer fieldsGrouping(Fields fields);
-
-	public LinearDRPCInputDeclarer fieldsGrouping(String streamId, Fields fields);
-
-	public LinearDRPCInputDeclarer globalGrouping();
-
-	public LinearDRPCInputDeclarer globalGrouping(String streamId);
-
-	public LinearDRPCInputDeclarer shuffleGrouping();
-
-	public LinearDRPCInputDeclarer shuffleGrouping(String streamId);
-
-	public LinearDRPCInputDeclarer localOrShuffleGrouping();
-
-	public LinearDRPCInputDeclarer localOrShuffleGrouping(String streamId);
-
-	public LinearDRPCInputDeclarer noneGrouping();
-
-	public LinearDRPCInputDeclarer noneGrouping(String streamId);
-
-	public LinearDRPCInputDeclarer allGrouping();
-
-	public LinearDRPCInputDeclarer allGrouping(String streamId);
-
-	public LinearDRPCInputDeclarer directGrouping();
-
-	public LinearDRPCInputDeclarer directGrouping(String streamId);
-
-	public LinearDRPCInputDeclarer customGrouping(CustomStreamGrouping grouping);
-
-	public LinearDRPCInputDeclarer customGrouping(String streamId,
-			CustomStreamGrouping grouping);
-
-}
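
LinearDRPCInputDeclarer is what LinearDRPCTopologyBuilder.addBolt returns; each grouping applies between one stage and the next, with field 0 of every stage carrying the request id. A sketch of typical use under those assumptions (the split logic is illustrative):

    import backtype.storm.drpc.LinearDRPCTopologyBuilder;
    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    public class LinearDRPCExample {
        // Stage 1: fan each request's args out into (id, word) pairs.
        public static class SplitBolt extends BaseBasicBolt {
            public void execute(Tuple tuple, BasicOutputCollector collector) {
                Object id = tuple.getValue(0);
                for (String word : tuple.getString(1).split(" ")) {
                    collector.emit(new Values(id, word));
                }
            }

            public void declareOutputFields(OutputFieldsDeclarer declarer) {
                declarer.declare(new Fields("id", "word"));
            }
        }

        public static void main(String[] args) {
            LinearDRPCTopologyBuilder builder =
                    new LinearDRPCTopologyBuilder("split");
            // addBolt returns a LinearDRPCInputDeclarer for the new stage.
            builder.addBolt(new SplitBolt(), 4);
            // A later stage would partition on the key, e.g.:
            //   builder.addBolt(new CountBolt(), 8)
            //          .fieldsGrouping(new Fields("id", "word"));
        }
    }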