Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:57:04 UTC

[08/53] [abbrv] [partial] storm git commit: STORM-1202: Migrate APIs to org.apache.storm, but try to provide some form of backwards compatibility

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/cluster/ClusterState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/cluster/ClusterState.java b/storm-core/src/jvm/backtype/storm/cluster/ClusterState.java
deleted file mode 100644
index 1960371..0000000
--- a/storm-core/src/jvm/backtype/storm/cluster/ClusterState.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.cluster;
-
-import clojure.lang.APersistentMap;
-import clojure.lang.IFn;
-import java.util.List;
-import org.apache.zookeeper.data.ACL;
-
-/**
- * ClusterState provides the API for the pluggable state store used by the
- * Storm daemons. Data is stored in path/value format, and the store supports
- * listing sub-paths at a given path.
- * All data should be available across all nodes with eventual consistency.
- *
- * IMPORTANT NOTE:
- * Heartbeats are accessed through a separate set of API calls (the *_hb*
- * methods). Their root path (/) may or may not be the same as the root path
- * used by the other API calls.
- *
- * For example, performing these two calls:
- *     set_data("/path", data, acls);
- *     set_worker_hb("/path", heartbeat, acls);
- * may or may not cause a collision in "/path".
- * Never use the same paths with the *_hb* methods as you do with the others.
- */
-public interface ClusterState {
-
-    /**
-     * Registers a callback function that gets called when CuratorEvents happen.
-     * @param callback is a Clojure IFn that accepts the type - translated to a
-     * Clojure keyword as in zookeeper.clj - and the path: (callback type path)
-     * @return is an id that can be passed to unregister(...) to unregister the
-     * callback.
-     */
-    String register(IFn callback);
-
-    /**
-     * Unregisters a callback function that was registered with register(...).
-     * @param id is the String id that was returned from register(...).
-     */
-    void unregister(String id);
-
-    /**
-     * Appends a monotonically increasing integer to the path, creates a new
-     * node there, and writes data to that node.
-     * @param path The path that the monotonically increasing integer suffix will
-     * be added to.
-     * @param data The data that will be written at the suffixed path's node.
-     * @param acls The acls to apply to the path. May be null.
-     * @return The path with the integer suffix appended.
-     */
-    String create_sequential(String path, byte[] data, List<ACL> acls);
-
-    /**
-     * Creates nodes for path and all its parents. Path elements are separated by
-     * a "/", as in *nix filesystem notation. Equivalent to mkdir -p in *nix.
-     * @param path The path to create, along with all its parents.
-     * @param acls The acls to apply to the path. May be null.
-     * @return path
-     */
-    String mkdirs(String path, List<ACL> acls);
-
-    /**
-     * Deletes the node at a given path, and any child nodes that may exist.
-     * @param path The path to delete
-     */
-    void delete_node(String path);
-
-    /**
-     * Creates an ephemeral node at path. Ephemeral nodes are destroyed
-     * by the store when the client disconnects.
-     * @param path The path where a node will be created.
-     * @param data The data to be written at the node.
-     * @param acls The acls to apply to the path. May be null.
-     */
-    void set_ephemeral_node(String path, byte[] data, List<ACL> acls);
-
-    /**
-     * Gets the 'version' of the node at a path. Optionally sets a watch
-     * on that node. The version should increase whenever a write happens.
-     * @param path The path to get the version of.
-     * @param watch Whether or not to set a watch on the path. Watched paths
-     * emit events which are consumed by functions registered with the
-     * register method. Very useful for catching updates to nodes.
-     * @return The integer version of this node.
-     */
-    Integer get_version(String path, boolean watch);
-
-    /**
-     * Check if a node exists and optionally set a watch on the path.
-     * @param path The path to check for the existence of a node.
-     * @param watch Whether or not to set a watch on the path. Watched paths
-     * emit events which are consumed by functions registered with the
-     * register method. Very useful for catching updates to nodes.
-     * @return Whether or not a node exists at path.
-     */
-    boolean node_exists(String path, boolean watch);
-
-    /**
-     * Get a list of paths of all the child nodes which exist immediately
-     * under path.
-     * @param path The path to look under
-     * @param watch Whether or not to set a watch on the path. Watched paths
-     * emit events which are consumed by functions registered with the
-     * register method. Very useful for catching updates to nodes.
-     * @return list of string paths under path.
-     */
-    List<String> get_children(String path, boolean watch);
-
-    /**
-     * Close the connection to the data store.
-     */
-    void close();
-
-    /**
-     * Set the value of the node at path to data.
-     * @param path The path whose node we want to set.
-     * @param data The data to put in the node.
-     * @param acls The acls to apply to the path. May be null.
-     */
-    void set_data(String path, byte[] data, List<ACL> acls);
-
-    /**
-     * Get the data from the node at path
-     * @param path The path to look under
-     * @param watch Whether or not to set a watch on the path. Watched paths
-     * emit events which are consumed by functions registered with the
-     * register method. Very useful for catching updates to nodes.
-     * @return The data at the node.
-     */
-    byte[] get_data(String path, boolean watch);
-
-    /**
-     * Get the data at the node along with its version. Data is returned
-     * in an APersistentMap with clojure keyword keys :data and :version.
-     * @param path The path to look under
-     * @param watch Whether or not to set a watch on the path. Watched paths
-     * emit events which are consumed by functions registered with the
-     * register method. Very useful for catching updates to nodes.
-     * @return An APersistentMap in the form {:data data :version version}
-     */
-    APersistentMap get_data_with_version(String path, boolean watch);
-
-    /**
-     * Write a worker heartbeat at the path.
-     * @param path The path whose node we want to set.
-     * @param data The data to put in the node.
-     * @param acls The acls to apply to the path. May be null.
-     */
-    void set_worker_hb(String path, byte[] data, List<ACL> acls);
-
-    /**
-     * Get the heartbeat from the node at path
-     * @param path The path to look under
-     * @param watch Whether or not to set a watch on the path. Watched paths
-     * emit events which are consumed by functions registered with the
-     * register method. Very useful for catching updates to nodes.
-     * @return The heartbeat at the node.
-     */
-    byte[] get_worker_hb(String path, boolean watch);
-
-    /**
-     * Get a list of paths of all the child nodes which exist immediately
-     * under path. This is similar to get_children, but must be used for
-     * any nodes written with the *_hb* methods (e.g. set_worker_hb).
-     * @param path The path to look under
-     * @param watch Whether or not to set a watch on the path. Watched paths
-     * emit events which are consumed by functions registered with the
-     * register method. Very useful for catching updates to nodes.
-     * @return list of string paths under path.
-     */
-    List<String> get_worker_hb_children(String path, boolean watch);
-
-    /**
-     * Deletes the heartbeat at a given path, and any child nodes that may exist.
-     * @param path The path to delete.
-     */
-    void delete_worker_hb(String path);
-
-    /**
-     * Add a ClusterStateListener to the connection.
-     * @param listener A ClusterStateListener to handle changing cluster state
-     * events.
-     */
-    void add_listener(ClusterStateListener listener);
-
-    /**
-     * Force consistency on a path. Any writes committed on the path before
-     * this call will be completely propagated when it returns.
-     * @param path The path to synchronize.
-     */
-    void sync_path(String path);
-
-    /**
-     * Deletes the znodes under /storm/blobstore/key_name whose names start
-     * with the given nimbusHostPortInfo.
-     * @param path /storm/blobstore/key_name
-     * @param nimbusHostPortInfo Contains the host and port information of
-     * a nimbus node.
-     */
-    void delete_node_blobstore(String path, String nimbusHostPortInfo);
-}
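
The interface above (moved to org.apache.storm.cluster by this commit) is the
pluggable state-store API the daemons build on. A minimal usage sketch,
assuming a ClusterState obtained from ClusterStateFactory.mkState(...); it
uses clojure.lang.AFn because that is the simplest way to supply an IFn from
Java, and the watched path is illustrative:

    import backtype.storm.cluster.ClusterState;
    import clojure.lang.AFn;

    public class ClusterStateSketch {
        // 'state' is assumed to come from ClusterStateFactory.mkState(...).
        static void watchPath(ClusterState state) {
            // The callback receives (type, path) for events on watched paths.
            String id = state.register(new AFn() {
                @Override
                public Object invoke(Object type, Object path) {
                    System.out.println("event " + type + " on " + path);
                    return null;
                }
            });
            byte[] data = state.get_data("/assignments", true); // true => set a watch
            // ... decode and use data; the callback fires when it changes ...
            state.unregister(id);
        }
    }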

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/cluster/ClusterStateContext.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/cluster/ClusterStateContext.java b/storm-core/src/jvm/backtype/storm/cluster/ClusterStateContext.java
deleted file mode 100644
index 5ccde23..0000000
--- a/storm-core/src/jvm/backtype/storm/cluster/ClusterStateContext.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package backtype.storm.cluster;
-
-/**
- * This class provides runtime context to ClusterStateFactory
- * implementors, such as which daemon is creating the state store.
- */
-public class ClusterStateContext {
-    
-    private DaemonType daemonType;
-
-    public ClusterStateContext() {
-        daemonType = DaemonType.UNKNOWN;
-    }
-    
-    public ClusterStateContext(DaemonType daemonType) {
-        this.daemonType = daemonType;
-    }
-    
-    public DaemonType getDaemonType() {
-        return daemonType;
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/cluster/ClusterStateFactory.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/cluster/ClusterStateFactory.java b/storm-core/src/jvm/backtype/storm/cluster/ClusterStateFactory.java
deleted file mode 100644
index 1f946ee..0000000
--- a/storm-core/src/jvm/backtype/storm/cluster/ClusterStateFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.cluster;
-
-import clojure.lang.APersistentMap;
-import java.util.List;
-import org.apache.zookeeper.data.ACL;
-
-public interface ClusterStateFactory {
-    
-    ClusterState mkState(APersistentMap config, APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context);
-
-}
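
A skeleton factory, to show where the ClusterStateContext defined above comes
in. The body is deliberately a stub: a real implementation (such as the
ZooKeeper-backed default) would build its connection from the two config maps:

    import backtype.storm.cluster.ClusterState;
    import backtype.storm.cluster.ClusterStateContext;
    import backtype.storm.cluster.ClusterStateFactory;
    import backtype.storm.cluster.DaemonType;
    import clojure.lang.APersistentMap;
    import java.util.List;
    import org.apache.zookeeper.data.ACL;

    public class SketchStateFactory implements ClusterStateFactory {
        @Override
        public ClusterState mkState(APersistentMap config, APersistentMap auth_conf,
                                    List<ACL> acls, ClusterStateContext context) {
            // The context says which daemon is asking, so a factory can vary
            // its behavior per daemon type.
            if (context.getDaemonType() == DaemonType.PACEMAKER) {
                // ... heartbeat-store specific setup would go here ...
            }
            throw new UnsupportedOperationException("sketch only");
        }
    }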

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/cluster/ClusterStateListener.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/cluster/ClusterStateListener.java b/storm-core/src/jvm/backtype/storm/cluster/ClusterStateListener.java
deleted file mode 100644
index 22693f8..0000000
--- a/storm-core/src/jvm/backtype/storm/cluster/ClusterStateListener.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.cluster;
-
-public interface ClusterStateListener {
-    void stateChanged(ConnectionState newState);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/cluster/ConnectionState.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/cluster/ConnectionState.java b/storm-core/src/jvm/backtype/storm/cluster/ConnectionState.java
deleted file mode 100644
index d6887da..0000000
--- a/storm-core/src/jvm/backtype/storm/cluster/ConnectionState.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.cluster;
-
-public enum ConnectionState {
-    CONNECTED,
-    RECONNECTED,
-    LOST
-}
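
ClusterStateListener and ConnectionState together form the connection-event
hook on the store. An illustrative listener follows; treating LOST as fatal is
this example's policy choice, not something the interface mandates. It would
be attached with ClusterState.add_listener(new LoggingListener()):

    import backtype.storm.cluster.ClusterStateListener;
    import backtype.storm.cluster.ConnectionState;

    public class LoggingListener implements ClusterStateListener {
        @Override
        public void stateChanged(ConnectionState newState) {
            System.out.println("state-store connection: " + newState);
            if (newState == ConnectionState.LOST) {
                // A daemon might prefer to die and restart over running
                // against state it can no longer see.
                throw new RuntimeException("lost connection to state store");
            }
        }
    }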

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/cluster/DaemonType.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/cluster/DaemonType.java b/storm-core/src/jvm/backtype/storm/cluster/DaemonType.java
deleted file mode 100644
index 684d0ef..0000000
--- a/storm-core/src/jvm/backtype/storm/cluster/DaemonType.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package backtype.storm.cluster;
-
-public enum DaemonType {
-    SUPERVISOR,
-    NIMBUS,
-    WORKER,
-    PACEMAKER,
-    UNKNOWN
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java b/storm-core/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java
deleted file mode 100644
index 55590d0..0000000
--- a/storm-core/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.coordination;
-
-import backtype.storm.coordination.CoordinatedBolt.FinishedCallback;
-import backtype.storm.coordination.CoordinatedBolt.TimeoutCallback;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.FailedException;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.Utils;
-import java.util.HashMap;
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BatchBoltExecutor implements IRichBolt, FinishedCallback, TimeoutCallback {
-    public static final Logger LOG = LoggerFactory.getLogger(BatchBoltExecutor.class);
-
-    byte[] _boltSer;
-    Map<Object, IBatchBolt> _openTransactions;
-    Map _conf;
-    TopologyContext _context;
-    BatchOutputCollectorImpl _collector;
-    
-    public BatchBoltExecutor(IBatchBolt bolt) {
-        _boltSer = Utils.javaSerialize(bolt);
-    }
-    
-    @Override
-    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
-        _conf = conf;
-        _context = context;
-        _collector = new BatchOutputCollectorImpl(collector);
-        _openTransactions = new HashMap<>();
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        Object id = input.getValue(0);
-        IBatchBolt bolt = getBatchBolt(id);
-        try {
-            bolt.execute(input);
-            _collector.ack(input);
-        } catch(FailedException e) {
-            LOG.error("Failed to process tuple in batch", e);
-            _collector.fail(input);                
-        }
-    }
-
-    @Override
-    public void cleanup() {
-    }
-
-    @Override
-    public void finishedId(Object id) {
-        IBatchBolt bolt = getBatchBolt(id);
-        _openTransactions.remove(id);
-        bolt.finishBatch();
-    }
-
-    @Override
-    public void timeoutId(Object attempt) {
-        _openTransactions.remove(attempt);        
-    }    
-    
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        newTransactionalBolt().declareOutputFields(declarer);
-    }
-    
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return newTransactionalBolt().getComponentConfiguration();
-    }
-    
-    private IBatchBolt getBatchBolt(Object id) {
-        IBatchBolt bolt = _openTransactions.get(id);
-        if(bolt==null) {
-            bolt = newTransactionalBolt();
-            bolt.prepare(_conf, _context, _collector, id);
-            _openTransactions.put(id, bolt);            
-        }
-        return bolt;
-    }
-    
-    private IBatchBolt newTransactionalBolt() {
-        return Utils.javaDeserialize(_boltSer, IBatchBolt.class);
-    }
-}
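
BatchBoltExecutor keeps the user bolt only in serialized form (_boltSer) and
deserializes a fresh copy per batch id, so concurrent batches never share bolt
state. A plain-JDK sketch of the round trip that Utils.javaSerialize and
Utils.javaDeserialize essentially perform (the helper names are the sketch's
own):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.io.ObjectInputStream;
    import java.io.ObjectOutputStream;
    import java.io.Serializable;

    final class SerdeSketch {
        static byte[] toBytes(Serializable obj) throws IOException {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(obj);
            }
            return bos.toByteArray();
        }

        static <T> T fromBytes(byte[] bytes, Class<T> clazz)
                throws IOException, ClassNotFoundException {
            try (ObjectInputStream ois =
                     new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return clazz.cast(ois.readObject()); // a fresh instance every call
            }
        }
    }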

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/coordination/BatchOutputCollector.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/coordination/BatchOutputCollector.java b/storm-core/src/jvm/backtype/storm/coordination/BatchOutputCollector.java
deleted file mode 100644
index f5f3457..0000000
--- a/storm-core/src/jvm/backtype/storm/coordination/BatchOutputCollector.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.coordination;
-
-import backtype.storm.utils.Utils;
-import java.util.List;
-
-public abstract class BatchOutputCollector {
-
-    /**
-     * Emits a tuple to the default output stream.
-     */
-    public List<Integer> emit(List<Object> tuple) {
-        return emit(Utils.DEFAULT_STREAM_ID, tuple);
-    }
-
-    public abstract List<Integer> emit(String streamId, List<Object> tuple);
-    
-    /**
-     * Emits a tuple to the specified task on the default output stream. This output
-     * stream must have been declared as a direct stream, and the specified task must
-     * use a direct grouping on this stream to receive the message.
-     */
-    public void emitDirect(int taskId, List<Object> tuple) {
-        emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple);
-    }
-    
-    public abstract void emitDirect(int taskId, String streamId, List<Object> tuple); 
-    
-    public abstract void reportError(Throwable error);
-}
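
To make the two emit flavors concrete, a fragment using both; the stream name
and task id are illustrative, and as the javadoc above says, emitDirect only
works if the stream is declared direct and consumed with a direct grouping:

    import backtype.storm.coordination.BatchOutputCollector;
    import backtype.storm.tuple.Values;
    import java.util.List;

    final class EmitSketch {
        static void emitBoth(BatchOutputCollector collector, int taskId) {
            // Default stream; returns the task ids the tuple was routed to.
            List<Integer> tasks = collector.emit(new Values("key", 42));
            // "results" must be declared as a direct stream by this component.
            collector.emitDirect(taskId, "results", new Values("key", 42));
        }
    }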

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/coordination/BatchOutputCollectorImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/coordination/BatchOutputCollectorImpl.java b/storm-core/src/jvm/backtype/storm/coordination/BatchOutputCollectorImpl.java
deleted file mode 100644
index cae7560..0000000
--- a/storm-core/src/jvm/backtype/storm/coordination/BatchOutputCollectorImpl.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.coordination;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.tuple.Tuple;
-import java.util.List;
-
-public class BatchOutputCollectorImpl extends BatchOutputCollector {
-    OutputCollector _collector;
-    
-    public BatchOutputCollectorImpl(OutputCollector collector) {
-        _collector = collector;
-    }
-    
-    @Override
-    public List<Integer> emit(String streamId, List<Object> tuple) {
-        return _collector.emit(streamId, tuple);
-    }
-
-    @Override
-    public void emitDirect(int taskId, String streamId, List<Object> tuple) {
-        _collector.emitDirect(taskId, streamId, tuple);
-    }
-
-    @Override
-    public void reportError(Throwable error) {
-        _collector.reportError(error);
-    }
-    
-    public void ack(Tuple tup) {
-        _collector.ack(tup);
-    }
-    
-    public void fail(Tuple tup) {
-        _collector.fail(tup);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/coordination/BatchSubtopologyBuilder.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/coordination/BatchSubtopologyBuilder.java b/storm-core/src/jvm/backtype/storm/coordination/BatchSubtopologyBuilder.java
deleted file mode 100644
index 1dd1c9f..0000000
--- a/storm-core/src/jvm/backtype/storm/coordination/BatchSubtopologyBuilder.java
+++ /dev/null
@@ -1,447 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.coordination;
-
-import backtype.storm.Constants;
-import backtype.storm.coordination.CoordinatedBolt.SourceArgs;
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.generated.Grouping;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.grouping.PartialKeyGrouping;
-import backtype.storm.topology.BaseConfigurationDeclarer;
-import backtype.storm.topology.BasicBoltExecutor;
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.IBasicBolt;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.InputDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-public class BatchSubtopologyBuilder {
-    Map<String, Component> _bolts = new HashMap<String, Component>();
-    Component _masterBolt;
-    String _masterId;
-    
-    public BatchSubtopologyBuilder(String masterBoltId, IBasicBolt masterBolt, Number boltParallelism) {
-        Integer p = boltParallelism == null ? null : boltParallelism.intValue();
-        _masterBolt = new Component(new BasicBoltExecutor(masterBolt), p);
-        _masterId = masterBoltId;
-    }
-    
-    public BatchSubtopologyBuilder(String masterBoltId, IBasicBolt masterBolt) {
-        this(masterBoltId, masterBolt, null);
-    }
-    
-    public BoltDeclarer getMasterDeclarer() {
-        return new BoltDeclarerImpl(_masterBolt);
-    }
-        
-    public BoltDeclarer setBolt(String id, IBatchBolt bolt) {
-        return setBolt(id, bolt, null);
-    }
-    
-    public BoltDeclarer setBolt(String id, IBatchBolt bolt, Number parallelism) {
-        return setBolt(id, new BatchBoltExecutor(bolt), parallelism);
-    }     
-    
-    public BoltDeclarer setBolt(String id, IBasicBolt bolt) {
-        return setBolt(id, bolt, null);
-    }    
-    
-    public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism) {
-        return setBolt(id, new BasicBoltExecutor(bolt), parallelism);
-    }
-    
-    private BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism) {
-        Integer p = null;
-        if(parallelism!=null) p = parallelism.intValue();
-        Component component = new Component(bolt, p);
-        _bolts.put(id, component);
-        return new BoltDeclarerImpl(component);
-    }
-    
-    public void extendTopology(TopologyBuilder builder) {
-        BoltDeclarer declarer = builder.setBolt(_masterId, new CoordinatedBolt(_masterBolt.bolt), _masterBolt.parallelism);
-        for(InputDeclaration decl: _masterBolt.declarations) {
-            decl.declare(declarer);
-        }
-        for(Map conf: _masterBolt.componentConfs) {
-            declarer.addConfigurations(conf);
-        }
-        for(String id: _bolts.keySet()) {
-            Component component = _bolts.get(id);
-            Map<String, SourceArgs> coordinatedArgs = new HashMap<String, SourceArgs>();
-            for(String c: componentBoltSubscriptions(component)) {
-                SourceArgs source;
-                if(c.equals(_masterId)) {
-                    source = SourceArgs.single();
-                } else {
-                    source = SourceArgs.all();
-                }
-                coordinatedArgs.put(c, source);                    
-            }
-            
-
-            BoltDeclarer input = builder.setBolt(id,
-                                                  new CoordinatedBolt(component.bolt,
-                                                                      coordinatedArgs,
-                                                                      null),
-                                                  component.parallelism);
-            for(Map conf: component.componentConfs) {
-                input.addConfigurations(conf);
-            }
-            for(String c: componentBoltSubscriptions(component)) {
-                input.directGrouping(c, Constants.COORDINATED_STREAM_ID);
-            }
-            for(InputDeclaration d: component.declarations) {
-                d.declare(input);
-            }
-        }        
-    }
-        
-    private Set<String> componentBoltSubscriptions(Component component) {
-        Set<String> ret = new HashSet<String>();
-        for(InputDeclaration d: component.declarations) {
-            ret.add(d.getComponent());
-        }
-        return ret;
-    }
-
-    private static class Component {
-        public IRichBolt bolt;
-        public Integer parallelism;
-        public List<InputDeclaration> declarations = new ArrayList<InputDeclaration>();
-        public List<Map<String, Object>> componentConfs = new ArrayList<>();
-        
-        public Component(IRichBolt bolt, Integer parallelism) {
-            this.bolt = bolt;
-            this.parallelism = parallelism;
-        }
-    }
-    
-    private static interface InputDeclaration {
-        void declare(InputDeclarer declarer);
-        String getComponent();
-    }
-        
-    private static class BoltDeclarerImpl extends BaseConfigurationDeclarer<BoltDeclarer> implements BoltDeclarer {
-        Component _component;
-        
-        public BoltDeclarerImpl(Component component) {
-            _component = component;
-        }
-        
-        @Override
-        public BoltDeclarer fieldsGrouping(final String component, final Fields fields) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.fieldsGrouping(component, fields);
-                }
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }                
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer fieldsGrouping(final String component, final String streamId, final Fields fields) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.fieldsGrouping(component, streamId, fields);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }                
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer globalGrouping(final String component) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.globalGrouping(component);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }                
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer globalGrouping(final String component, final String streamId) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.globalGrouping(component, streamId);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }                
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer shuffleGrouping(final String component) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.shuffleGrouping(component);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }                
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer shuffleGrouping(final String component, final String streamId) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.shuffleGrouping(component, streamId);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }                
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer localOrShuffleGrouping(final String component) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.localOrShuffleGrouping(component);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }                
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer localOrShuffleGrouping(final String component, final String streamId) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.localOrShuffleGrouping(component, streamId);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }                
-            });
-            return this;
-        }
-        
-        @Override
-        public BoltDeclarer noneGrouping(final String component) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.noneGrouping(component);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }                
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer noneGrouping(final String component, final String streamId) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.noneGrouping(component, streamId);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }                
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer allGrouping(final String component) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.allGrouping(component);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }                
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer allGrouping(final String component, final String streamId) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.allGrouping(component, streamId);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }                
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer directGrouping(final String component) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.directGrouping(component);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }                
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer directGrouping(final String component, final String streamId) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.directGrouping(component, streamId);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }                
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer partialKeyGrouping(String componentId, Fields fields) {
-            return customGrouping(componentId, new PartialKeyGrouping(fields));
-        }
-
-        @Override
-        public BoltDeclarer partialKeyGrouping(String componentId, String streamId, Fields fields) {
-            return customGrouping(componentId, streamId, new PartialKeyGrouping(fields));
-        }
-        
-        @Override
-        public BoltDeclarer customGrouping(final String component, final CustomStreamGrouping grouping) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.customGrouping(component, grouping);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }                
-            });
-            return this;        
-        }
-
-        @Override
-        public BoltDeclarer customGrouping(final String component, final String streamId, final CustomStreamGrouping grouping) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.customGrouping(component, streamId, grouping);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return component;
-                }                
-            });
-            return this;
-        }
-
-        @Override
-        public BoltDeclarer grouping(final GlobalStreamId stream, final Grouping grouping) {
-            addDeclaration(new InputDeclaration() {
-                @Override
-                public void declare(InputDeclarer declarer) {
-                    declarer.grouping(stream, grouping);
-                }                
-
-                @Override
-                public String getComponent() {
-                    return stream.get_componentId();
-                }                
-            });
-            return this;
-        }
-        
-        private void addDeclaration(InputDeclaration declaration) {
-            _component.declarations.add(declaration);
-        }
-
-        @Override
-        public BoltDeclarer addConfigurations(Map<String, Object> conf) {
-            _component.componentConfs.add(conf);
-            return this;
-        }
-    }
-}
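
Intended usage of the builder above: wrap a master bolt, hang batch bolts off
it, then splice everything into a normal TopologyBuilder. A hedged fragment;
the component ids, the "requests" spout, and the bolt arguments are
placeholders:

    import backtype.storm.coordination.BatchSubtopologyBuilder;
    import backtype.storm.coordination.IBatchBolt;
    import backtype.storm.topology.IBasicBolt;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;

    final class WiringSketch {
        static void wire(TopologyBuilder builder, IBasicBolt master, IBatchBolt counter) {
            BatchSubtopologyBuilder batch =
                new BatchSubtopologyBuilder("batch-master", master, 2);
            // Feed the master from an already-declared (placeholder) spout.
            batch.getMasterDeclarer().shuffleGrouping("requests");
            // Batch bolts subscribe to the master; extendTopology then adds the
            // hidden coordination streams (direct groupings) automatically.
            batch.setBolt("counter", counter, 4)
                 .fieldsGrouping("batch-master", new Fields("word"));
            batch.extendTopology(builder);
        }
    }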

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/coordination/CoordinatedBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/coordination/CoordinatedBolt.java b/storm-core/src/jvm/backtype/storm/coordination/CoordinatedBolt.java
deleted file mode 100644
index c3a428c..0000000
--- a/storm-core/src/jvm/backtype/storm/coordination/CoordinatedBolt.java
+++ /dev/null
@@ -1,382 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.coordination;
-
-import backtype.storm.topology.FailedException;
-import java.util.Map.Entry;
-import backtype.storm.tuple.Values;
-import backtype.storm.generated.GlobalStreamId;
-import java.util.Collection;
-import backtype.storm.Constants;
-import backtype.storm.generated.Grouping;
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.TimeCacheMap;
-import backtype.storm.utils.Utils;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import static backtype.storm.utils.Utils.get;
-
-/**
- * Coordination requires the request ids to be globally unique for a while,
- * so that retried requests are not confused with new ones.
- */
-public class CoordinatedBolt implements IRichBolt {
-    public static final Logger LOG = LoggerFactory.getLogger(CoordinatedBolt.class);
-
-    public static interface FinishedCallback {
-        void finishedId(Object id);
-    }
-
-    public static interface TimeoutCallback {
-        void timeoutId(Object id);
-    }
-    
-    
-    public static class SourceArgs implements Serializable {
-        public boolean singleCount;
-
-        protected SourceArgs(boolean singleCount) {
-            this.singleCount = singleCount;
-        }
-
-        public static SourceArgs single() {
-            return new SourceArgs(true);
-        }
-
-        public static SourceArgs all() {
-            return new SourceArgs(false);
-        }
-        
-        @Override
-        public String toString() {
-            return "<Single: " + singleCount + ">";
-        }
-    }
-
-    public class CoordinatedOutputCollector implements IOutputCollector {
-        IOutputCollector _delegate;
-
-        public CoordinatedOutputCollector(IOutputCollector delegate) {
-            _delegate = delegate;
-        }
-
-        public List<Integer> emit(String stream, Collection<Tuple> anchors, List<Object> tuple) {
-            List<Integer> tasks = _delegate.emit(stream, anchors, tuple);
-            updateTaskCounts(tuple.get(0), tasks);
-            return tasks;
-        }
-
-        public void emitDirect(int task, String stream, Collection<Tuple> anchors, List<Object> tuple) {
-            updateTaskCounts(tuple.get(0), Arrays.asList(task));
-            _delegate.emitDirect(task, stream, anchors, tuple);
-        }
-
-        public void ack(Tuple tuple) {
-            Object id = tuple.getValue(0);
-            synchronized(_tracked) {
-                TrackingInfo track = _tracked.get(id);
-                if (track != null)
-                    track.receivedTuples++;
-            }
-            boolean failed = checkFinishId(tuple, TupleType.REGULAR);
-            if(failed) {
-                _delegate.fail(tuple);                
-            } else {
-                _delegate.ack(tuple);
-            }
-        }
-
-        public void fail(Tuple tuple) {
-            Object id = tuple.getValue(0);
-            synchronized(_tracked) {
-                TrackingInfo track = _tracked.get(id);
-                if (track != null)
-                    track.failed = true;
-            }
-            checkFinishId(tuple, TupleType.REGULAR);
-            _delegate.fail(tuple);
-        }
-        
-        public void reportError(Throwable error) {
-            _delegate.reportError(error);
-        }
-
-
-        private void updateTaskCounts(Object id, List<Integer> tasks) {
-            synchronized(_tracked) {
-                TrackingInfo track = _tracked.get(id);
-                if (track != null) {
-                    Map<Integer, Integer> taskEmittedTuples = track.taskEmittedTuples;
-                    for(Integer task: tasks) {
-                        int newCount = get(taskEmittedTuples, task, 0) + 1;
-                        taskEmittedTuples.put(task, newCount);
-                    }
-                }
-            }
-        }
-    }
-
-    private Map<String, SourceArgs> _sourceArgs;
-    private IdStreamSpec _idStreamSpec;
-    private IRichBolt _delegate;
-    private Integer _numSourceReports;
-    private List<Integer> _countOutTasks = new ArrayList<>();
-    private OutputCollector _collector;
-    private TimeCacheMap<Object, TrackingInfo> _tracked;
-
-    public static class TrackingInfo {
-        int reportCount = 0;
-        int expectedTupleCount = 0;
-        int receivedTuples = 0;
-        boolean failed = false;
-        Map<Integer, Integer> taskEmittedTuples = new HashMap<>();
-        boolean receivedId = false;
-        boolean finished = false;
-        List<Tuple> ackTuples = new ArrayList<>();
-        
-        @Override
-        public String toString() {
-            return "reportCount: " + reportCount + "\n" +
-                   "expectedTupleCount: " + expectedTupleCount + "\n" +
-                   "receivedTuples: " + receivedTuples + "\n" +
-                   "failed: " + failed + "\n" +
-                   taskEmittedTuples.toString();
-        }
-    }
-
-    
-    public static class IdStreamSpec implements Serializable {
-        GlobalStreamId _id;
-        
-        public GlobalStreamId getGlobalStreamId() {
-            return _id;
-        }
-
-        public static IdStreamSpec makeDetectSpec(String component, String stream) {
-            return new IdStreamSpec(component, stream);
-        }        
-        
-        protected IdStreamSpec(String component, String stream) {
-            _id = new GlobalStreamId(component, stream);
-        }
-    }
-    
-    public CoordinatedBolt(IRichBolt delegate) {
-        this(delegate, null, null);
-    }
-
-    public CoordinatedBolt(IRichBolt delegate, String sourceComponent, SourceArgs sourceArgs, IdStreamSpec idStreamSpec) {
-        this(delegate, singleSourceArgs(sourceComponent, sourceArgs), idStreamSpec);
-    }
-    
-    public CoordinatedBolt(IRichBolt delegate, Map<String, SourceArgs> sourceArgs, IdStreamSpec idStreamSpec) {
-        _sourceArgs = sourceArgs;
-        if(_sourceArgs==null) _sourceArgs = new HashMap<>();
-        _delegate = delegate;
-        _idStreamSpec = idStreamSpec;
-    }
-    
-    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
-        TimeCacheMap.ExpiredCallback<Object, TrackingInfo> callback = null;
-        if(_delegate instanceof TimeoutCallback) {
-            callback = new TimeoutItems();
-        }
-        _tracked = new TimeCacheMap<>(context.maxTopologyMessageTimeout(), callback);
-        _collector = collector;
-        _delegate.prepare(config, context, new OutputCollector(new CoordinatedOutputCollector(collector)));
-        for(String component: Utils.get(context.getThisTargets(),
-                                        Constants.COORDINATED_STREAM_ID,
-                                        new HashMap<String, Grouping>())
-                                        .keySet()) {
-            for(Integer task: context.getComponentTasks(component)) {
-                _countOutTasks.add(task);
-            }
-        }
-        if(!_sourceArgs.isEmpty()) {
-            _numSourceReports = 0;
-            for(Entry<String, SourceArgs> entry: _sourceArgs.entrySet()) {
-                if(entry.getValue().singleCount) {
-                    _numSourceReports+=1;
-                } else {
-                    _numSourceReports+=context.getComponentTasks(entry.getKey()).size();
-                }
-            }
-        }
-    }
-
-    private boolean checkFinishId(Tuple tup, TupleType type) {
-        Object id = tup.getValue(0);
-        boolean failed = false;
-        
-        synchronized(_tracked) {
-            TrackingInfo track = _tracked.get(id);
-            try {
-                if(track!=null) {
-                    boolean delayed = false;
-                    if(_idStreamSpec==null && type == TupleType.COORD || _idStreamSpec!=null && type==TupleType.ID) {
-                        track.ackTuples.add(tup);
-                        delayed = true;
-                    }
-                    if(track.failed) {
-                        failed = true;
-                        for(Tuple t: track.ackTuples) {
-                            _collector.fail(t);
-                        }
-                        _tracked.remove(id);
-                    } else if(track.receivedId
-                             && (_sourceArgs.isEmpty() ||
-                                  track.reportCount==_numSourceReports &&
-                                  track.expectedTupleCount == track.receivedTuples)){
-                        if(_delegate instanceof FinishedCallback) {
-                            ((FinishedCallback)_delegate).finishedId(id);
-                        }
-                        if(!(_sourceArgs.isEmpty() || type!=TupleType.REGULAR)) {
-                            throw new IllegalStateException("Coordination condition met on a non-coordinating tuple. Should be impossible");
-                        }
-                        Iterator<Integer> outTasks = _countOutTasks.iterator();
-                        while(outTasks.hasNext()) {
-                            int task = outTasks.next();
-                            int numTuples = get(track.taskEmittedTuples, task, 0);
-                            _collector.emitDirect(task, Constants.COORDINATED_STREAM_ID, tup, new Values(id, numTuples));
-                        }
-                        for(Tuple t: track.ackTuples) {
-                            _collector.ack(t);
-                        }
-                        track.finished = true;
-                        _tracked.remove(id);
-                    }
-                    if(!delayed && type!=TupleType.REGULAR) {
-                        if(track.failed) {
-                            _collector.fail(tup);
-                        } else {
-                            _collector.ack(tup);                            
-                        }
-                    }
-                } else {
-                    if(type!=TupleType.REGULAR) _collector.fail(tup);
-                }
-            } catch(FailedException e) {
-                LOG.error("Failed to finish batch", e);
-                for(Tuple t: track.ackTuples) {
-                    _collector.fail(t);
-                }
-                _tracked.remove(id);
-                failed = true;
-            }
-        }
-        return failed;
-    }
-
-    public void execute(Tuple tuple) {
-        Object id = tuple.getValue(0);
-        TrackingInfo track;
-        TupleType type = getTupleType(tuple);
-        synchronized(_tracked) {
-            track = _tracked.get(id);
-            if(track==null) {
-                track = new TrackingInfo();
-                if(_idStreamSpec==null) track.receivedId = true;
-                _tracked.put(id, track);
-            }
-        }
-        
-        if(type==TupleType.ID) {
-            synchronized(_tracked) {
-                track.receivedId = true;
-            }
-            checkFinishId(tuple, type);            
-        } else if(type==TupleType.COORD) {
-            int count = (Integer) tuple.getValue(1);
-            synchronized(_tracked) {
-                track.reportCount++;
-                track.expectedTupleCount+=count;
-            }
-            checkFinishId(tuple, type);
-        } else {            
-            synchronized(_tracked) {
-                _delegate.execute(tuple);
-            }
-        }
-    }
-
-    public void cleanup() {
-        _delegate.cleanup();
-        _tracked.cleanup();
-    }
-
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        _delegate.declareOutputFields(declarer);
-        declarer.declareStream(Constants.COORDINATED_STREAM_ID, true, new Fields("id", "count"));
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return _delegate.getComponentConfiguration();
-    }
-    
-    private static Map<String, SourceArgs> singleSourceArgs(String sourceComponent, SourceArgs sourceArgs) {
-        Map<String, SourceArgs> ret = new HashMap<>();
-        ret.put(sourceComponent, sourceArgs);
-        return ret;
-    }
-    
-    private class TimeoutItems implements TimeCacheMap.ExpiredCallback<Object, TrackingInfo> {
-        @Override
-        public void expire(Object id, TrackingInfo val) {
-            synchronized(_tracked) {
-                // the combination of the lock and the finished flag ensure that
-                // an id is never timed out if it has been finished
-                val.failed = true;
-                if(!val.finished) {
-                    ((TimeoutCallback) _delegate).timeoutId(id);
-                }
-            }
-        }
-    }
-    
-    private TupleType getTupleType(Tuple tuple) {
-        if(_idStreamSpec!=null
-                && tuple.getSourceGlobalStreamId().equals(_idStreamSpec._id)) {
-            return TupleType.ID;
-        } else if(!_sourceArgs.isEmpty()
-                && tuple.getSourceStreamId().equals(Constants.COORDINATED_STREAM_ID)) {
-            return TupleType.COORD;
-        } else {
-            return TupleType.REGULAR;
-        }
-    }
-    
-    enum TupleType {
-        REGULAR,
-        ID,
-        COORD
-    }
-}
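
The coordination handshake above is pure counting: on COORDINATED_STREAM_ID each
upstream task tells every consumer task how many tuples it emitted to it, and the
consumer declares the batch finished once every upstream task has reported and the
reported total matches what was actually received. A minimal sketch of that
bookkeeping follows; BatchTracker and its fields are invented for illustration and
are not Storm classes.

    // Hypothetical sketch of the completion check driven by the coordinated stream.
    class BatchTracker {
        int reportsReceived;   // upstream tasks that have sent their (id, count) tuple
        int expectedTuples;    // sum of the counts reported on COORDINATED_STREAM_ID
        int receivedTuples;    // regular tuples actually processed for this batch id

        boolean batchComplete(int numUpstreamTasks) {
            // done once every upstream task has reported and the totals agree
            return reportsReceived == numUpstreamTasks
                && expectedTuples == receivedTuples;
        }
    }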

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/coordination/IBatchBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/coordination/IBatchBolt.java b/storm-core/src/jvm/backtype/storm/coordination/IBatchBolt.java
deleted file mode 100644
index ee5d9bd..0000000
--- a/storm-core/src/jvm/backtype/storm/coordination/IBatchBolt.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.coordination;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IComponent;
-import backtype.storm.tuple.Tuple;
-import java.io.Serializable;
-import java.util.Map;
-
-public interface IBatchBolt<T> extends Serializable, IComponent {
-    void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, T id);
-    void execute(Tuple tuple);
-    void finishBatch();
-}
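
Because the deleted interface is so small, a usage sketch may help. This
CountBatchBolt is illustrative only (the name and behavior are invented): it counts
the tuples of one batch and emits the total when the batch finishes.

    import backtype.storm.coordination.BatchOutputCollector;
    import backtype.storm.coordination.IBatchBolt;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    import java.util.Map;

    // Illustrative implementation; not part of the Storm codebase.
    public class CountBatchBolt implements IBatchBolt<Object> {
        private BatchOutputCollector _collector;
        private Object _id;
        private int _count = 0;

        @Override
        public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id) {
            _collector = collector;
            _id = id; // the batch id this instance tracks
        }

        @Override
        public void execute(Tuple tuple) {
            _count++; // accumulate per-batch state
        }

        @Override
        public void finishBatch() {
            // invoked once all tuples of the batch have been processed
            _collector.emit(new Values(_id, _count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "count"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }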

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/daemon/ClientJarTransformerRunner.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/daemon/ClientJarTransformerRunner.java b/storm-core/src/jvm/backtype/storm/daemon/ClientJarTransformerRunner.java
deleted file mode 100644
index 3a0dfbb..0000000
--- a/storm-core/src/jvm/backtype/storm/daemon/ClientJarTransformerRunner.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package backtype.storm.daemon;
-
-import backtype.storm.utils.Utils;
-
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.OutputStream;
-import java.io.InputStream;
-
-/**
- * Main executable to load and run a jar transformer
- */
-public class ClientJarTransformerRunner {
-    public static void main(String [] args) throws IOException {
-        JarTransformer transformer = Utils.jarTransformer(args[0]);
-        try (InputStream in = new FileInputStream(args[1]);
-             OutputStream out = new FileOutputStream(args[2])) {
-            transformer.transform(in, out);
-        }
-    }
-}
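
Judging from the argument handling above, the runner was invoked as
java backtype.storm.daemon.ClientJarTransformerRunner <transformer-class> <input-jar> <output-jar>,
with the first argument resolved to a JarTransformer via Utils.jarTransformer.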

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/daemon/DirectoryCleaner.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/daemon/DirectoryCleaner.java b/storm-core/src/jvm/backtype/storm/daemon/DirectoryCleaner.java
deleted file mode 100644
index 67b6527..0000000
--- a/storm-core/src/jvm/backtype/storm/daemon/DirectoryCleaner.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.daemon;
-
-import java.io.IOException;
-import java.io.File;
-import java.io.FileInputStream;
-import java.nio.file.FileSystems;
-import java.nio.file.Files;
-import java.nio.file.Paths;
-import java.nio.file.Path;
-import java.nio.file.DirectoryStream;
-import java.util.Stack;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Set;
-import java.util.Comparator;
-import java.util.PriorityQueue;
-import java.util.regex.Pattern;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Provides methods that help the Logviewer clean up files in
- * directories and list a directory's files without risking
- * excessive memory usage.
- *
- */
-public class DirectoryCleaner {
-    private static final Logger LOG = LoggerFactory.getLogger(DirectoryCleaner.class);
-    // recognizes active log files; "current" may eventually be removed from this list
-    private static final Pattern ACTIVE_LOG_PATTERN = Pattern.compile(".*\\.(log|err|out|current|yaml|pid)$");
-    // used to recognize the pattern of some meta files in a worker log directory
-    private static final Pattern META_LOG_PATTERN= Pattern.compile(".*\\.(yaml|pid)$");
-
-    // intentionally non-static so tests can mock it
-    public DirectoryStream<Path> getStreamForDirectory(File dir) throws IOException {
-        return Files.newDirectoryStream(dir.toPath());
-    }
-
-    /**
-     * If the total size of the files exceeds either the per-worker quota or the global
-     * quota, the Logviewer deletes the oldest inactive log files, either within a single
-     * worker directory or across all worker directories. The parameter for_per_dir
-     * switches between the two deletion modes.
-     * @param dirs the list of directories to be scanned for deletion
-     * @param quota the per-dir quota, or the total quota for all directories
-     * @param for_per_dir if true, deletion happens within a single dir; otherwise, across all directories globally
-     * @param active_dirs used only for global deletion; active logs in these dirs are skipped
-     * @return number of files deleted
-     */
-    public int deleteOldestWhileTooLarge(List<File> dirs,
-                        long quota, boolean for_per_dir, Set<String> active_dirs) throws IOException {
-        final int PQ_SIZE = 1024; // max number of files to delete for every round
-        final int MAX_ROUNDS  = 512; // max rounds of scanning the dirs
-        long totalSize = 0;
-        int deletedFiles = 0;
-
-        for (File dir : dirs) {
-            try (DirectoryStream<Path> stream = getStreamForDirectory(dir)) {
-                for (Path path : stream) {
-                    File file = path.toFile();
-                    totalSize += file.length();
-                }
-            }
-        }
-        long toDeleteSize = totalSize - quota;
-        if (toDeleteSize <= 0) {
-            return deletedFiles;
-        }
-
-        Comparator<File> comparator = new Comparator<File>() {
-            public int compare(File f1, File f2) {
-                // sort newest first, so the head of the priority queue is the newest kept file
-                return Long.compare(f2.lastModified(), f1.lastModified());
-            }
-        };
-        // the oldest PQ_SIZE files seen so far are kept in the PQ, with the newest of them at the root
-        PriorityQueue<File> pq = new PriorityQueue<File>(PQ_SIZE, comparator);
-        int round = 0;
-        while (toDeleteSize > 0) {
-            LOG.debug("To delete size is {}, start a new round of deletion, round: {}", toDeleteSize, round);
-            for (File dir : dirs) {
-                try (DirectoryStream<Path> stream = getStreamForDirectory(dir)) {
-                    for (Path path : stream) {
-                        File file = path.toFile();
-                        if (for_per_dir) {
-                            if (ACTIVE_LOG_PATTERN.matcher(file.getName()).matches()) {
-                                continue; // skip active log files
-                            }
-                        } else { // for global cleanup
-                            if (active_dirs.contains(dir.getCanonicalPath())) { // this dir belongs to an active worker
-                                if (ACTIVE_LOG_PATTERN.matcher(file.getName()).matches()) {
-                                    continue; // skip active log files
-                                }
-                            } else {
-                                if (META_LOG_PATTERN.matcher(file.getName()).matches()) {
-                                    continue; // skip yaml and pid files
-                                }
-                            }
-                        }
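-                        // the head of the PQ is the newest of the kept files; evict it
-                        // whenever an older candidate shows up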
-                        if (pq.size() < PQ_SIZE) {
-                            pq.offer(file);
-                        } else {
-                            if (file.lastModified() < pq.peek().lastModified()) {
-                                pq.poll();
-                                pq.offer(file);
-                            }
-                        }
-                    }
-                }
-            }
-            // need to reverse the order of elements in PQ to delete files from oldest to newest
-            Stack<File> stack = new Stack<File>();
-            while (!pq.isEmpty()) {
-                File file = pq.poll();
-                stack.push(file);
-            }
-            while (!stack.isEmpty() && toDeleteSize > 0) {
-                File file = stack.pop();
-                toDeleteSize -= file.length();
-                LOG.info("Delete file: {}, size: {}, lastModified: {}", file.getName(), file.length(), file.lastModified());
-                file.delete();
-                deletedFiles++;
-            }
-            pq.clear();
-            round++;
-            if (round >= MAX_ROUNDS) {
-                if (for_per_dir) {
-                    LOG.warn("Reach the MAX_ROUNDS: {} during per-dir deletion, you may have too many files in " +
-                            "a single directory : {}, will delete the rest files in next interval.",
-                            MAX_ROUNDS, dirs.get(0).getCanonicalPath());
-                } else {
-                    LOG.warn("Reach the MAX_ROUNDS: {} during global deletion, you may have too many files, " +
-                            "will delete the rest files in next interval.", MAX_ROUNDS);
-                }
-                break;
-            }
-        }
-        return deletedFiles;
-    }
-
-    // to avoid memory problems, only the first 1024 files in a directory are returned
-    public static List<File> getFilesForDir(File dir) throws IOException {
-        List<File> files = new ArrayList<File>();
-        final int MAX_NUM = 1024;
-
-        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir.toPath())) {
-            for (Path path : stream) {
-                files.add(path.toFile());
-                if (files.size() >= MAX_NUM) {
-                    break;
-                }
-            }
-        }
-        return files;
-    }
-}
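
A hedged usage sketch of the quota logic deleted above; the directory path, the
1 GB quota, and the surrounding main method are invented for the example.

    import java.io.File;
    import java.io.IOException;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    public class CleanerExample {
        public static void main(String[] args) throws IOException {
            DirectoryCleaner cleaner = new DirectoryCleaner();
            // per-dir mode: one worker dir, skipping anything that matches the active-log pattern
            List<File> dirs = Arrays.asList(new File("/var/log/storm/workers-artifacts/topo-1/6700"));
            Set<String> activeDirs = new HashSet<>(); // unused in per-dir mode
            long quota = 1024L * 1024L * 1024L;       // 1 GB
            int deleted = cleaner.deleteOldestWhileTooLarge(dirs, quota, true, activeDirs);
            System.out.println("deleted " + deleted + " files");
        }
    }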

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/daemon/JarTransformer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/daemon/JarTransformer.java b/storm-core/src/jvm/backtype/storm/daemon/JarTransformer.java
deleted file mode 100644
index 914710a..0000000
--- a/storm-core/src/jvm/backtype/storm/daemon/JarTransformer.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package backtype.storm.daemon;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * A plugin that can be used to transform a jar file in nimbus before it
- * is used by a topology.
- */
-public interface JarTransformer {
-    public void transform(InputStream input, OutputStream output) throws IOException;
-}
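
An identity transformer makes the single-method contract concrete; this
IdentityJarTransformer is a made-up example that streams the jar through unchanged.

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    // Made-up example implementation of the plugin interface above.
    public class IdentityJarTransformer implements JarTransformer {
        @Override
        public void transform(InputStream input, OutputStream output) throws IOException {
            byte[] buffer = new byte[8192];
            int n;
            while ((n = input.read(buffer)) != -1) {
                output.write(buffer, 0, n); // copy bytes through untouched
            }
        }
    }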

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/daemon/Shutdownable.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/daemon/Shutdownable.java b/storm-core/src/jvm/backtype/storm/daemon/Shutdownable.java
deleted file mode 100644
index b1d8ddf..0000000
--- a/storm-core/src/jvm/backtype/storm/daemon/Shutdownable.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.daemon;
-
-public interface Shutdownable {
-    public void shutdown();
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java b/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java
deleted file mode 100644
index 78e8d9b..0000000
--- a/storm-core/src/jvm/backtype/storm/drpc/DRPCInvocationsClient.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.drpc;
-
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
-
-import backtype.storm.generated.DRPCRequest;
-import backtype.storm.generated.DistributedRPCInvocations;
-import backtype.storm.generated.AuthorizationException;
-import backtype.storm.security.auth.ThriftClient;
-import backtype.storm.security.auth.ThriftConnectionType;
-import org.apache.thrift.transport.TTransportException;
-import org.apache.thrift.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class DRPCInvocationsClient extends ThriftClient implements DistributedRPCInvocations.Iface {
-    public static final Logger LOG = LoggerFactory.getLogger(DRPCInvocationsClient.class);
-    private final AtomicReference<DistributedRPCInvocations.Client> client = new AtomicReference<>();
-    private String host;
-    private int port;
-
-    public DRPCInvocationsClient(Map conf, String host, int port) throws TTransportException {
-        super(conf, ThriftConnectionType.DRPC_INVOCATIONS, host, port, null);
-        this.host = host;
-        this.port = port;
-        client.set(new DistributedRPCInvocations.Client(_protocol));
-    }
-        
-    public String getHost() {
-        return host;
-    }
-    
-    public int getPort() {
-        return port;
-    }       
-
-    public void reconnectClient() throws TException {
-        if (client.get() == null) {
-            reconnect();
-            client.set(new DistributedRPCInvocations.Client(_protocol));
-        }
-    }
-
-    public boolean isConnected() {
-        return client.get() != null;
-    }
-
-    public void result(String id, String result) throws TException, AuthorizationException {
-        DistributedRPCInvocations.Client c = client.get();
-        try {
-            if (c == null) {
-                throw new TException("Client is not connected...");
-            }
-            c.result(id, result);
-        } catch(AuthorizationException aze) {
-            throw aze;
-        } catch(TException e) {
-            client.compareAndSet(c, null);
-            throw e;
-        }
-    }
-
-    public DRPCRequest fetchRequest(String func) throws TException, AuthorizationException {
-        DistributedRPCInvocations.Client c = client.get();
-        try {
-            if (c == null) {
-                throw new TException("Client is not connected...");
-            }
-            return c.fetchRequest(func);
-        } catch(AuthorizationException aze) {
-            throw aze;
-        } catch(TException e) {
-            client.compareAndSet(c, null);
-            throw e;
-        }
-    }    
-
-    public void failRequest(String id) throws TException, AuthorizationException {
-        DistributedRPCInvocations.Client c = client.get();
-        try {
-            if (c == null) {
-                throw new TException("Client is not connected...");
-            }
-            c.failRequest(id);
-        } catch(AuthorizationException aze) {
-            throw aze;
-        } catch(TException e) {
-            client.compareAndSet(c, null);
-            throw e;
-        }
-    }
-
-    public DistributedRPCInvocations.Client getClient() {
-        return client.get();
-    }
-}
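
The AtomicReference handling above is the noteworthy part: after a thrift failure
the client invalidates only the exact instance that failed, via compareAndSet, so a
concurrent reconnect that installed a fresh client is left alone. A condensed
sketch of the pattern follows; the method name callWithInvalidation is invented.

    // Condensed illustration of the failure-handling pattern used by the methods above.
    private DRPCRequest callWithInvalidation(AtomicReference<DistributedRPCInvocations.Client> ref,
                                             String func) throws TException {
        DistributedRPCInvocations.Client c = ref.get();
        if (c == null) {
            throw new TException("Client is not connected...");
        }
        try {
            return c.fetchRequest(func);
        } catch (TException e) {
            // null out only the instance that failed; if another thread already
            // reconnected and set a fresh client, compareAndSet leaves it in place
            ref.compareAndSet(c, null);
            throw e;
        }
    }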

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java b/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java
deleted file mode 100644
index 4ed15c0..0000000
--- a/storm-core/src/jvm/backtype/storm/drpc/DRPCSpout.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package backtype.storm.drpc;
-
-import backtype.storm.Config;
-import backtype.storm.ILocalDRPC;
-import backtype.storm.generated.DRPCRequest;
-import backtype.storm.generated.DistributedRPCInvocations;
-import backtype.storm.generated.AuthorizationException;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.ExtendedThreadPoolExecutor;
-import backtype.storm.utils.ServiceRegistry;
-import backtype.storm.utils.Utils;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.Callable;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.thrift.TException;
-import org.json.simple.JSONValue;
-
-public class DRPCSpout extends BaseRichSpout {
-    //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS
-    static final long serialVersionUID = 2387848310969237877L;
-
-    public static final Logger LOG = LoggerFactory.getLogger(DRPCSpout.class);
-    
-    SpoutOutputCollector _collector;
-    List<DRPCInvocationsClient> _clients = new ArrayList<>();
-    transient LinkedList<Future<Void>> _futures = null;
-    transient ExecutorService _backround = null;
-    String _function;
-    String _local_drpc_id = null;
-    
-    private static class DRPCMessageId {
-        String id;
-        int index;
-        
-        public DRPCMessageId(String id, int index) {
-            this.id = id;
-            this.index = index;
-        }
-    }
-    
-    
-    public DRPCSpout(String function) {
-        _function = function;
-    }
-
-    public DRPCSpout(String function, ILocalDRPC drpc) {
-        _function = function;
-        _local_drpc_id = drpc.getServiceId();
-    }
-
-    public String get_function() {
-        return _function;
-    }
-
-    private class Adder implements Callable<Void> {
-        private String server;
-        private int port;
-        private Map conf;
-
-        public Adder(String server, int port, Map conf) {
-            this.server = server;
-            this.port = port;
-            this.conf = conf;
-        }
-
-        @Override
-        public Void call() throws Exception {
-            DRPCInvocationsClient c = new DRPCInvocationsClient(conf, server, port);
-            synchronized (_clients) {
-                _clients.add(c);
-            }
-            return null;
-        }
-    }
-
-    private void reconnect(final DRPCInvocationsClient c) {
-        _futures.add(_backround.submit(new Callable<Void>() {
-            @Override
-            public Void call() throws Exception {
-                c.reconnectClient();
-                return null;
-            }
-        }));
-    }
-
-    private void checkFutures() {
-        Iterator<Future<Void>> i = _futures.iterator();
-        while (i.hasNext()) {
-            Future<Void> f = i.next();
-            if (f.isDone()) {
-                i.remove();
-                try {
-                    f.get(); // surface any exception thrown by the finished connect task
-                } catch (Exception e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        }
-    }
- 
-    @Override
-    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
-        _collector = collector;
-        if(_local_drpc_id==null) {
-            _backround = new ExtendedThreadPoolExecutor(0, Integer.MAX_VALUE,
-                60L, TimeUnit.SECONDS,
-                new SynchronousQueue<Runnable>());
-            _futures = new LinkedList<>();
-
-            int numTasks = context.getComponentTasks(context.getThisComponentId()).size();
-            int index = context.getThisTaskIndex();
-
-            int port = Utils.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT));
-            List<String> servers = (List<String>) conf.get(Config.DRPC_SERVERS);
-            if(servers == null || servers.isEmpty()) {
-                throw new RuntimeException("No DRPC servers configured for topology");   
-            }
-            
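-            // with fewer tasks than servers, each task connects to every server;
-            // otherwise tasks are spread across the servers by task index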
-            if (numTasks < servers.size()) {
-                for (String s: servers) {
-                    _futures.add(_backround.submit(new Adder(s, port, conf)));
-                }
-            } else {        
-                int i = index % servers.size();
-                _futures.add(_backround.submit(new Adder(servers.get(i), port, conf)));
-            }
-        }
-        
-    }
-
-    @Override
-    public void close() {
-        for(DRPCInvocationsClient client: _clients) {
-            client.close();
-        }
-    }
-
-    @Override
-    public void nextTuple() {
-        boolean gotRequest = false;
-        if(_local_drpc_id==null) {
-            int size;
-            synchronized (_clients) {
-                size = _clients.size(); //This will only ever grow, so no need to worry about falling off the end
-            }
-            for(int i=0; i<size; i++) {
-                DRPCInvocationsClient client;
-                synchronized (_clients) {
-                    client = _clients.get(i);
-                }
-                if (!client.isConnected()) {
-                    continue;
-                }
-                try {
-                    DRPCRequest req = client.fetchRequest(_function);
-                    if(req.get_request_id().length() > 0) {
-                        Map returnInfo = new HashMap();
-                        returnInfo.put("id", req.get_request_id());
-                        returnInfo.put("host", client.getHost());
-                        returnInfo.put("port", client.getPort());
-                        gotRequest = true;
-                        _collector.emit(new Values(req.get_func_args(), JSONValue.toJSONString(returnInfo)), new DRPCMessageId(req.get_request_id(), i));
-                        break;
-                    }
-                } catch (AuthorizationException aze) {
-                    reconnect(client);
-                    LOG.error("Not authorized to fetch DRPC result from DRPC server", aze);
-                } catch (TException e) {
-                    reconnect(client);
-                    LOG.error("Failed to fetch DRPC result from DRPC server", e);
-                } catch (Exception e) {
-                    LOG.error("Failed to fetch DRPC result from DRPC server", e);
-                }
-            }
-            checkFutures();
-        } else {
-            DistributedRPCInvocations.Iface drpc = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(_local_drpc_id);
-            if(drpc!=null) { // can happen during shutdown of drpc while topology is still up
-                try {
-                    DRPCRequest req = drpc.fetchRequest(_function);
-                    if(req.get_request_id().length() > 0) {
-                        Map returnInfo = new HashMap();
-                        returnInfo.put("id", req.get_request_id());
-                        returnInfo.put("host", _local_drpc_id);
-                        returnInfo.put("port", 0);
-                        gotRequest = true;
-                        _collector.emit(new Values(req.get_func_args(), JSONValue.toJSONString(returnInfo)), new DRPCMessageId(req.get_request_id(), 0));
-                    }
-                } catch (AuthorizationException aze) {
-                    throw new RuntimeException(aze);
-                } catch (TException e) {
-                    throw new RuntimeException(e);
-                }
-            }
-        }
-        if(!gotRequest) {
-            Utils.sleep(1);
-        }
-    }
-
-    @Override
-    public void ack(Object msgId) {
-    }
-
-    @Override
-    public void fail(Object msgId) {
-        DRPCMessageId did = (DRPCMessageId) msgId;
-        DistributedRPCInvocations.Iface client;
-        
-        if(_local_drpc_id == null) {
-            client = _clients.get(did.index);
-        } else {
-            client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(_local_drpc_id);
-        }
-        try {
-            client.failRequest(did.id);
-        } catch (AuthorizationException aze) {
-            LOG.error("Not authorized to failREquest from DRPC server", aze);
-        } catch (TException e) {
-            LOG.error("Failed to fail request", e);
-        }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        declarer.declare(new Fields("args", "return-info"));
-    }    
-}
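
For context on how the deleted spout was typically wired, a hedged sketch of a
manual DRPC topology follows; the component ids and ExclaimBolt are invented, while
ReturnResults is the companion bolt that ships the ("result", "return-info") pair
back to the DRPC server.

    // Illustrative wiring only; ExclaimBolt must emit ("result", "return-info").
    TopologyBuilder builder = new TopologyBuilder();
    DRPCSpout spout = new DRPCSpout("exclamation"); // emits ("args", "return-info")
    builder.setSpout("drpc", spout);
    builder.setBolt("exclaim", new ExclaimBolt(), 3).shuffleGrouping("drpc");
    builder.setBolt("return", new ReturnResults(), 3).shuffleGrouping("exclaim");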