You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/11/05 21:40:48 UTC

[09/60] [abbrv] [partial] storm git commit: Release 2.0.4-SNAPSHOT

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/command/list.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/command/list.java b/jstorm-core/src/main/java/backtype/storm/command/list.java
new file mode 100755
index 0000000..3b4efdb
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/command/list.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.command;
+
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+
+import backtype.storm.generated.ClusterSummary;
+import backtype.storm.generated.TopologyInfo;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+
+/**
+ * List the cluster summary, or the info of a single topology
+ * 
+ * @author longda
+ * 
+ */
+public class list {
+    
+    /**
+     * @param args
+     */
+    public static void main(String[] args) {
+        
+        NimbusClient client = null;
+        try {
+            
+            Map conf = Utils.readStormConfig();
+            client = NimbusClient.getConfiguredClient(conf);
+            
+            if (args.length > 0 && StringUtils.isBlank(args[0]) == false) {
+                String topologyName = args[0];
+                TopologyInfo info = client.getClient().getTopologyInfoByName(topologyName);
+                
+                System.out.println("Successfully get topology info \n" + Utils.toPrettyJsonString(info));
+            } else {
+                ClusterSummary clusterSummary = client.getClient().getClusterInfo();
+                
+                System.out.println("Successfully get cluster info \n" + Utils.toPrettyJsonString(clusterSummary));
+            }
+            
+        } catch (Exception e) {
+            System.out.println(e.getMessage());
+            e.printStackTrace();
+            throw new RuntimeException(e);
+        } finally {
+            if (client != null) {
+                client.close();
+            }
+        }
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/command/metrics_monitor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/command/metrics_monitor.java b/jstorm-core/src/main/java/backtype/storm/command/metrics_monitor.java
new file mode 100755
index 0000000..6607445
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/command/metrics_monitor.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.command;
+
+import java.util.Map;
+import java.security.InvalidParameterException;
+
+import backtype.storm.generated.MonitorOptions;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+
+/**
+ * Monitor topology
+ * 
+ * @author Basti
+ * 
+ */
+public class metrics_monitor {
+
+    /**
+     * Asks Nimbus to enable or disable the metrics monitor of a topology.
+     *
+     * @param args args[0] is the topology name, args[1] the enable flag
+     *             (parsed as a boolean)
+     * @throws InvalidParameterException if fewer than two arguments are given
+     */
+    public static void main(String[] args) {
+        boolean enoughArgs = args != null && args.length > 1;
+        if (!enoughArgs) {
+            throw new InvalidParameterException("Should input topology name and enable flag");
+        }
+
+        String topologyName = args[0];
+        NimbusClient nimbus = null;
+        try {
+            Map stormConf = Utils.readStormConfig();
+            nimbus = NimbusClient.getConfiguredClient(stormConf);
+
+            boolean enable = Boolean.parseBoolean(args[1]);
+            MonitorOptions monitorOptions = new MonitorOptions();
+            monitorOptions.set_isEnable(enable);
+            nimbus.getClient().metricMonitor(topologyName, monitorOptions);
+
+            String action = enable ? "enable" : "disable";
+            System.out.println("Successfully submit command to " + action + " the monitor of " + topologyName);
+        } catch (Exception e) {
+            System.out.println(e.getMessage());
+            e.printStackTrace();
+            throw new RuntimeException(e);
+        } finally {
+            if (nimbus != null) {
+                nimbus.close();
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/command/rebalance.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/command/rebalance.java b/jstorm-core/src/main/java/backtype/storm/command/rebalance.java
new file mode 100755
index 0000000..f0cf69f
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/command/rebalance.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.command;
+
+import java.security.InvalidParameterException;
+import java.util.Map;
+
+import backtype.storm.generated.RebalanceOptions;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+
+/**
+ * Rebalance topology
+ * 
+ * @author longda
+ * 
+ */
+public class rebalance {
+    static final String REASSIGN_FLAG = "-r";
+
+    /**
+     * Parses {@code [-r] TopologyName [DelayTime] [NewConfig]} and submits the
+     * rebalance request; prints the usage and returns on invalid arguments.
+     * @param args command line arguments
+     */
+    public static void main(String[] args) {
+        if (args == null || args.length == 0) {
+            printErrorInfo();
+            return;
+        }
+
+        int argsIndex = 0;
+        String topologyName = null;
+
+        try {
+            RebalanceOptions options = new RebalanceOptions();
+            options.set_reassign(false);
+            options.set_conf(null);
+
+            if (args[argsIndex].equalsIgnoreCase(REASSIGN_FLAG)) {
+                options.set_reassign(true);
+                argsIndex++;
+                if (args.length <= argsIndex) {
+                    // Topology name is not set.
+                    printErrorInfo();
+                    return;
+                }
+            }
+            topologyName = args[argsIndex];
+            argsIndex++;
+
+            for (int i = argsIndex; i < args.length; i++) {
+                String arg = args[i];
+                if (arg.endsWith("yaml") || arg.endsWith("prop")) {
+                    Map userConf = Utils.loadConf(arg);
+                    String jsonConf = Utils.to_json(userConf);
+                    options.set_conf(jsonConf);
+                } else {
+                    try {
+                        // Parse this argument itself, not args[1] (the topology name when -r is used).
+                        int delaySeconds = Integer.parseInt(arg);
+                        options.set_wait_secs(delaySeconds);
+                    } catch (NumberFormatException e) {
+                        System.out.println("Unsupported argument found, arg=" + arg + ". Full args are " + java.util.Arrays.toString(args));
+                        printErrorInfo();
+                        return;
+                    }
+                }
+            }
+
+            submitRebalance(topologyName, options);
+            System.out.println("Successfully submit command rebalance " + topologyName + ", delaySecs=" + options.get_wait_secs() + ", reassignFlag=" + options.is_reassign() + ", newConfiguration=" + options.get_conf());
+        } catch (Exception e) {
+            System.out.println(e.getMessage());
+            e.printStackTrace();
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Prints the usage message shown for invalid command lines. */
+    private static void printErrorInfo() {
+        System.out.println("Error: Invalid parameters!");
+        System.out.println("USAGE: jstorm rebalance [-r] TopologyName [DelayTime] [NewConfig]");
+    }
+
+    /** Submits a rebalance request without extra client configuration. */
+    public static void submitRebalance(String topologyName, RebalanceOptions options) throws Exception {
+        submitRebalance(topologyName, options, null);
+    }
+
+    /**
+     * Submits a rebalance request through a Nimbus client, which is always closed afterwards.
+     * @param conf entries put on top of the storm configuration; may be null
+     */
+    public static void submitRebalance(String topologyName, RebalanceOptions options, Map conf) throws Exception {
+        Map stormConf = Utils.readStormConfig();
+        if (conf != null) {
+            stormConf.putAll(conf);
+        }
+
+        NimbusClient client = null;
+        try {
+            client = NimbusClient.getConfiguredClient(stormConf);
+            client.getClient().rebalance(topologyName, options);
+        } finally {
+            if (client != null) {
+                client.close();
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/command/restart.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/command/restart.java b/jstorm-core/src/main/java/backtype/storm/command/restart.java
new file mode 100755
index 0000000..ecec9a3
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/command/restart.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.command;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.security.InvalidParameterException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.yaml.snakeyaml.Yaml;
+
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+
+/**
+ * Restart topology
+ * 
+ * @author basti
+ * 
+ */
+public class restart {
+    /**
+     * Restarts a topology through Nimbus, optionally with a new configuration.
+     *
+     * @param args args[0] is the topology name; optional args[1] is a
+     *             configuration file loaded with Utils.loadConf
+     * @throws InvalidParameterException if no topology name is given
+     */
+    public static void main(String[] args) {
+        if (args == null || args.length < 1) {
+            throw new InvalidParameterException("Should input topology name");
+        }
+
+        String topologyName = args[0];
+        NimbusClient nimbus = null;
+        try {
+            Map stormConf = Utils.readStormConfig();
+            nimbus = NimbusClient.getConfiguredClient(stormConf);
+
+            System.out.println("It will take 15 ~ 100 seconds to restart, please wait patiently\n");
+            // Stays null when no configuration file was supplied.
+            String newConfJson = null;
+            if (args.length > 1) {
+                Map loaded = Utils.loadConf(args[1]);
+                newConfJson = Utils.to_json(loaded);
+                System.out.println("New configuration:\n" + newConfJson);
+            }
+            nimbus.getClient().restart(topologyName, newConfJson);
+
+            System.out.println("Successfully submit command restart " + topologyName);
+        } catch (Exception e) {
+            System.out.println(e.getMessage());
+            e.printStackTrace();
+            throw new RuntimeException(e);
+        } finally {
+            if (nimbus != null) {
+                nimbus.close();
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/command/update_config.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/command/update_config.java b/jstorm-core/src/main/java/backtype/storm/command/update_config.java
new file mode 100644
index 0000000..be78f19
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/command/update_config.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.command;
+
+import java.security.InvalidParameterException;
+import java.util.Map;
+
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+
+/**
+ * Update user configuration
+ * 
+ * @author basti
+ * 
+ */
+public class update_config {
+    /**
+     * @param args
+     */
+    public static void main(String[] args) {
+        // TODO Auto-generated method stub
+        if (args == null || args.length < 2) {
+            throw new InvalidParameterException(
+                    "[USAGE] update_config topologyName config");
+        }
+
+        String topologyName = args[0];
+
+        NimbusClient client = null;
+        try {
+            Map conf = Utils.readStormConfig();
+            client = NimbusClient.getConfiguredClient(conf);
+
+            Map loadConf = Utils.loadConf(args[1]);
+            String jsonConf = Utils.to_json(loadConf);
+            System.out.println("New configuration:\n" + jsonConf);
+
+            client.getClient().updateConf(topologyName, jsonConf);
+
+            System.out.println("Successfully submit command update_conf "
+                    + topologyName);
+        } catch (Exception e) {
+            System.out.println(e.getMessage());
+            e.printStackTrace();
+            throw new RuntimeException(e);
+        } finally {
+            if (client != null) {
+                client.close();
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/coordination/BatchBoltExecutor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/coordination/BatchBoltExecutor.java b/jstorm-core/src/main/java/backtype/storm/coordination/BatchBoltExecutor.java
new file mode 100755
index 0000000..8653010
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/coordination/BatchBoltExecutor.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.coordination;
+
+import backtype.storm.coordination.CoordinatedBolt.FinishedCallback;
+import backtype.storm.coordination.CoordinatedBolt.TimeoutCallback;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.FailedException;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.Utils;
+import java.util.HashMap;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BatchBoltExecutor implements IRichBolt, FinishedCallback, TimeoutCallback {
+    public static Logger LOG = LoggerFactory.getLogger(BatchBoltExecutor.class);
+
+    byte[] _boltSer; // Java-serialized bolt; deserialized again whenever a fresh instance is needed
+    Map<Object, IBatchBolt> _openTransactions; // bolt instance per batch id
+    Map _conf;
+    TopologyContext _context;
+    BatchOutputCollectorImpl _collector;
+
+    public BatchBoltExecutor(IBatchBolt bolt) {
+        this._boltSer = Utils.javaSerialize(bolt);
+    }
+
+    @Override
+    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+        this._openTransactions = new HashMap<Object, IBatchBolt>();
+        this._collector = new BatchOutputCollectorImpl(collector);
+        this._context = context;
+        this._conf = conf;
+    }
+
+    /** Hands the tuple to the bolt of its batch (id is value 0); acks on success, fails on FailedException. */
+    @Override
+    public void execute(Tuple input) {
+        IBatchBolt batchBolt = getBatchBolt(input.getValue(0));
+        try {
+            batchBolt.execute(input);
+            _collector.ack(input);
+        } catch (FailedException e) {
+            LOG.error("Failed to process tuple in batch", e);
+            _collector.fail(input);
+        }
+    }
+
+    @Override
+    public void cleanup() { }
+
+    /** Removes the batch's bolt from the open set and then calls finishBatch() on it. */
+    @Override
+    public void finishedId(Object id) {
+        IBatchBolt finished = getBatchBolt(id);
+        _openTransactions.remove(id);
+        finished.finishBatch();
+    }
+
+    @Override
+    public void timeoutId(Object attempt) {
+        _openTransactions.remove(attempt); // dropped without calling finishBatch()
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        newTransactionalBolt().declareOutputFields(declarer);
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return newTransactionalBolt().getComponentConfiguration();
+    }
+
+    private IBatchBolt getBatchBolt(Object id) {
+        IBatchBolt existing = _openTransactions.get(id);
+        if (existing != null) {
+            return existing;
+        }
+        IBatchBolt created = newTransactionalBolt(); // first use of this id: create, prepare, remember
+        created.prepare(_conf, _context, _collector, id);
+        _openTransactions.put(id, created);
+        return created;
+    }
+
+    private IBatchBolt newTransactionalBolt() {
+        return Utils.javaDeserialize(_boltSer, IBatchBolt.class);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollector.java b/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollector.java
new file mode 100755
index 0000000..f5f3457
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollector.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.coordination;
+
+import backtype.storm.utils.Utils;
+import java.util.List;
+
+public abstract class BatchOutputCollector {
+    /** Emits a tuple to the given stream; implemented by subclasses. */
+    public abstract List<Integer> emit(String streamId, List<Object> tuple);
+
+    /** Emits a tuple to the default output stream. */
+    public List<Integer> emit(List<Object> tuple) {
+        return this.emit(Utils.DEFAULT_STREAM_ID, tuple);
+    }
+
+    /** Emits a tuple to the specified task on the given stream; implemented by subclasses. */
+    public abstract void emitDirect(int taskId, String streamId, List<Object> tuple);
+
+    /**
+     * Emits a tuple to the specified task on the default output stream. This output
+     * stream must have been declared as a direct stream, and the specified task must
+     * use a direct grouping on this stream to receive the message.
+     */
+    public void emitDirect(int taskId, List<Object> tuple) {
+        this.emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple);
+    }
+
+    /** Reports an error; implemented by subclasses. */
+    public abstract void reportError(Throwable error);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollectorImpl.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollectorImpl.java b/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollectorImpl.java
new file mode 100755
index 0000000..cae7560
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollectorImpl.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.coordination;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.tuple.Tuple;
+import java.util.List;
+
+public class BatchOutputCollectorImpl extends BatchOutputCollector {
+    OutputCollector _collector;
+    
+    public BatchOutputCollectorImpl(OutputCollector collector) {
+        _collector = collector;
+    }
+    
+    @Override
+    public List<Integer> emit(String streamId, List<Object> tuple) {
+        return _collector.emit(streamId, tuple);
+    }
+
+    @Override
+    public void emitDirect(int taskId, String streamId, List<Object> tuple) {
+        _collector.emitDirect(taskId, streamId, tuple);
+    }
+
+    @Override
+    public void reportError(Throwable error) {
+        _collector.reportError(error);
+    }
+    
+    public void ack(Tuple tup) {
+        _collector.ack(tup);
+    }
+    
+    public void fail(Tuple tup) {
+        _collector.fail(tup);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/coordination/BatchSubtopologyBuilder.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/coordination/BatchSubtopologyBuilder.java b/jstorm-core/src/main/java/backtype/storm/coordination/BatchSubtopologyBuilder.java
new file mode 100755
index 0000000..2a77f3b
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/coordination/BatchSubtopologyBuilder.java
@@ -0,0 +1,479 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.coordination;
+
+import backtype.storm.Constants;
+import backtype.storm.coordination.CoordinatedBolt.SourceArgs;
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.generated.Grouping;
+import backtype.storm.grouping.CustomStreamGrouping;
+import backtype.storm.grouping.PartialKeyGrouping;
+import backtype.storm.topology.BaseConfigurationDeclarer;
+import backtype.storm.topology.BasicBoltExecutor;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.IBasicBolt;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.InputDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class BatchSubtopologyBuilder {
+    Map<String, Component> _bolts = new HashMap<String, Component>();
+    Component _masterBolt;
+    String _masterId;
+    
+    /**
+     * Creates a builder whose master bolt is wrapped in a {@link BasicBoltExecutor}.
+     *
+     * @param masterBoltId    id the master bolt is registered under in {@link #extendTopology}
+     * @param masterBolt      the master bolt
+     * @param boltParallelism parallelism of the master bolt; {@code null} is passed through as {@code null}
+     */
+    public BatchSubtopologyBuilder(String masterBoltId, IBasicBolt masterBolt, Number boltParallelism) {
+        Integer parallelism = null;
+        if (boltParallelism != null) {
+            parallelism = boltParallelism.intValue();
+        }
+        _masterBolt = new Component(new BasicBoltExecutor(masterBolt), parallelism);
+        _masterId = masterBoltId;
+    }
+
+    /**
+     * Same as the three-argument constructor with a {@code null} parallelism.
+     */
+    public BatchSubtopologyBuilder(String masterBoltId, IBasicBolt masterBolt) {
+        this(masterBoltId, masterBolt, null);
+    }
+    
+    /**
+     * @return a new declarer that records input declarations and configurations for the master bolt
+     */
+    public BoltDeclarer getMasterDeclarer() {
+        final BoltDeclarer masterDeclarer = new BoltDeclarerImpl(_masterBolt);
+        return masterDeclarer;
+    }
+        
+    /** Registers a batch bolt with no explicit parallelism. */
+    public BoltDeclarer setBolt(String id, IBatchBolt bolt) {
+        return setBolt(id, bolt, null);
+    }
+
+    /** Registers a batch bolt, wrapping it in a {@link BatchBoltExecutor}. */
+    public BoltDeclarer setBolt(String id, IBatchBolt bolt, Number parallelism) {
+        final IRichBolt executor = new BatchBoltExecutor(bolt);
+        return setBolt(id, executor, parallelism);
+    }
+
+    /** Registers a basic bolt with no explicit parallelism. */
+    public BoltDeclarer setBolt(String id, IBasicBolt bolt) {
+        return setBolt(id, bolt, null);
+    }
+
+    /** Registers a basic bolt, wrapping it in a {@link BasicBoltExecutor}. */
+    public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism) {
+        final IRichBolt executor = new BasicBoltExecutor(bolt);
+        return setBolt(id, executor, parallelism);
+    }
+
+    /**
+     * Stores {@code bolt} under {@code id} (replacing any previous entry for that id) and
+     * returns a declarer that records inputs and configurations for it.
+     */
+    private BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism) {
+        final Integer p = (parallelism == null) ? null : Integer.valueOf(parallelism.intValue());
+        final Component registered = new Component(bolt, p);
+        _bolts.put(id, registered);
+        return new BoltDeclarerImpl(registered);
+    }
+    
+    /**
+     * Adds the master bolt and every registered bolt to {@code builder}, each wrapped in a
+     * {@link CoordinatedBolt}.
+     *
+     * <p>For every registered bolt, each component it subscribes to becomes a coordination
+     * source ({@code SourceArgs.single()} for the master bolt, {@code SourceArgs.all()}
+     * otherwise) and a direct grouping on that component's coordinated stream is declared
+     * before the recorded input declarations are replayed.
+     *
+     * @param builder the topology builder to add the bolts to
+     */
+    public void extendTopology(TopologyBuilder builder) {
+        BoltDeclarer declarer = builder.setBolt(_masterId, new CoordinatedBolt(_masterBolt.bolt), _masterBolt.parallelism);
+        for (InputDeclaration decl : _masterBolt.declarations) {
+            decl.declare(declarer);
+        }
+        for (Map conf : _masterBolt.componentConfs) {
+            declarer.addConfigurations(conf);
+        }
+        // Iterate entries directly instead of keySet() followed by get(id).
+        for (Map.Entry<String, Component> entry : _bolts.entrySet()) {
+            String id = entry.getKey();
+            Component component = entry.getValue();
+            // Computed once per bolt; used both for the coordination args and the direct groupings.
+            Set<String> subscriptions = componentBoltSubscriptions(component);
+
+            Map<String, SourceArgs> coordinatedArgs = new HashMap<String, SourceArgs>();
+            for (String c : subscriptions) {
+                SourceArgs source = c.equals(_masterId) ? SourceArgs.single() : SourceArgs.all();
+                coordinatedArgs.put(c, source);
+            }
+
+            BoltDeclarer input = builder.setBolt(id,
+                                                  new CoordinatedBolt(component.bolt,
+                                                                      coordinatedArgs,
+                                                                      null),
+                                                  component.parallelism);
+            for (Map conf : component.componentConfs) {
+                input.addConfigurations(conf);
+            }
+            for (String c : subscriptions) {
+                input.directGrouping(c, Constants.COORDINATED_STREAM_ID);
+            }
+            for (InputDeclaration d : component.declarations) {
+                d.declare(input);
+            }
+        }
+    }
+        
+    private Set<String> componentBoltSubscriptions(Component component) {
+        Set<String> ret = new HashSet<String>();
+        for(InputDeclaration d: component.declarations) {
+            ret.add(d.getComponent());
+        }
+        return ret;
+    }
+
+    /** Everything recorded for one bolt until {@link #extendTopology} replays it. */
+    private static class Component {
+        /** The bolt to be wrapped in a CoordinatedBolt. */
+        public IRichBolt bolt;
+        /** Requested parallelism, or {@code null} when none was given. */
+        public Integer parallelism;
+        /** Input declarations in the order they were recorded. */
+        public List<InputDeclaration> declarations;
+        /** Configuration maps in the order they were added. */
+        public List<Map> componentConfs;
+
+        public Component(IRichBolt bolt, Integer parallelism) {
+            this.declarations = new ArrayList<InputDeclaration>();
+            this.componentConfs = new ArrayList<Map>();
+            this.bolt = bolt;
+            this.parallelism = parallelism;
+        }
+    }
+    
+    /** A recorded grouping call that can be replayed later against a real declarer. */
+    private interface InputDeclaration {
+        /** @return the id of the component this declaration subscribes to */
+        String getComponent();
+
+        /** Replays the recorded grouping call on {@code declarer}. */
+        void declare(InputDeclarer declarer);
+    }
+        
+    private class BoltDeclarerImpl extends BaseConfigurationDeclarer<BoltDeclarer> implements BoltDeclarer {
+        /** The component whose declarations and configurations this declarer records. */
+        Component _component;
+
+        /** @param component the component that receives every recorded declaration */
+        public BoltDeclarerImpl(Component component) {
+            this._component = component;
+        }
+        
+        /** Records a {@code fieldsGrouping(component, fields)} call for later replay. */
+        @Override
+        public BoltDeclarer fieldsGrouping(final String component, final Fields fields) {
+            final InputDeclaration deferred = new InputDeclaration() {
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.fieldsGrouping(component, fields);
+                }
+            };
+            addDeclaration(deferred);
+            return this;
+        }
+
+        /** Records a {@code fieldsGrouping(component, streamId, fields)} call for later replay. */
+        @Override
+        public BoltDeclarer fieldsGrouping(final String component, final String streamId, final Fields fields) {
+            final InputDeclaration deferred = new InputDeclaration() {
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.fieldsGrouping(component, streamId, fields);
+                }
+            };
+            addDeclaration(deferred);
+            return this;
+        }
+
+        /** Records a {@code globalGrouping(component)} call for later replay. */
+        @Override
+        public BoltDeclarer globalGrouping(final String component) {
+            final InputDeclaration deferred = new InputDeclaration() {
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.globalGrouping(component);
+                }
+            };
+            addDeclaration(deferred);
+            return this;
+        }
+
+        /** Records a {@code globalGrouping(component, streamId)} call for later replay. */
+        @Override
+        public BoltDeclarer globalGrouping(final String component, final String streamId) {
+            final InputDeclaration deferred = new InputDeclaration() {
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.globalGrouping(component, streamId);
+                }
+            };
+            addDeclaration(deferred);
+            return this;
+        }
+
+        /** Records a {@code shuffleGrouping(component)} call for later replay. */
+        @Override
+        public BoltDeclarer shuffleGrouping(final String component) {
+            final InputDeclaration deferred = new InputDeclaration() {
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.shuffleGrouping(component);
+                }
+            };
+            addDeclaration(deferred);
+            return this;
+        }
+
+        /** Records a {@code shuffleGrouping(component, streamId)} call for later replay. */
+        @Override
+        public BoltDeclarer shuffleGrouping(final String component, final String streamId) {
+            final InputDeclaration deferred = new InputDeclaration() {
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.shuffleGrouping(component, streamId);
+                }
+            };
+            addDeclaration(deferred);
+            return this;
+        }
+
+        /** Records a {@code localOrShuffleGrouping(component)} call for later replay. */
+        @Override
+        public BoltDeclarer localOrShuffleGrouping(final String component) {
+            final InputDeclaration deferred = new InputDeclaration() {
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.localOrShuffleGrouping(component);
+                }
+            };
+            addDeclaration(deferred);
+            return this;
+        }
+
+        /** Records a {@code localOrShuffleGrouping(component, streamId)} call for later replay. */
+        @Override
+        public BoltDeclarer localOrShuffleGrouping(final String component, final String streamId) {
+            final InputDeclaration deferred = new InputDeclaration() {
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.localOrShuffleGrouping(component, streamId);
+                }
+            };
+            addDeclaration(deferred);
+            return this;
+        }
+        
+        /** Records a {@code localFirstGrouping(componentId)} call for later replay. */
+        @Override
+        public BoltDeclarer localFirstGrouping(final String componentId) {
+            final InputDeclaration deferred = new InputDeclaration() {
+                @Override
+                public String getComponent() {
+                    return componentId;
+                }
+
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.localFirstGrouping(componentId);
+                }
+            };
+            addDeclaration(deferred);
+            return this;
+        }
+
+        /** Records a {@code localFirstGrouping(component, streamId)} call for later replay. */
+        @Override
+        public BoltDeclarer localFirstGrouping(final String component, final String streamId) {
+            final InputDeclaration deferred = new InputDeclaration() {
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.localFirstGrouping(component, streamId);
+                }
+            };
+            addDeclaration(deferred);
+            return this;
+        }
+        
+        /** Records a {@code noneGrouping(component)} call for later replay. */
+        @Override
+        public BoltDeclarer noneGrouping(final String component) {
+            final InputDeclaration deferred = new InputDeclaration() {
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.noneGrouping(component);
+                }
+            };
+            addDeclaration(deferred);
+            return this;
+        }
+
+        /** Records a {@code noneGrouping(component, streamId)} call for later replay. */
+        @Override
+        public BoltDeclarer noneGrouping(final String component, final String streamId) {
+            final InputDeclaration deferred = new InputDeclaration() {
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.noneGrouping(component, streamId);
+                }
+            };
+            addDeclaration(deferred);
+            return this;
+        }
+
+        /** Records an {@code allGrouping(component)} call for later replay. */
+        @Override
+        public BoltDeclarer allGrouping(final String component) {
+            final InputDeclaration deferred = new InputDeclaration() {
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.allGrouping(component);
+                }
+            };
+            addDeclaration(deferred);
+            return this;
+        }
+
+        /** Records an {@code allGrouping(component, streamId)} call for later replay. */
+        @Override
+        public BoltDeclarer allGrouping(final String component, final String streamId) {
+            final InputDeclaration deferred = new InputDeclaration() {
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.allGrouping(component, streamId);
+                }
+            };
+            addDeclaration(deferred);
+            return this;
+        }
+
+        /** Records a {@code directGrouping(component)} call for later replay. */
+        @Override
+        public BoltDeclarer directGrouping(final String component) {
+            final InputDeclaration deferred = new InputDeclaration() {
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.directGrouping(component);
+                }
+            };
+            addDeclaration(deferred);
+            return this;
+        }
+
+        /** Records a {@code directGrouping(component, streamId)} call for later replay. */
+        @Override
+        public BoltDeclarer directGrouping(final String component, final String streamId) {
+            final InputDeclaration deferred = new InputDeclaration() {
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.directGrouping(component, streamId);
+                }
+            };
+            addDeclaration(deferred);
+            return this;
+        }
+
+        /** Records a custom grouping backed by a new {@link PartialKeyGrouping} over {@code fields}. */
+        @Override
+        public BoltDeclarer partialKeyGrouping(String componentId, Fields fields) {
+            final PartialKeyGrouping partialKey = new PartialKeyGrouping(fields);
+            return customGrouping(componentId, partialKey);
+        }
+
+        /** Stream-specific variant: records a custom grouping backed by a new {@link PartialKeyGrouping}. */
+        @Override
+        public BoltDeclarer partialKeyGrouping(String componentId, String streamId, Fields fields) {
+            final PartialKeyGrouping partialKey = new PartialKeyGrouping(fields);
+            return customGrouping(componentId, streamId, partialKey);
+        }
+        
+        /** Records a {@code customGrouping(component, grouping)} call for later replay. */
+        @Override
+        public BoltDeclarer customGrouping(final String component, final CustomStreamGrouping grouping) {
+            final InputDeclaration deferred = new InputDeclaration() {
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.customGrouping(component, grouping);
+                }
+            };
+            addDeclaration(deferred);
+            return this;
+        }
+
+        /** Records a {@code customGrouping(component, streamId, grouping)} call for later replay. */
+        @Override
+        public BoltDeclarer customGrouping(final String component, final String streamId, final CustomStreamGrouping grouping) {
+            final InputDeclaration deferred = new InputDeclaration() {
+                @Override
+                public String getComponent() {
+                    return component;
+                }
+
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.customGrouping(component, streamId, grouping);
+                }
+            };
+            addDeclaration(deferred);
+            return this;
+        }
+
+        /**
+         * Records a {@code grouping(stream, grouping)} call for later replay; the subscribed
+         * component is taken from {@code stream.get_componentId()}.
+         */
+        @Override
+        public BoltDeclarer grouping(final GlobalStreamId stream, final Grouping grouping) {
+            final InputDeclaration deferred = new InputDeclaration() {
+                @Override
+                public String getComponent() {
+                    return stream.get_componentId();
+                }
+
+                @Override
+                public void declare(InputDeclarer declarer) {
+                    declarer.grouping(stream, grouping);
+                }
+            };
+            addDeclaration(deferred);
+            return this;
+        }
+        
+        private void addDeclaration(InputDeclaration declaration) {
+            _component.declarations.add(declaration);
+        }
+
+        /** Appends {@code conf} to the wrapped component's configuration list and returns this declarer. */
+        @Override
+        public BoltDeclarer addConfigurations(Map conf) {
+            final List<Map> confs = _component.componentConfs;
+            confs.add(conf);
+            return this;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/coordination/CoordinatedBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/coordination/CoordinatedBolt.java b/jstorm-core/src/main/java/backtype/storm/coordination/CoordinatedBolt.java
new file mode 100755
index 0000000..6f337a6
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/coordination/CoordinatedBolt.java
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.coordination;
+
+import backtype.storm.topology.FailedException;
+import java.util.Map.Entry;
+import backtype.storm.tuple.Values;
+import backtype.storm.generated.GlobalStreamId;
+import java.util.Collection;
+import backtype.storm.Constants;
+import backtype.storm.generated.Grouping;
+import backtype.storm.task.IOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.TimeCacheMap;
+import backtype.storm.utils.Utils;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static backtype.storm.utils.Utils.get;
+
+/**
+ * Coordination requires the request ids to be globally unique for a while, so that the
+ * bolt doesn't get confused in the case of retries.
+ */
+public class CoordinatedBolt implements IRichBolt {
+    public static Logger LOG = LoggerFactory.getLogger(CoordinatedBolt.class);
+
+    /**
+     * Implemented by a delegate bolt that wants {@code finishedId} invoked when
+     * {@code checkFinishId} finds the coordination condition met for an id.
+     */
+    public interface FinishedCallback {
+        void finishedId(Object id);
+    }
+
+    /**
+     * Implemented by a delegate bolt that wants {@code timeoutId} invoked when a tracked id
+     * expires from the tracking map without having been finished.
+     */
+    public interface TimeoutCallback {
+        void timeoutId(Object id);
+    }
+    
+    
+    public static class SourceArgs implements Serializable {
+        public boolean singleCount;
+
+        protected SourceArgs(boolean singleCount) {
+            this.singleCount = singleCount;
+        }
+
+        public static SourceArgs single() {
+            return new SourceArgs(true);
+        }
+
+        public static SourceArgs all() {
+            return new SourceArgs(false);
+        }
+        
+        @Override
+        public String toString() {
+            return "<Single: " + singleCount + ">";
+        }
+    }
+
+    public class CoordinatedOutputCollector implements IOutputCollector {
+        IOutputCollector _delegate;
+
+        public CoordinatedOutputCollector(IOutputCollector delegate) {
+            _delegate = delegate;
+        }
+
+        public List<Integer> emit(String stream, Collection<Tuple> anchors, List<Object> tuple) {
+            List<Integer> tasks = _delegate.emit(stream, anchors, tuple);
+            updateTaskCounts(tuple.get(0), tasks);
+            return tasks;
+        }
+
+        public void emitDirect(int task, String stream, Collection<Tuple> anchors, List<Object> tuple) {
+            updateTaskCounts(tuple.get(0), Arrays.asList(task));
+            _delegate.emitDirect(task, stream, anchors, tuple);
+        }
+
+        public void ack(Tuple tuple) {
+            Object id = tuple.getValue(0);
+            synchronized(_tracked) {
+                TrackingInfo track = _tracked.get(id);
+                if (track != null)
+                    track.receivedTuples++;
+            }
+            boolean failed = checkFinishId(tuple, TupleType.REGULAR);
+            if(failed) {
+                _delegate.fail(tuple);                
+            } else {
+                _delegate.ack(tuple);
+            }
+        }
+
+        public void fail(Tuple tuple) {
+            Object id = tuple.getValue(0);
+            synchronized(_tracked) {
+                TrackingInfo track = _tracked.get(id);
+                if (track != null)
+                    track.failed = true;
+            }
+            checkFinishId(tuple, TupleType.REGULAR);
+            _delegate.fail(tuple);
+        }
+        
+        public void reportError(Throwable error) {
+            _delegate.reportError(error);
+        }
+
+
+        private void updateTaskCounts(Object id, List<Integer> tasks) {
+            synchronized(_tracked) {
+                TrackingInfo track = _tracked.get(id);
+                if (track != null) {
+                    Map<Integer, Integer> taskEmittedTuples = track.taskEmittedTuples;
+                    for(Integer task: tasks) {
+                        int newCount = get(taskEmittedTuples, task, 0) + 1;
+                        taskEmittedTuples.put(task, newCount);
+                    }
+                }
+            }
+        }
+    }
+
+    /** Coordination settings keyed by source component id; replaced by an empty map when the constructor is given null. */
+    private Map<String, SourceArgs> _sourceArgs;
+    /** Stream that carries the id tuple, or null when no id stream is used. */
+    private IdStreamSpec _idStreamSpec;
+    /** The wrapped bolt. */
+    private IRichBolt _delegate;
+    /** Number of coordination reports expected per id; only assigned in prepare when there are sources. */
+    private Integer _numSourceReports;
+    /** Tasks of the components that subscribe to this bolt's coordinated stream; filled in prepare. */
+    private List<Integer> _countOutTasks = new ArrayList<Integer>();
+    /** The collector passed to prepare, used for coordination tuples. */
+    private OutputCollector _collector;
+    /** Tracking state per id; created in prepare and also used as the lock guarding that state. */
+    private TimeCacheMap<Object, TrackingInfo> _tracked;
+
+    public static class TrackingInfo {
+        int reportCount = 0;
+        int expectedTupleCount = 0;
+        int receivedTuples = 0;
+        boolean failed = false;
+        Map<Integer, Integer> taskEmittedTuples = new HashMap<Integer, Integer>();
+        boolean receivedId = false;
+        boolean finished = false;
+        List<Tuple> ackTuples = new ArrayList<Tuple>();
+        
+        @Override
+        public String toString() {
+            return "reportCount: " + reportCount + "\n" +
+                   "expectedTupleCount: " + expectedTupleCount + "\n" +
+                   "receivedTuples: " + receivedTuples + "\n" +
+                   "failed: " + failed + "\n" +
+                   taskEmittedTuples.toString();
+        }
+    }
+
+    
+    /** Identifies the component/stream pair whose tuples are treated as id tuples. */
+    public static class IdStreamSpec implements Serializable {
+        GlobalStreamId _id;
+
+        protected IdStreamSpec(String component, String stream) {
+            this._id = new GlobalStreamId(component, stream);
+        }
+
+        /** Factory for a spec over the given component and stream. */
+        public static IdStreamSpec makeDetectSpec(String component, String stream) {
+            final IdStreamSpec spec = new IdStreamSpec(component, stream);
+            return spec;
+        }
+
+        public GlobalStreamId getGlobalStreamId() {
+            return this._id;
+        }
+    }
+    
+    /** Wraps {@code delegate} with no coordination sources and no id stream. */
+    public CoordinatedBolt(IRichBolt delegate) {
+        this(delegate, null, null);
+    }
+
+    /** Wraps {@code delegate} with exactly one coordination source. */
+    public CoordinatedBolt(IRichBolt delegate, String sourceComponent, SourceArgs sourceArgs, IdStreamSpec idStreamSpec) {
+        this(delegate, singleSourceArgs(sourceComponent, sourceArgs), idStreamSpec);
+    }
+
+    /**
+     * @param delegate     the bolt to wrap
+     * @param sourceArgs   coordination settings keyed by source component id; {@code null} is treated as empty
+     * @param idStreamSpec stream carrying the id tuple, or {@code null}
+     */
+    public CoordinatedBolt(IRichBolt delegate, Map<String, SourceArgs> sourceArgs, IdStreamSpec idStreamSpec) {
+        if (sourceArgs != null) {
+            _sourceArgs = sourceArgs;
+        } else {
+            _sourceArgs = new HashMap<String, SourceArgs>();
+        }
+        _delegate = delegate;
+        _idStreamSpec = idStreamSpec;
+    }
+    
+    /**
+     * Creates the tracking map, prepares the delegate with a {@link CoordinatedOutputCollector},
+     * collects the tasks subscribed to this bolt's coordinated stream, and computes how many
+     * coordination reports to expect per id.
+     */
+    public void prepare(Map config, TopologyContext context, OutputCollector collector) {
+        // Only register an expiry callback when the delegate wants timeout notifications.
+        TimeCacheMap.ExpiredCallback<Object, TrackingInfo> onExpire = null;
+        if (_delegate instanceof TimeoutCallback) {
+            onExpire = new TimeoutItems();
+        }
+        _tracked = new TimeCacheMap<Object, TrackingInfo>(context.maxTopologyMessageTimeout(), onExpire);
+        _collector = collector;
+
+        final IOutputCollector coordinated = new CoordinatedOutputCollector(collector);
+        _delegate.prepare(config, context, new OutputCollector(coordinated));
+
+        final Collection<String> coordinatedTargets = Utils.get(context.getThisTargets(),
+                                                                 Constants.COORDINATED_STREAM_ID,
+                                                                 new HashMap<String, Grouping>())
+                                                                 .keySet();
+        for (String component : coordinatedTargets) {
+            for (Integer task : context.getComponentTasks(component)) {
+                _countOutTasks.add(task);
+            }
+        }
+
+        if (_sourceArgs.isEmpty()) {
+            return;
+        }
+        _numSourceReports = 0;
+        for (Entry<String, SourceArgs> source : _sourceArgs.entrySet()) {
+            // A single-count source reports once; any other source reports once per task.
+            _numSourceReports += source.getValue().singleCount
+                    ? 1
+                    : context.getComponentTasks(source.getKey()).size();
+        }
+    }
+
+    /**
+     * Checks whether the id carried in the first value of {@code tup} has failed or met the
+     * coordination condition, and acks/fails the tuples involved accordingly. Runs entirely
+     * under the {@code _tracked} lock.
+     *
+     * @param tup  the tuple that triggered the check
+     * @param type how {@code tup} was classified by {@code getTupleType}
+     * @return {@code true} if the id is marked failed or the delegate's {@code finishedId}
+     *         threw a {@link FailedException}; {@code false} otherwise
+     */
+    private boolean checkFinishId(Tuple tup, TupleType type) {
+        Object id = tup.getValue(0);
+        boolean failed = false;
+        
+        synchronized(_tracked) {
+            TrackingInfo track = _tracked.get(id);
+            try {
+                if(track!=null) {
+                    boolean delayed = false;
+                    // COORD tuples (no id stream) or ID tuples (with an id stream) are parked
+                    // in ackTuples and only acked/failed once the id finishes or fails.
+                    if(_idStreamSpec==null && type == TupleType.COORD || _idStreamSpec!=null && type==TupleType.ID) {
+                        track.ackTuples.add(tup);
+                        delayed = true;
+                    }
+                    if(track.failed) {
+                        // Failed id: fail every parked tuple and stop tracking.
+                        failed = true;
+                        for(Tuple t: track.ackTuples) {
+                            _collector.fail(t);
+                        }
+                        _tracked.remove(id);
+                    } else if(track.receivedId
+                             && (_sourceArgs.isEmpty() ||
+                                  track.reportCount==_numSourceReports &&
+                                  track.expectedTupleCount == track.receivedTuples)){
+                        // Coordination condition met: id seen, and either there are no sources
+                        // or every report arrived and all expected tuples were received.
+                        if(_delegate instanceof FinishedCallback) {
+                            ((FinishedCallback)_delegate).finishedId(id);
+                        }
+                        if(!(_sourceArgs.isEmpty() || type!=TupleType.REGULAR)) {
+                            throw new IllegalStateException("Coordination condition met on a non-coordinating tuple. Should be impossible");
+                        }
+                        // Tell each subscribed task how many tuples were emitted to it for this id.
+                        Iterator<Integer> outTasks = _countOutTasks.iterator();
+                        while(outTasks.hasNext()) {
+                            int task = outTasks.next();
+                            int numTuples = get(track.taskEmittedTuples, task, 0);
+                            _collector.emitDirect(task, Constants.COORDINATED_STREAM_ID, tup, new Values(id, numTuples));
+                        }
+                        for(Tuple t: track.ackTuples) {
+                            _collector.ack(t);
+                        }
+                        track.finished = true;
+                        _tracked.remove(id);
+                    }
+                    // Coordination/id tuples that were not parked are acked or failed right away;
+                    // REGULAR tuples are left to the caller.
+                    if(!delayed && type!=TupleType.REGULAR) {
+                        if(track.failed) {
+                            _collector.fail(tup);
+                        } else {
+                            _collector.ack(tup);                            
+                        }
+                    }
+                } else {
+                    // Untracked id: a coordination/id tuple for it is failed.
+                    if(type!=TupleType.REGULAR) _collector.fail(tup);
+                }
+            } catch(FailedException e) {
+                LOG.error("Failed to finish batch", e);
+                for(Tuple t: track.ackTuples) {
+                    _collector.fail(t);
+                }
+                _tracked.remove(id);
+                failed = true;
+            }
+        }
+        return failed;
+    }
+
+    public void execute(Tuple tuple) {
+        Object id = tuple.getValue(0);
+        TrackingInfo track;
+        TupleType type = getTupleType(tuple);
+        synchronized(_tracked) {
+            track = _tracked.get(id);
+            if(track==null) {
+                track = new TrackingInfo();
+                if(_idStreamSpec==null) track.receivedId = true;
+                _tracked.put(id, track);
+            }
+        }
+        
+        if(type==TupleType.ID) {
+            synchronized(_tracked) {
+                track.receivedId = true;
+            }
+            checkFinishId(tuple, type);            
+        } else if(type==TupleType.COORD) {
+            int count = (Integer) tuple.getValue(1);
+            synchronized(_tracked) {
+                track.reportCount++;
+                track.expectedTupleCount+=count;
+            }
+            checkFinishId(tuple, type);
+        } else {            
+            synchronized(_tracked) {
+                _delegate.execute(tuple);
+            }
+        }
+    }
+
+    /** Cleans up the delegate first, then the tracking map. */
+    public void cleanup() {
+        this._delegate.cleanup();
+        this._tracked.cleanup();
+    }
+
+    /**
+     * Declares the delegate's outputs, then the coordinated stream with fields
+     * {@code id} and {@code count} (declared with the boolean flag set to {@code true}).
+     */
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        this._delegate.declareOutputFields(declarer);
+        final Fields coordFields = new Fields("id", "count");
+        declarer.declareStream(Constants.COORDINATED_STREAM_ID, true, coordFields);
+    }
+
+    /** @return the delegate's component configuration, unchanged */
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        final Map<String, Object> delegateConf = this._delegate.getComponentConfiguration();
+        return delegateConf;
+    }
+    
+    private static Map<String, SourceArgs> singleSourceArgs(String sourceComponent, SourceArgs sourceArgs) {
+        Map<String, SourceArgs> ret = new HashMap<String, SourceArgs>();
+        ret.put(sourceComponent, sourceArgs);
+        return ret;
+    }
+    
+    /** Expiry callback for the tracking map; only installed when the delegate is a {@link TimeoutCallback}. */
+    private class TimeoutItems implements TimeCacheMap.ExpiredCallback<Object, TrackingInfo> {
+        @Override
+        public void expire(Object id, TrackingInfo val) {
+            synchronized (_tracked) {
+                val.failed = true;
+                // Holding the lock while checking the finished flag guarantees that an id
+                // which has already finished is never reported as timed out.
+                if (val.finished) {
+                    return;
+                }
+                final TimeoutCallback listener = (TimeoutCallback) _delegate;
+                listener.timeoutId(id);
+            }
+        }
+    }
+    
+    private TupleType getTupleType(Tuple tuple) {
+        if(_idStreamSpec!=null
+                && tuple.getSourceGlobalStreamid().equals(_idStreamSpec._id)) {
+            return TupleType.ID;
+        } else if(!_sourceArgs.isEmpty()
+                && tuple.getSourceStreamId().equals(Constants.COORDINATED_STREAM_ID)) {
+            return TupleType.COORD;
+        } else {
+            return TupleType.REGULAR;
+        }
+    }
+    
+    /** How an incoming tuple is treated by {@code execute} and {@code checkFinishId}. */
+    enum TupleType {
+        /** Handed to the delegate bolt. */
+        REGULAR,
+        /** Arrived on the configured id stream. */
+        ID,
+        /** Arrived on the coordinated stream. */
+        COORD
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/coordination/IBatchBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/coordination/IBatchBolt.java b/jstorm-core/src/main/java/backtype/storm/coordination/IBatchBolt.java
new file mode 100755
index 0000000..ee5d9bd
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/coordination/IBatchBolt.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.coordination;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IComponent;
+import backtype.storm.tuple.Tuple;
+import java.io.Serializable;
+import java.util.Map;
+
+/**
+ * Serializable component interface for a bolt that is handed an id of type
+ * {@code T} at preparation time, receives tuples, and is notified of batch
+ * completion.
+ *
+ * <p>NOTE(review): the call order (prepare, then execute per tuple, then
+ * finishBatch once) is inferred from the method names only - confirm against
+ * the executor that drives implementations.
+ *
+ * @param <T> type of the id passed to {@link #prepare}
+ */
+public interface IBatchBolt<T> extends Serializable, IComponent {
+    /**
+     * Hands the bolt its configuration, context, output collector and id.
+     *
+     * @param conf configuration map (raw type; presumably the topology conf - TODO confirm)
+     * @param context topology context supplied by the caller
+     * @param collector collector the bolt may use for output
+     * @param id id of type {@code T}; presumably identifies the batch - TODO confirm
+     */
+    void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, T id);
+
+    /**
+     * Processes a single tuple.
+     *
+     * @param tuple the tuple to process
+     */
+    void execute(Tuple tuple);
+
+    /** Signals the end of a batch; presumably no further tuples follow for it - TODO confirm. */
+    void finishBatch();
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/daemon/Shutdownable.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/daemon/Shutdownable.java b/jstorm-core/src/main/java/backtype/storm/daemon/Shutdownable.java
new file mode 100755
index 0000000..b1d8ddf
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/daemon/Shutdownable.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.daemon;
+
+/** Implemented by objects that expose an explicit {@link #shutdown()} operation. */
+public interface Shutdownable {
+    /** Shuts this object down; what is released is up to the implementation. */
+    public void shutdown();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/drpc/DRPCInvocationsClient.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/DRPCInvocationsClient.java b/jstorm-core/src/main/java/backtype/storm/drpc/DRPCInvocationsClient.java
new file mode 100755
index 0000000..624db3e
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/drpc/DRPCInvocationsClient.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.drpc;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import backtype.storm.generated.DRPCRequest;
+import backtype.storm.generated.DistributedRPCInvocations;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.security.auth.ThriftClient;
+import backtype.storm.security.auth.ThriftConnectionType;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Thrift client for the DRPC invocations service that remembers its target
+ * host/port and tracks connectivity through an atomically swapped Thrift stub.
+ *
+ * <p>A {@code null} stub means "not connected": each RPC method clears the stub
+ * when it catches a {@link TException} (an {@link AuthorizationException} is
+ * rethrown without touching the stub), and {@link #reconnectClient()} installs
+ * a new one.
+ */
+public class DRPCInvocationsClient extends ThriftClient implements DistributedRPCInvocations.Iface {
+    public static Logger LOG = LoggerFactory.getLogger(DRPCInvocationsClient.class);
+    // current Thrift stub; null while disconnected
+    private final AtomicReference<DistributedRPCInvocations.Client> client =
+       new AtomicReference<DistributedRPCInvocations.Client>();
+    private String host;
+    private int port;
+
+    /**
+     * Opens the connection through the {@link ThriftClient} constructor and
+     * installs a stub on top of its protocol.
+     *
+     * @param conf configuration map handed to {@link ThriftClient}
+     * @param host DRPC server host
+     * @param port DRPC invocations port
+     * @throws TTransportException propagated from the superclass constructor
+     */
+    public DRPCInvocationsClient(Map conf, String host, int port) throws TTransportException {
+        super(conf, ThriftConnectionType.DRPC_INVOCATIONS, host, port, null);
+        this.host = host;
+        this.port = port;
+        this.client.set(new DistributedRPCInvocations.Client(_protocol));
+    }
+
+    /** @return the host given at construction time */
+    public String getHost() {
+        return this.host;
+    }
+
+    /** @return the port given at construction time */
+    public int getPort() {
+        return this.port;
+    }
+
+    /**
+     * Re-establishes the connection, but only when the stub is currently cleared.
+     *
+     * @throws TException propagated from {@code reconnect()}
+     */
+    public void reconnectClient() throws TException {
+        if (client.get() != null) {
+            // still holding a stub: nothing to do
+            return;
+        }
+        reconnect();
+        client.set(new DistributedRPCInvocations.Client(_protocol));
+    }
+
+    /** @return {@code true} while a stub is installed */
+    public boolean isConnected() {
+        final DistributedRPCInvocations.Client stub = client.get();
+        return stub != null;
+    }
+
+    /**
+     * Forwards {@code result} to the server.
+     *
+     * @throws AuthorizationException rethrown as-is; the stub is kept
+     * @throws TException when not connected or the call fails; the stub is cleared
+     */
+    public void result(String id, String result) throws TException, AuthorizationException {
+        final DistributedRPCInvocations.Client stub = client.get();
+        try {
+            if (stub == null) {
+                throw new TException("Client is not connected...");
+            }
+            stub.result(id, result);
+        } catch (AuthorizationException authFailure) {
+            throw authFailure;
+        } catch (TException failure) {
+            // clear the stub only if nobody swapped it in the meantime
+            client.compareAndSet(stub, null);
+            throw failure;
+        }
+    }
+
+    /**
+     * Forwards {@code fetchRequest} to the server.
+     *
+     * @return the request returned by the server
+     * @throws AuthorizationException rethrown as-is; the stub is kept
+     * @throws TException when not connected or the call fails; the stub is cleared
+     */
+    public DRPCRequest fetchRequest(String func) throws TException, AuthorizationException {
+        final DistributedRPCInvocations.Client stub = client.get();
+        try {
+            if (stub == null) {
+                throw new TException("Client is not connected...");
+            }
+            return stub.fetchRequest(func);
+        } catch (AuthorizationException authFailure) {
+            throw authFailure;
+        } catch (TException failure) {
+            // clear the stub only if nobody swapped it in the meantime
+            client.compareAndSet(stub, null);
+            throw failure;
+        }
+    }
+
+    /**
+     * Forwards {@code failRequest} to the server.
+     *
+     * @throws AuthorizationException rethrown as-is; the stub is kept
+     * @throws TException when not connected or the call fails; the stub is cleared
+     */
+    public void failRequest(String id) throws TException, AuthorizationException {
+        final DistributedRPCInvocations.Client stub = client.get();
+        try {
+            if (stub == null) {
+                throw new TException("Client is not connected...");
+            }
+            stub.failRequest(id);
+        } catch (AuthorizationException authFailure) {
+            throw authFailure;
+        } catch (TException failure) {
+            // clear the stub only if nobody swapped it in the meantime
+            client.compareAndSet(stub, null);
+            throw failure;
+        }
+    }
+
+    /** @return the current stub, or {@code null} while disconnected */
+    public DistributedRPCInvocations.Client getClient() {
+        return client.get();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/drpc/DRPCSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/DRPCSpout.java b/jstorm-core/src/main/java/backtype/storm/drpc/DRPCSpout.java
new file mode 100644
index 0000000..4ed24d4
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/drpc/DRPCSpout.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.drpc;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.thrift.TException;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.jstorm.utils.NetWorkUtils;
+
+import backtype.storm.Config;
+import backtype.storm.ILocalDRPC;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.DRPCRequest;
+import backtype.storm.generated.DistributedRPCInvocations;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.ExtendedThreadPoolExecutor;
+import backtype.storm.utils.ServiceRegistry;
+import backtype.storm.utils.Utils;
+
+public class DRPCSpout extends BaseRichSpout {
+    //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS
+    static final long serialVersionUID = 2387848310969237877L;
+
+    public static Logger LOG = LoggerFactory.getLogger(DRPCSpout.class);
+    
+    SpoutOutputCollector _collector;
+    List<DRPCInvocationsClient> _clients = new ArrayList<DRPCInvocationsClient>();
+    transient LinkedList<Future<Void>> _futures = null;
+    transient ExecutorService _backround = null;
+    String _function;
+    String _local_drpc_id = null;
+    
+    private static class DRPCMessageId {
+        String id;
+        int index;
+        
+        public DRPCMessageId(String id, int index) {
+            this.id = id;
+            this.index = index;
+        }
+    }
+    
+    
+    public DRPCSpout(String function) {
+        _function = function;
+    }
+
+    public DRPCSpout(String function, ILocalDRPC drpc) {
+        _function = function;
+        _local_drpc_id = drpc.getServiceId();
+    }
+   
+    private class Adder implements Callable<Void> {
+        private String server;
+        private int port;
+        private Map conf;
+
+        public Adder(String server, int port, Map conf) {
+            this.server = server;
+            this.port = port;
+            this.conf = conf;
+        }
+
+        @Override
+        public Void call() throws Exception {
+            DRPCInvocationsClient c = new DRPCInvocationsClient(conf, server, port);
+            synchronized (_clients) {
+                _clients.add(c);
+            }
+            return null;
+        }
+    }
+
+    private void reconnect(final DRPCInvocationsClient c) {
+        _futures.add(_backround.submit(new Callable<Void>() {
+            @Override
+            public Void call() throws Exception {
+                c.reconnectClient();
+                return null;
+            }
+        }));
+    }
+
+    private void checkFutures() {
+        Iterator<Future<Void>> i = _futures.iterator();
+        while (i.hasNext()) {
+            Future<Void> f = i.next();
+            if (f.isDone()) {
+                i.remove();
+            }
+            try {
+                f.get();
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+    
+    
+ 
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        _collector = collector;
+        if(_local_drpc_id==null) {
+            _backround = new ExtendedThreadPoolExecutor(0, Integer.MAX_VALUE,
+                60L, TimeUnit.SECONDS,
+                new SynchronousQueue<Runnable>());
+            _futures = new LinkedList<Future<Void>>();
+
+            int numTasks = context.getComponentTasks(context.getThisComponentId()).size();
+            int index = context.getThisTaskIndex();
+
+            int port = Utils.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT));
+            List<String> servers = NetWorkUtils.host2Ip((List<String>) conf.get(Config.DRPC_SERVERS));
+            
+            if(servers == null || servers.isEmpty()) {
+                throw new RuntimeException("No DRPC servers configured for topology");   
+            }
+            
+            if (numTasks < servers.size()) {
+                for (String s: servers) {
+                    _futures.add(_backround.submit(new Adder(s, port, conf)));
+                }
+            } else {        
+                int i = index % servers.size();
+                _futures.add(_backround.submit(new Adder(servers.get(i), port, conf)));
+            }
+        }
+        
+    }
+
+    @Override
+    public void close() {
+        for(DRPCInvocationsClient client: _clients) {
+            client.close();
+        }
+    }
+
+    @Override
+    public void nextTuple() {
+        boolean gotRequest = false;
+        if(_local_drpc_id==null) {
+            int size = 0;
+            synchronized (_clients) {
+                size = _clients.size(); //This will only ever grow, so no need to worry about falling off the end
+            }
+            for(int i=0; i<size; i++) {
+                DRPCInvocationsClient client;
+                synchronized (_clients) {
+                    client = _clients.get(i);
+                }
+                if (!client.isConnected()) {
+                    continue;
+                }
+                try {
+                    DRPCRequest req = client.fetchRequest(_function);
+                    if(req.get_request_id().length() > 0) {
+                        Map returnInfo = new HashMap();
+                        returnInfo.put("id", req.get_request_id());
+                        returnInfo.put("host", client.getHost());
+                        returnInfo.put("port", client.getPort());
+                        gotRequest = true;
+                        _collector.emit(new Values(req.get_func_args(), JSONValue.toJSONString(returnInfo)), new DRPCMessageId(req.get_request_id(), i));
+                        break;
+                    }
+                } catch (AuthorizationException aze) {
+                    reconnect(client);
+                    LOG.error("Not authorized to fetch DRPC result from DRPC server", aze);
+                } catch (TException e) {
+                    reconnect(client);
+                    LOG.error("Failed to fetch DRPC result from DRPC server", e);
+                } catch (Exception e) {
+                    LOG.error("Failed to fetch DRPC result from DRPC server", e);
+                }
+            }
+            checkFutures();
+        } else {
+            DistributedRPCInvocations.Iface drpc = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(_local_drpc_id);
+            if(drpc!=null) { // can happen during shutdown of drpc while topology is still up
+                try {
+                    DRPCRequest req = drpc.fetchRequest(_function);
+                    if(req.get_request_id().length() > 0) {
+                        Map returnInfo = new HashMap();
+                        returnInfo.put("id", req.get_request_id());
+                        returnInfo.put("host", _local_drpc_id);
+                        returnInfo.put("port", 0);
+                        gotRequest = true;
+                        _collector.emit(new Values(req.get_func_args(), JSONValue.toJSONString(returnInfo)), new DRPCMessageId(req.get_request_id(), 0));
+                    }
+                } catch (AuthorizationException aze) {
+                    throw new RuntimeException(aze);
+                } catch (TException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+        if(!gotRequest) {
+            Utils.sleep(1);
+        }
+    }
+
+    @Override
+    public void ack(Object msgId) {
+    }
+
+    @Override
+    public void fail(Object msgId) {
+        DRPCMessageId did = (DRPCMessageId) msgId;
+        DistributedRPCInvocations.Iface client;
+        
+        if(_local_drpc_id == null) {
+            client = _clients.get(did.index);
+        } else {
+            client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(_local_drpc_id);
+        }
+        try {
+            client.failRequest(did.id);
+        } catch (AuthorizationException aze) {
+            LOG.error("Not authorized to failREquest from DRPC server", aze);
+        } catch (TException e) {
+            LOG.error("Failed to fail request", e);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("args", "return-info"));
+    }    
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/drpc/JoinResult.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/JoinResult.java b/jstorm-core/src/main/java/backtype/storm/drpc/JoinResult.java
new file mode 100755
index 0000000..b74b97e
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/drpc/JoinResult.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.drpc;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Bolt that joins, per request id (tuple value 0), one tuple coming from the
+ * configured "return" component with one tuple coming from any other
+ * component, and emits the pair's second values as
+ * {@code ("result", "return-info")}.
+ */
+public class JoinResult extends BaseRichBolt {
+    public static Logger LOG = LoggerFactory.getLogger(JoinResult.class);
+
+    // id of the component whose tuples carry the return info
+    String returnComponent;
+    // tuples waiting for their counterpart, keyed by request id
+    Map<Object, Tuple> returns = new HashMap<Object, Tuple>();
+    Map<Object, Tuple> results = new HashMap<Object, Tuple>();
+    OutputCollector _collector;
+
+    /**
+     * @param returnComponent source component id identifying return-info tuples
+     */
+    public JoinResult(String returnComponent) {
+        this.returnComponent = returnComponent;
+    }
+
+    /** Stores the collector used for emitting and acking. */
+    public void prepare(Map map, TopologyContext context, OutputCollector collector) {
+        _collector = collector;
+    }
+
+    /**
+     * Buffers the tuple by request id; once both halves for that id are
+     * present, emits the joined tuple anchored on both and acks them.
+     *
+     * @param tuple incoming tuple; value 0 is the request id, value 1 the payload
+     */
+    public void execute(Tuple tuple) {
+        Object requestId = tuple.getValue(0);
+        if(tuple.getSourceComponent().equals(returnComponent)) {
+            returns.put(requestId, tuple);
+        } else {
+            results.put(requestId, tuple);
+        }
+
+        if(returns.containsKey(requestId) && results.containsKey(requestId)) {
+            Tuple result = results.remove(requestId);
+            Tuple returner = returns.remove(requestId);
+            // Concatenation is null-safe, unlike toString(): a null result value used to
+            // throw in the debug statement before the tuple could be emitted and acked.
+            String resultValue = "" + result.getValue(1);
+            LOG.debug(resultValue);
+            List<Tuple> anchors = new ArrayList<Tuple>();
+            anchors.add(result);
+            anchors.add(returner);
+            _collector.emit(anchors, new Values(resultValue, returner.getValue(1)));
+            _collector.ack(result);
+            _collector.ack(returner);
+        }
+    }
+
+    /** Declares the two output fields {@code "result"} and {@code "return-info"}. */
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("result", "return-info"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/drpc/KeyedFairBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/KeyedFairBolt.java b/jstorm-core/src/main/java/backtype/storm/drpc/KeyedFairBolt.java
new file mode 100755
index 0000000..113163d
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/drpc/KeyedFairBolt.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.drpc;
+
+import backtype.storm.coordination.CoordinatedBolt.FinishedCallback;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.BasicBoltExecutor;
+import backtype.storm.topology.IBasicBolt;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.KeyedRoundRobinQueue;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Bolt wrapper that queues incoming tuples in a {@link KeyedRoundRobinQueue},
+ * keyed by tuple value 0, and feeds them to the wrapped bolt from a dedicated
+ * daemon thread.
+ */
+public class KeyedFairBolt implements IRichBolt, FinishedCallback {
+    IRichBolt _delegate;
+    KeyedRoundRobinQueue<Tuple> _rrQueue;
+    Thread _executor;
+    // set in prepare() when the delegate itself is a FinishedCallback
+    FinishedCallback _callback;
+
+    /**
+     * @param delegate bolt that actually processes the tuples
+     */
+    public KeyedFairBolt(IRichBolt delegate) {
+        _delegate = delegate;
+    }
+
+    /**
+     * @param delegate basic bolt, wrapped in a {@link BasicBoltExecutor}
+     */
+    public KeyedFairBolt(IBasicBolt delegate) {
+        this(new BasicBoltExecutor(delegate));
+    }
+
+
+    /**
+     * Prepares the delegate, creates the queue and starts the daemon thread
+     * that drains the queue into the delegate.
+     */
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        if (_delegate instanceof FinishedCallback) {
+            _callback = (FinishedCallback) _delegate;
+        }
+        _delegate.prepare(stormConf, context, collector);
+        _rrQueue = new KeyedRoundRobinQueue<Tuple>();
+        final Runnable drainLoop = new Runnable() {
+            public void run() {
+                try {
+                    for (;;) {
+                        Tuple next = _rrQueue.take();
+                        _delegate.execute(next);
+                    }
+                } catch (InterruptedException e) {
+                    // interruption (see cleanup()) ends the loop; nothing else to do
+                }
+            }
+        };
+        _executor = new Thread(drainLoop);
+        _executor.setDaemon(true);
+        _executor.start();
+    }
+
+    /** Enqueues the tuple under its first value; the daemon thread executes it later. */
+    public void execute(Tuple input) {
+        final Object queueKey = input.getValue(0);
+        _rrQueue.add(queueKey, input);
+    }
+
+    /** Interrupts the draining thread, then cleans up the delegate. */
+    public void cleanup() {
+        _executor.interrupt();
+        _delegate.cleanup();
+    }
+
+    /** Output fields are exactly those declared by the delegate. */
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        _delegate.declareOutputFields(declarer);
+    }
+
+    /** Forwards the notification to the delegate when it is a {@link FinishedCallback}. */
+    public void finishedId(Object id) {
+        if (_callback == null) {
+            return;
+        }
+        _callback.finishedId(id);
+    }
+
+    /** @return a new, empty configuration map */
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        final Map<String, Object> emptyConf = new HashMap<String, Object>();
+        return emptyConf;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCInputDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCInputDeclarer.java b/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCInputDeclarer.java
new file mode 100755
index 0000000..d03075e
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCInputDeclarer.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.drpc;
+
+import backtype.storm.grouping.CustomStreamGrouping;
+import backtype.storm.topology.ComponentConfigurationDeclarer;
+import backtype.storm.tuple.Fields;
+
+/**
+ * Declarer for the input grouping of a component in a linear DRPC topology.
+ * Every grouping comes in two forms: one without and one with an explicit
+ * {@code streamId}.
+ *
+ * <p>NOTE(review): each method returns a {@code LinearDRPCInputDeclarer},
+ * presumably the receiver itself to allow chaining - confirm against the
+ * implementing class. Which stream the no-{@code streamId} overloads apply to
+ * is not visible here either.
+ */
+public interface LinearDRPCInputDeclarer extends ComponentConfigurationDeclarer<LinearDRPCInputDeclarer> {
+    // grouping on the given fields
+    public LinearDRPCInputDeclarer fieldsGrouping(Fields fields);
+    public LinearDRPCInputDeclarer fieldsGrouping(String streamId, Fields fields);
+
+    // global grouping
+    public LinearDRPCInputDeclarer globalGrouping();
+    public LinearDRPCInputDeclarer globalGrouping(String streamId);
+
+    // shuffle grouping
+    public LinearDRPCInputDeclarer shuffleGrouping();
+    public LinearDRPCInputDeclarer shuffleGrouping(String streamId);
+
+    // local-or-shuffle grouping
+    public LinearDRPCInputDeclarer localOrShuffleGrouping();
+    public LinearDRPCInputDeclarer localOrShuffleGrouping(String streamId);
+    
+    // none grouping
+    public LinearDRPCInputDeclarer noneGrouping();
+    public LinearDRPCInputDeclarer noneGrouping(String streamId);
+
+    // all grouping
+    public LinearDRPCInputDeclarer allGrouping();
+    public LinearDRPCInputDeclarer allGrouping(String streamId);
+
+    // direct grouping
+    public LinearDRPCInputDeclarer directGrouping();
+    public LinearDRPCInputDeclarer directGrouping(String streamId);
+
+    // partial-key grouping on the given fields
+    public LinearDRPCInputDeclarer partialKeyGrouping(Fields fields);
+    public LinearDRPCInputDeclarer partialKeyGrouping(String streamId, Fields fields);
+
+    // grouping delegated to a caller-supplied CustomStreamGrouping
+    public LinearDRPCInputDeclarer customGrouping(CustomStreamGrouping grouping);
+    public LinearDRPCInputDeclarer customGrouping(String streamId, CustomStreamGrouping grouping);
+    
+}