You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/11/05 21:40:48 UTC
[09/60] [abbrv] [partial] storm git commit: Release 2.0.4-SNAPSHOT
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/command/list.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/command/list.java b/jstorm-core/src/main/java/backtype/storm/command/list.java
new file mode 100755
index 0000000..3b4efdb
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/command/list.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.command;
+
+import java.util.Map;
+
+import org.apache.commons.lang.StringUtils;
+
+import backtype.storm.generated.ClusterSummary;
+import backtype.storm.generated.TopologyInfo;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+
+/**
+ * List command: prints the info of one topology, or the cluster summary.
+ *
+ * @author longda
+ *
+ */
+public class list {
+
+    /**
+     * Prints topology info when a non-blank topology name is given as the
+     * first argument; otherwise prints the cluster summary.
+     *
+     * @param args optional; args[0] is the topology name
+     */
+    public static void main(String[] args) {
+        NimbusClient client = null;
+        try {
+            Map conf = Utils.readStormConfig();
+            client = NimbusClient.getConfiguredClient(conf);
+
+            // A null argument array is treated like "no topology name given".
+            boolean hasName = args != null && args.length > 0 && !StringUtils.isBlank(args[0]);
+            if (hasName) {
+                String topologyName = args[0];
+                TopologyInfo info = client.getClient().getTopologyInfoByName(topologyName);
+                System.out.println("Successfully get topology info \n" + Utils.toPrettyJsonString(info));
+            } else {
+                ClusterSummary clusterSummary = client.getClient().getClusterInfo();
+                System.out.println("Successfully get cluster info \n" + Utils.toPrettyJsonString(clusterSummary));
+            }
+        } catch (Exception e) {
+            System.out.println(e.getMessage());
+            e.printStackTrace();
+            throw new RuntimeException(e);
+        } finally {
+            if (client != null) {
+                client.close();
+            }
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/command/metrics_monitor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/command/metrics_monitor.java b/jstorm-core/src/main/java/backtype/storm/command/metrics_monitor.java
new file mode 100755
index 0000000..6607445
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/command/metrics_monitor.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.command;
+
+import java.util.Map;
+import java.security.InvalidParameterException;
+
+import backtype.storm.generated.MonitorOptions;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+
+/**
+ * Command that switches the metrics monitor of a topology on or off.
+ *
+ * @author Basti
+ *
+ */
+public class metrics_monitor {
+    /**
+     * Sends the enable/disable monitor request for one topology to nimbus.
+     *
+     * @param args args[0] is the topology name, args[1] the enable flag
+     */
+    public static void main(String[] args) {
+        boolean tooFewArgs = args == null || args.length <= 1;
+        if (tooFewArgs) {
+            throw new InvalidParameterException("Should input topology name and enable flag");
+        }
+        final String name = args[0];
+        NimbusClient nimbus = null;
+        try {
+            nimbus = NimbusClient.getConfiguredClient(Utils.readStormConfig());
+            // Anything other than "true" (case-insensitive) disables the monitor.
+            final boolean enable = Boolean.parseBoolean(args[1]);
+            MonitorOptions monitorOptions = new MonitorOptions();
+            monitorOptions.set_isEnable(enable);
+            nimbus.getClient().metricMonitor(name, monitorOptions);
+
+            String action;
+            if (enable) {
+                action = "enable";
+            } else {
+                action = "disable";
+            }
+            System.out.println("Successfully submit command to " + action + " the monitor of " + name);
+        } catch (Exception e) {
+            System.out.println(e.getMessage());
+            e.printStackTrace();
+            throw new RuntimeException(e);
+        } finally {
+            if (nimbus != null) {
+                nimbus.close();
+            }
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/command/rebalance.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/command/rebalance.java b/jstorm-core/src/main/java/backtype/storm/command/rebalance.java
new file mode 100755
index 0000000..f0cf69f
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/command/rebalance.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.command;
+
+import java.security.InvalidParameterException;
+import java.util.Map;
+
+import backtype.storm.generated.RebalanceOptions;
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+
+/**
+ * Rebalance command: asks nimbus to rebalance a topology, optionally with the
+ * reassign flag, a delay and/or a new configuration.
+ *
+ * @author longda
+ *
+ */
+public class rebalance {
+    static final String REASSIGN_FLAG = "-r";
+
+    /**
+     * Parses "[-r] TopologyName [DelayTime] [NewConfig]" and submits the request.
+     * Invalid arguments print the usage message and return without submitting.
+     *
+     * @param args command line arguments
+     */
+    public static void main(String[] args) {
+        if (args == null || args.length == 0) {
+            printErrorInfo();
+            return;
+        }
+
+        int argsIndex = 0;
+        String topologyName = null;
+        try {
+            RebalanceOptions options = new RebalanceOptions();
+            options.set_reassign(false);
+            options.set_conf(null);
+            if (args[argsIndex].equalsIgnoreCase(REASSIGN_FLAG)) {
+                options.set_reassign(true);
+                argsIndex++;
+                if (args.length <= argsIndex) {
+                    // Topology name is not set.
+                    printErrorInfo();
+                    return;
+                }
+            }
+            topologyName = args[argsIndex];
+            argsIndex++;
+
+            for (int i = argsIndex; i < args.length; i++) {
+                String arg = args[i];
+                if (arg.endsWith("yaml") || arg.endsWith("prop")) {
+                    Map userConf = Utils.loadConf(arg);
+                    String jsonConf = Utils.to_json(userConf);
+                    options.set_conf(jsonConf);
+                } else {
+                    try {
+                        // Parse this argument, not args[1]: "-r" or a config file shifts the delay.
+                        int delaySeconds = Integer.parseInt(arg);
+                        options.set_wait_secs(delaySeconds);
+                    } catch (NumberFormatException e) {
+                        System.out.println("Unsupported argument found, arg=" + arg + ". Full args are " + java.util.Arrays.toString(args));
+                        printErrorInfo();
+                        return;
+                    }
+                }
+            }
+
+            submitRebalance(topologyName, options);
+            System.out.println("Successfully submit command rebalance " + topologyName + ", delaySecs=" + options.get_wait_secs() + ", reassignFlag=" + options.is_reassign() + ", newConfiguration=" + options.get_conf());
+        } catch (Exception e) {
+            System.out.println(e.getMessage());
+            e.printStackTrace();
+            throw new RuntimeException(e);
+        }
+    }
+
+    private static void printErrorInfo() {
+        System.out.println("Error: Invalid parameters!");
+        System.out.println("USAGE: jstorm rebalance [-r] TopologyName [DelayTime] [NewConfig]");
+    }
+
+    /** Submits the rebalance request without extra configuration. */
+    public static void submitRebalance(String topologyName, RebalanceOptions options) throws Exception {
+        submitRebalance(topologyName, options, null);
+    }
+
+    /**
+     * Submits the rebalance request through a nimbus client that is always closed.
+     *
+     * @param conf extra settings laid over the storm configuration; may be null
+     */
+    public static void submitRebalance(String topologyName, RebalanceOptions options, Map conf) throws Exception {
+        Map stormConf = Utils.readStormConfig();
+        if (conf != null) {
+            stormConf.putAll(conf);
+        }
+
+        NimbusClient client = null;
+        try {
+            client = NimbusClient.getConfiguredClient(stormConf);
+            client.getClient().rebalance(topologyName, options);
+        } finally {
+            if (client != null) {
+                client.close();
+            }
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/command/restart.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/command/restart.java b/jstorm-core/src/main/java/backtype/storm/command/restart.java
new file mode 100755
index 0000000..ecec9a3
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/command/restart.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.command;
+
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.InputStream;
+import java.security.InvalidParameterException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.yaml.snakeyaml.Yaml;
+
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+
+/**
+ * Restart command: asks nimbus to restart a topology, optionally with a new
+ * configuration loaded via Utils.loadConf.
+ *
+ * @author basti
+ *
+ */
+public class restart {
+    /**
+     * @param args args[0] is the topology name; args[1], if present, is the
+     *            configuration source handed to Utils.loadConf
+     */
+    public static void main(String[] args) {
+        boolean missingName = args == null || args.length == 0;
+        if (missingName) {
+            throw new InvalidParameterException("Should input topology name");
+        }
+
+        final String name = args[0];
+        NimbusClient nimbus = null;
+        try {
+            Map stormConf = Utils.readStormConfig();
+            nimbus = NimbusClient.getConfiguredClient(stormConf);
+
+            System.out.println("It will take 15 ~ 100 seconds to restart, please wait patiently\n");
+
+            // Stays null when no configuration argument was given.
+            String newConf = null;
+            if (args.length != 1) {
+                Map loaded = Utils.loadConf(args[1]);
+                newConf = Utils.to_json(loaded);
+                System.out.println("New configuration:\n" + newConf);
+            }
+            nimbus.getClient().restart(name, newConf);
+
+            System.out.println("Successfully submit command restart " + name);
+        } catch (Exception e) {
+            System.out.println(e.getMessage());
+            e.printStackTrace();
+            throw new RuntimeException(e);
+        } finally {
+            if (nimbus != null) {
+                nimbus.close();
+            }
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/command/update_config.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/command/update_config.java b/jstorm-core/src/main/java/backtype/storm/command/update_config.java
new file mode 100644
index 0000000..be78f19
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/command/update_config.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.command;
+
+import java.security.InvalidParameterException;
+import java.util.Map;
+
+import backtype.storm.utils.NimbusClient;
+import backtype.storm.utils.Utils;
+
+/**
+ * Command that sends a new user configuration for a topology to nimbus.
+ *
+ * @author basti
+ *
+ */
+public class update_config {
+    /**
+     * @param args args[0] is the topology name, args[1] the configuration
+     *            source handed to Utils.loadConf
+     */
+    public static void main(String[] args) {
+        boolean tooFewArgs = args == null || args.length < 2;
+        if (tooFewArgs) {
+            throw new InvalidParameterException("[USAGE] update_config topologyName config");
+        }
+
+        final String name = args[0];
+
+        NimbusClient nimbus = null;
+        try {
+            Map stormConf = Utils.readStormConfig();
+            nimbus = NimbusClient.getConfiguredClient(stormConf);
+
+            // The loaded settings travel to nimbus as a JSON string.
+            Map loaded = Utils.loadConf(args[1]);
+            String newConf = Utils.to_json(loaded);
+            System.out.println("New configuration:\n" + newConf);
+
+            nimbus.getClient().updateConf(name, newConf);
+
+            System.out.println("Successfully submit command update_conf " + name);
+        } catch (Exception e) {
+            System.out.println(e.getMessage());
+            e.printStackTrace();
+            throw new RuntimeException(e);
+        } finally {
+            if (nimbus != null) {
+                nimbus.close();
+            }
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/coordination/BatchBoltExecutor.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/coordination/BatchBoltExecutor.java b/jstorm-core/src/main/java/backtype/storm/coordination/BatchBoltExecutor.java
new file mode 100755
index 0000000..8653010
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/coordination/BatchBoltExecutor.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.coordination;
+
+import backtype.storm.coordination.CoordinatedBolt.FinishedCallback;
+import backtype.storm.coordination.CoordinatedBolt.TimeoutCallback;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.FailedException;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.Utils;
+import java.util.HashMap;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BatchBoltExecutor implements IRichBolt, FinishedCallback, TimeoutCallback {
+    public static Logger LOG = LoggerFactory.getLogger(BatchBoltExecutor.class);
+    // Serialized template bolt; a fresh copy is deserialized for each batch id.
+    byte[] _boltSer;
+    Map<Object, IBatchBolt> _openTransactions;
+    Map _conf;
+    TopologyContext _context;
+    BatchOutputCollectorImpl _collector;
+
+    public BatchBoltExecutor(IBatchBolt bolt) {
+        _boltSer = Utils.javaSerialize(bolt);
+    }
+
+    @Override
+    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+        _openTransactions = new HashMap<Object, IBatchBolt>();
+        _collector = new BatchOutputCollectorImpl(collector);
+        _context = context;
+        _conf = conf;
+    }
+
+    /** Routes the tuple to the bolt of its batch (id in field 0), then acks or fails it. */
+    @Override
+    public void execute(Tuple input) {
+        IBatchBolt target = getBatchBolt(input.getValue(0));
+        try {
+            target.execute(input);
+            _collector.ack(input);
+        } catch (FailedException failure) {
+            LOG.error("Failed to process tuple in batch", failure);
+            _collector.fail(input);
+        }
+    }
+
+    @Override
+    public void cleanup() {}
+
+    /** Drops the batch from the open set, then lets its bolt finish the batch. */
+    @Override
+    public void finishedId(Object id) {
+        IBatchBolt finished = getBatchBolt(id);
+        _openTransactions.remove(id);
+        finished.finishBatch();
+    }
+
+    /** Forgets the bolt of a timed-out batch without finishing it. */
+    @Override
+    public void timeoutId(Object attempt) {
+        _openTransactions.remove(attempt);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        newTransactionalBolt().declareOutputFields(declarer);
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return newTransactionalBolt().getComponentConfiguration();
+    }
+
+    /** Returns the bolt for the given batch id, creating and preparing it on first use. */
+    private IBatchBolt getBatchBolt(Object id) {
+        if (!_openTransactions.containsKey(id)) {
+            IBatchBolt fresh = newTransactionalBolt();
+            fresh.prepare(_conf, _context, _collector, id);
+            _openTransactions.put(id, fresh);
+        }
+        return _openTransactions.get(id);
+    }
+
+    private IBatchBolt newTransactionalBolt() {
+        return Utils.javaDeserialize(_boltSer, IBatchBolt.class);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollector.java b/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollector.java
new file mode 100755
index 0000000..f5f3457
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollector.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.coordination;
+
+import backtype.storm.utils.Utils;
+import java.util.List;
+
+public abstract class BatchOutputCollector {
+    /** Emits the tuple on the default stream. */
+    public List<Integer> emit(List<Object> tuple) {
+        final String defaultStream = Utils.DEFAULT_STREAM_ID;
+        return emit(defaultStream, tuple);
+    }
+
+    /** Emits the tuple on the given stream. */
+    public abstract List<Integer> emit(String streamId, List<Object> tuple);
+
+    /**
+     * Sends the tuple to one task on the default stream. That stream must be declared
+     * as a direct stream and the task must receive it through a direct grouping.
+     */
+    public void emitDirect(int taskId, List<Object> tuple) {
+        final String defaultStream = Utils.DEFAULT_STREAM_ID;
+        emitDirect(taskId, defaultStream, tuple);
+    }
+
+    /** Sends the tuple to one task on the given stream. */
+    public abstract void emitDirect(int taskId, String streamId, List<Object> tuple);
+
+    public abstract void reportError(Throwable error);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollectorImpl.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollectorImpl.java b/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollectorImpl.java
new file mode 100755
index 0000000..cae7560
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/coordination/BatchOutputCollectorImpl.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.coordination;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.tuple.Tuple;
+import java.util.List;
+
+public class BatchOutputCollectorImpl extends BatchOutputCollector {
+    OutputCollector _collector; // wrapped collector; every method forwards to it
+
+    public BatchOutputCollectorImpl(OutputCollector collector) {
+        this._collector = collector;
+    }
+
+    @Override
+    public List<Integer> emit(String streamId, List<Object> tuple) {
+        return this._collector.emit(streamId, tuple);
+    }
+
+    @Override
+    public void emitDirect(int taskId, String streamId, List<Object> tuple) {
+        this._collector.emitDirect(taskId, streamId, tuple);
+    }
+
+    @Override
+    public void reportError(Throwable error) {
+        this._collector.reportError(error);
+    }
+
+    public void ack(Tuple tup) { // not declared by BatchOutputCollector
+        this._collector.ack(tup);
+    }
+
+    public void fail(Tuple tup) { // not declared by BatchOutputCollector
+        this._collector.fail(tup);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/coordination/BatchSubtopologyBuilder.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/coordination/BatchSubtopologyBuilder.java b/jstorm-core/src/main/java/backtype/storm/coordination/BatchSubtopologyBuilder.java
new file mode 100755
index 0000000..2a77f3b
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/coordination/BatchSubtopologyBuilder.java
@@ -0,0 +1,479 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.coordination;
+
+import backtype.storm.Constants;
+import backtype.storm.coordination.CoordinatedBolt.SourceArgs;
+import backtype.storm.generated.GlobalStreamId;
+import backtype.storm.generated.Grouping;
+import backtype.storm.grouping.CustomStreamGrouping;
+import backtype.storm.grouping.PartialKeyGrouping;
+import backtype.storm.topology.BaseConfigurationDeclarer;
+import backtype.storm.topology.BasicBoltExecutor;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.IBasicBolt;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.InputDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.tuple.Fields;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class BatchSubtopologyBuilder {
+ Map<String, Component> _bolts = new HashMap<String, Component>();
+ Component _masterBolt;
+ String _masterId;
+
+ public BatchSubtopologyBuilder(String masterBoltId, IBasicBolt masterBolt, Number boltParallelism) {
+ Integer p = boltParallelism == null ? null : boltParallelism.intValue();
+ _masterBolt = new Component(new BasicBoltExecutor(masterBolt), p);
+ _masterId = masterBoltId;
+ }
+
+ public BatchSubtopologyBuilder(String masterBoltId, IBasicBolt masterBolt) {
+ this(masterBoltId, masterBolt, null);
+ }
+
+ public BoltDeclarer getMasterDeclarer() {
+ return new BoltDeclarerImpl(_masterBolt);
+ }
+
+ public BoltDeclarer setBolt(String id, IBatchBolt bolt) {
+ return setBolt(id, bolt, null);
+ }
+
+ public BoltDeclarer setBolt(String id, IBatchBolt bolt, Number parallelism) {
+ return setBolt(id, new BatchBoltExecutor(bolt), parallelism);
+ }
+
+ public BoltDeclarer setBolt(String id, IBasicBolt bolt) {
+ return setBolt(id, bolt, null);
+ }
+
+ public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism) {
+ return setBolt(id, new BasicBoltExecutor(bolt), parallelism);
+ }
+
+ private BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism) {
+ Integer p = null;
+ if(parallelism!=null) p = parallelism.intValue();
+ Component component = new Component(bolt, p);
+ _bolts.put(id, component);
+ return new BoltDeclarerImpl(component);
+ }
+
+ public void extendTopology(TopologyBuilder builder) {
+ BoltDeclarer declarer = builder.setBolt(_masterId, new CoordinatedBolt(_masterBolt.bolt), _masterBolt.parallelism);
+ for(InputDeclaration decl: _masterBolt.declarations) {
+ decl.declare(declarer);
+ }
+ for(Map conf: _masterBolt.componentConfs) {
+ declarer.addConfigurations(conf);
+ }
+ for(String id: _bolts.keySet()) {
+ Component component = _bolts.get(id);
+ Map<String, SourceArgs> coordinatedArgs = new HashMap<String, SourceArgs>();
+ for(String c: componentBoltSubscriptions(component)) {
+ SourceArgs source;
+ if(c.equals(_masterId)) {
+ source = SourceArgs.single();
+ } else {
+ source = SourceArgs.all();
+ }
+ coordinatedArgs.put(c, source);
+ }
+
+
+ BoltDeclarer input = builder.setBolt(id,
+ new CoordinatedBolt(component.bolt,
+ coordinatedArgs,
+ null),
+ component.parallelism);
+ for(Map conf: component.componentConfs) {
+ input.addConfigurations(conf);
+ }
+ for(String c: componentBoltSubscriptions(component)) {
+ input.directGrouping(c, Constants.COORDINATED_STREAM_ID);
+ }
+ for(InputDeclaration d: component.declarations) {
+ d.declare(input);
+ }
+ }
+ }
+
+ private Set<String> componentBoltSubscriptions(Component component) {
+ Set<String> ret = new HashSet<String>();
+ for(InputDeclaration d: component.declarations) {
+ ret.add(d.getComponent());
+ }
+ return ret;
+ }
+
+ private static class Component {
+ public IRichBolt bolt;
+ public Integer parallelism;
+ public List<InputDeclaration> declarations = new ArrayList<InputDeclaration>();
+ public List<Map> componentConfs = new ArrayList<Map>();
+
+ public Component(IRichBolt bolt, Integer parallelism) {
+ this.bolt = bolt;
+ this.parallelism = parallelism;
+ }
+ }
+
+ private static interface InputDeclaration {
+ void declare(InputDeclarer declarer);
+ String getComponent();
+ }
+
+ private class BoltDeclarerImpl extends BaseConfigurationDeclarer<BoltDeclarer> implements BoltDeclarer {
+ Component _component;
+
+ public BoltDeclarerImpl(Component component) {
+ _component = component;
+ }
+
+ @Override
+ public BoltDeclarer fieldsGrouping(final String component, final Fields fields) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(InputDeclarer declarer) {
+ declarer.fieldsGrouping(component, fields);
+ }
+
+ @Override
+ public String getComponent() {
+ return component;
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public BoltDeclarer fieldsGrouping(final String component, final String streamId, final Fields fields) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(InputDeclarer declarer) {
+ declarer.fieldsGrouping(component, streamId, fields);
+ }
+
+ @Override
+ public String getComponent() {
+ return component;
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public BoltDeclarer globalGrouping(final String component) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(InputDeclarer declarer) {
+ declarer.globalGrouping(component);
+ }
+
+ @Override
+ public String getComponent() {
+ return component;
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public BoltDeclarer globalGrouping(final String component, final String streamId) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(InputDeclarer declarer) {
+ declarer.globalGrouping(component, streamId);
+ }
+
+ @Override
+ public String getComponent() {
+ return component;
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public BoltDeclarer shuffleGrouping(final String component) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(InputDeclarer declarer) {
+ declarer.shuffleGrouping(component);
+ }
+
+ @Override
+ public String getComponent() {
+ return component;
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public BoltDeclarer shuffleGrouping(final String component, final String streamId) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(InputDeclarer declarer) {
+ declarer.shuffleGrouping(component, streamId);
+ }
+
+ @Override
+ public String getComponent() {
+ return component;
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public BoltDeclarer localOrShuffleGrouping(final String component) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(InputDeclarer declarer) {
+ declarer.localOrShuffleGrouping(component);
+ }
+
+ @Override
+ public String getComponent() {
+ return component;
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public BoltDeclarer localOrShuffleGrouping(final String component, final String streamId) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(InputDeclarer declarer) {
+ declarer.localOrShuffleGrouping(component, streamId);
+ }
+
+ @Override
+ public String getComponent() {
+ return component;
+ }
+ });
+ return this;
+ }
+
+ @Override
+ public BoltDeclarer localFirstGrouping(final String componentId) {
+ addDeclaration(new InputDeclaration() {
+ @Override
+ public void declare(InputDeclarer declarer) {
+ declarer.localFirstGrouping(componentId);
+ }
+
+ @Override
+ public String getComponent() {
+ return componentId;
+ }
+ });
+ return this;
+ }
+
+ /**
+  * Queues a deferred declaration that calls
+  * {@code localFirstGrouping(component, streamId)} on the
+  * {@link InputDeclarer} it is later handed.
+  *
+  * @param component id reported by the declaration's {@code getComponent()}
+  * @param streamId  stream id forwarded unchanged to the declarer
+  * @return this declarer, so calls can be chained
+  */
+ @Override
+ public BoltDeclarer localFirstGrouping(final String component, final String streamId) {
+     final InputDeclaration deferred = new InputDeclaration() {
+         @Override
+         public String getComponent() {
+             return component;
+         }
+
+         @Override
+         public void declare(InputDeclarer target) {
+             target.localFirstGrouping(component, streamId);
+         }
+     };
+     addDeclaration(deferred);
+     return this;
+ }
+
+ /**
+  * Queues a deferred declaration that calls {@code noneGrouping(component)}
+  * on the {@link InputDeclarer} it is later handed.
+  *
+  * @param component id reported by the declaration's {@code getComponent()}
+  * @return this declarer, so calls can be chained
+  */
+ @Override
+ public BoltDeclarer noneGrouping(final String component) {
+     final InputDeclaration deferred = new InputDeclaration() {
+         @Override
+         public String getComponent() {
+             return component;
+         }
+
+         @Override
+         public void declare(InputDeclarer target) {
+             target.noneGrouping(component);
+         }
+     };
+     addDeclaration(deferred);
+     return this;
+ }
+
+ /**
+  * Queues a deferred declaration that calls
+  * {@code noneGrouping(component, streamId)} on the {@link InputDeclarer}
+  * it is later handed.
+  *
+  * @param component id reported by the declaration's {@code getComponent()}
+  * @param streamId  stream id forwarded unchanged to the declarer
+  * @return this declarer, so calls can be chained
+  */
+ @Override
+ public BoltDeclarer noneGrouping(final String component, final String streamId) {
+     final InputDeclaration deferred = new InputDeclaration() {
+         @Override
+         public String getComponent() {
+             return component;
+         }
+
+         @Override
+         public void declare(InputDeclarer target) {
+             target.noneGrouping(component, streamId);
+         }
+     };
+     addDeclaration(deferred);
+     return this;
+ }
+
+ /**
+  * Queues a deferred declaration that calls {@code allGrouping(component)}
+  * on the {@link InputDeclarer} it is later handed.
+  *
+  * @param component id reported by the declaration's {@code getComponent()}
+  * @return this declarer, so calls can be chained
+  */
+ @Override
+ public BoltDeclarer allGrouping(final String component) {
+     final InputDeclaration deferred = new InputDeclaration() {
+         @Override
+         public String getComponent() {
+             return component;
+         }
+
+         @Override
+         public void declare(InputDeclarer target) {
+             target.allGrouping(component);
+         }
+     };
+     addDeclaration(deferred);
+     return this;
+ }
+
+ /**
+  * Queues a deferred declaration that calls
+  * {@code allGrouping(component, streamId)} on the {@link InputDeclarer}
+  * it is later handed.
+  *
+  * @param component id reported by the declaration's {@code getComponent()}
+  * @param streamId  stream id forwarded unchanged to the declarer
+  * @return this declarer, so calls can be chained
+  */
+ @Override
+ public BoltDeclarer allGrouping(final String component, final String streamId) {
+     final InputDeclaration deferred = new InputDeclaration() {
+         @Override
+         public String getComponent() {
+             return component;
+         }
+
+         @Override
+         public void declare(InputDeclarer target) {
+             target.allGrouping(component, streamId);
+         }
+     };
+     addDeclaration(deferred);
+     return this;
+ }
+
+ /**
+  * Queues a deferred declaration that calls {@code directGrouping(component)}
+  * on the {@link InputDeclarer} it is later handed.
+  *
+  * @param component id reported by the declaration's {@code getComponent()}
+  * @return this declarer, so calls can be chained
+  */
+ @Override
+ public BoltDeclarer directGrouping(final String component) {
+     final InputDeclaration deferred = new InputDeclaration() {
+         @Override
+         public String getComponent() {
+             return component;
+         }
+
+         @Override
+         public void declare(InputDeclarer target) {
+             target.directGrouping(component);
+         }
+     };
+     addDeclaration(deferred);
+     return this;
+ }
+
+ /**
+  * Queues a deferred declaration that calls
+  * {@code directGrouping(component, streamId)} on the {@link InputDeclarer}
+  * it is later handed.
+  *
+  * @param component id reported by the declaration's {@code getComponent()}
+  * @param streamId  stream id forwarded unchanged to the declarer
+  * @return this declarer, so calls can be chained
+  */
+ @Override
+ public BoltDeclarer directGrouping(final String component, final String streamId) {
+     final InputDeclaration deferred = new InputDeclaration() {
+         @Override
+         public String getComponent() {
+             return component;
+         }
+
+         @Override
+         public void declare(InputDeclarer target) {
+             target.directGrouping(component, streamId);
+         }
+     };
+     addDeclaration(deferred);
+     return this;
+ }
+
+ /**
+  * Wraps {@code fields} in a {@link PartialKeyGrouping} and registers it through
+  * {@code customGrouping(componentId, grouping)}.
+  *
+  * @param componentId component id forwarded to {@code customGrouping}
+  * @param fields      fields handed to the {@link PartialKeyGrouping} constructor
+  * @return whatever {@code customGrouping} returns
+  */
+ @Override
+ public BoltDeclarer partialKeyGrouping(String componentId, Fields fields) {
+     final PartialKeyGrouping byPartialKey = new PartialKeyGrouping(fields);
+     return customGrouping(componentId, byPartialKey);
+ }
+
+ /**
+  * Wraps {@code fields} in a {@link PartialKeyGrouping} and registers it through
+  * {@code customGrouping(componentId, streamId, grouping)}.
+  *
+  * @param componentId component id forwarded to {@code customGrouping}
+  * @param streamId    stream id forwarded to {@code customGrouping}
+  * @param fields      fields handed to the {@link PartialKeyGrouping} constructor
+  * @return whatever {@code customGrouping} returns
+  */
+ @Override
+ public BoltDeclarer partialKeyGrouping(String componentId, String streamId, Fields fields) {
+     final PartialKeyGrouping byPartialKey = new PartialKeyGrouping(fields);
+     return customGrouping(componentId, streamId, byPartialKey);
+ }
+
+ /**
+  * Queues a deferred declaration that calls
+  * {@code customGrouping(component, grouping)} on the {@link InputDeclarer}
+  * it is later handed.
+  *
+  * @param component id reported by the declaration's {@code getComponent()}
+  * @param grouping  custom grouping forwarded unchanged to the declarer
+  * @return this declarer, so calls can be chained
+  */
+ @Override
+ public BoltDeclarer customGrouping(final String component, final CustomStreamGrouping grouping) {
+     final InputDeclaration deferred = new InputDeclaration() {
+         @Override
+         public String getComponent() {
+             return component;
+         }
+
+         @Override
+         public void declare(InputDeclarer target) {
+             target.customGrouping(component, grouping);
+         }
+     };
+     addDeclaration(deferred);
+     return this;
+ }
+
+ /**
+  * Queues a deferred declaration that calls
+  * {@code customGrouping(component, streamId, grouping)} on the
+  * {@link InputDeclarer} it is later handed.
+  *
+  * @param component id reported by the declaration's {@code getComponent()}
+  * @param streamId  stream id forwarded unchanged to the declarer
+  * @param grouping  custom grouping forwarded unchanged to the declarer
+  * @return this declarer, so calls can be chained
+  */
+ @Override
+ public BoltDeclarer customGrouping(final String component, final String streamId, final CustomStreamGrouping grouping) {
+     final InputDeclaration deferred = new InputDeclaration() {
+         @Override
+         public String getComponent() {
+             return component;
+         }
+
+         @Override
+         public void declare(InputDeclarer target) {
+             target.customGrouping(component, streamId, grouping);
+         }
+     };
+     addDeclaration(deferred);
+     return this;
+ }
+
+ /**
+  * Queues a deferred declaration that calls {@code grouping(stream, grouping)}
+  * on the {@link InputDeclarer} it is later handed. The declaration reports the
+  * component id stored in {@code stream}.
+  *
+  * @param stream   global stream id forwarded unchanged to the declarer
+  * @param grouping grouping forwarded unchanged to the declarer
+  * @return this declarer, so calls can be chained
+  */
+ @Override
+ public BoltDeclarer grouping(final GlobalStreamId stream, final Grouping grouping) {
+     final InputDeclaration deferred = new InputDeclaration() {
+         @Override
+         public String getComponent() {
+             return stream.get_componentId();
+         }
+
+         @Override
+         public void declare(InputDeclarer target) {
+             target.grouping(stream, grouping);
+         }
+     };
+     addDeclaration(deferred);
+     return this;
+ }
+
+ /**
+ * Appends {@code declaration} to the {@code declarations} collection held by {@code _component}.
+ *
+ * @param declaration the input declaration to record
+ */
+ private void addDeclaration(InputDeclaration declaration) {
+ _component.declarations.add(declaration);
+ }
+
+ /**
+ * Adds {@code conf} to the {@code componentConfs} collection held by {@code _component}.
+ *
+ * @param conf configuration map to record
+ * @return this declarer, so calls can be chained
+ */
+ @Override
+ public BoltDeclarer addConfigurations(Map conf) {
+ _component.componentConfs.add(conf);
+ return this;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/coordination/CoordinatedBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/coordination/CoordinatedBolt.java b/jstorm-core/src/main/java/backtype/storm/coordination/CoordinatedBolt.java
new file mode 100755
index 0000000..6f337a6
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/coordination/CoordinatedBolt.java
@@ -0,0 +1,382 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.coordination;
+
+import backtype.storm.topology.FailedException;
+import java.util.Map.Entry;
+import backtype.storm.tuple.Values;
+import backtype.storm.generated.GlobalStreamId;
+import java.util.Collection;
+import backtype.storm.Constants;
+import backtype.storm.generated.Grouping;
+import backtype.storm.task.IOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.TimeCacheMap;
+import backtype.storm.utils.Utils;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import static backtype.storm.utils.Utils.get;
+
+/**
+ * Coordination requires the request ids to be globally unique for a while. This is so it doesn't get confused
+ * in the case of retries.
+ */
+public class CoordinatedBolt implements IRichBolt {
+ public static Logger LOG = LoggerFactory.getLogger(CoordinatedBolt.class);
+
+ /**
+  * Optional hook for the wrapped bolt. When the delegate implements this
+  * interface, {@code finishedId} is invoked with the request id once the
+  * coordination condition for that id has been met.
+  */
+ public interface FinishedCallback {
+     void finishedId(Object id);
+ }
+
+ /**
+  * Optional hook for the wrapped bolt. When the delegate implements this
+  * interface, {@code timeoutId} is invoked for a tracked id that expires
+  * without having been marked finished.
+  */
+ public interface TimeoutCallback {
+     void timeoutId(Object id);
+ }
+
+
+ /**
+  * Serializable flag describing how a source component reports to this bolt:
+  * a {@code singleCount} source contributes one expected report, any other
+  * source contributes one report per task of that component.
+  */
+ public static class SourceArgs implements Serializable {
+     public boolean singleCount;
+
+     protected SourceArgs(boolean singleCount) {
+         this.singleCount = singleCount;
+     }
+
+     /** @return args with {@code singleCount} set to {@code false} */
+     public static SourceArgs all() {
+         return new SourceArgs(false);
+     }
+
+     /** @return args with {@code singleCount} set to {@code true} */
+     public static SourceArgs single() {
+         return new SourceArgs(true);
+     }
+
+     @Override
+     public String toString() {
+         final StringBuilder text = new StringBuilder("<Single: ");
+         text.append(singleCount).append(">");
+         return text.toString();
+     }
+ }
+
+ /**
+  * Collector handed to the delegate bolt. Every call is forwarded to the wrapped
+  * collector; in addition, emits are counted per target task and acks/fails
+  * update the {@link TrackingInfo} stored under the tuple's first value.
+  */
+ public class CoordinatedOutputCollector implements IOutputCollector {
+     IOutputCollector _delegate;
+
+     public CoordinatedOutputCollector(IOutputCollector delegate) {
+         _delegate = delegate;
+     }
+
+     /** Emits through the wrapped collector, then counts one tuple for each task it reports. */
+     public List<Integer> emit(String stream, Collection<Tuple> anchors, List<Object> tuple) {
+         final List<Integer> targetTasks = _delegate.emit(stream, anchors, tuple);
+         updateTaskCounts(tuple.get(0), targetTasks);
+         return targetTasks;
+     }
+
+     /** Counts one tuple for {@code task}, then emits directly through the wrapped collector. */
+     public void emitDirect(int task, String stream, Collection<Tuple> anchors, List<Object> tuple) {
+         final List<Integer> onlyTask = Arrays.asList(task);
+         updateTaskCounts(tuple.get(0), onlyTask);
+         _delegate.emitDirect(task, stream, anchors, tuple);
+     }
+
+     /**
+      * Bumps the received-tuple count for the tuple's id (if tracked), re-checks the
+      * coordination condition, and fails the tuple instead of acking it when that
+      * check reports failure.
+      */
+     public void ack(Tuple tuple) {
+         final Object requestId = tuple.getValue(0);
+         synchronized (_tracked) {
+             final TrackingInfo info = _tracked.get(requestId);
+             if (info != null) {
+                 info.receivedTuples += 1;
+             }
+         }
+         if (checkFinishId(tuple, TupleType.REGULAR)) {
+             _delegate.fail(tuple);
+             return;
+         }
+         _delegate.ack(tuple);
+     }
+
+     /** Marks the tuple's id (if tracked) as failed, re-checks coordination, then fails the tuple. */
+     public void fail(Tuple tuple) {
+         final Object requestId = tuple.getValue(0);
+         synchronized (_tracked) {
+             final TrackingInfo info = _tracked.get(requestId);
+             if (info != null) {
+                 info.failed = true;
+             }
+         }
+         checkFinishId(tuple, TupleType.REGULAR);
+         _delegate.fail(tuple);
+     }
+
+     public void reportError(Throwable error) {
+         _delegate.reportError(error);
+     }
+
+     /** Adds one to the emitted-tuple counter of every task in {@code tasks} for a tracked id. */
+     private void updateTaskCounts(Object id, List<Integer> tasks) {
+         synchronized (_tracked) {
+             final TrackingInfo info = _tracked.get(id);
+             if (info == null) {
+                 return;
+             }
+             final Map<Integer, Integer> emittedPerTask = info.taskEmittedTuples;
+             for (Integer task : tasks) {
+                 final int previous = get(emittedPerTask, task, 0);
+                 emittedPerTask.put(task, previous + 1);
+             }
+         }
+     }
+ }
+
+ private Map<String, SourceArgs> _sourceArgs;
+ private IdStreamSpec _idStreamSpec;
+ private IRichBolt _delegate;
+ private Integer _numSourceReports;
+ private List<Integer> _countOutTasks = new ArrayList<Integer>();;
+ private OutputCollector _collector;
+ private TimeCacheMap<Object, TrackingInfo> _tracked;
+
+ /**
+  * Mutable per-request bookkeeping: report and tuple counters, failure and
+  * completion flags, per-task emit counts, and the tuples whose ack is delayed.
+  */
+ public static class TrackingInfo {
+     int reportCount = 0;
+     int expectedTupleCount = 0;
+     int receivedTuples = 0;
+     boolean failed = false;
+     Map<Integer, Integer> taskEmittedTuples = new HashMap<Integer, Integer>();
+     boolean receivedId = false;
+     boolean finished = false;
+     List<Tuple> ackTuples = new ArrayList<Tuple>();
+
+     /** Renders the counters, the failed flag and the per-task emit map, one item per line. */
+     @Override
+     public String toString() {
+         final StringBuilder text = new StringBuilder();
+         text.append("reportCount: ").append(reportCount).append("\n");
+         text.append("expectedTupleCount: ").append(expectedTupleCount).append("\n");
+         text.append("receivedTuples: ").append(receivedTuples).append("\n");
+         text.append("failed: ").append(failed).append("\n");
+         text.append(taskEmittedTuples.toString());
+         return text.toString();
+     }
+ }
+
+
+ /**
+  * Serializable holder for the {@link GlobalStreamId} built from a component
+  * and stream name.
+  */
+ public static class IdStreamSpec implements Serializable {
+     GlobalStreamId _id;
+
+     protected IdStreamSpec(String component, String stream) {
+         this._id = new GlobalStreamId(component, stream);
+     }
+
+     /** Factory wrapper around the protected constructor. */
+     public static IdStreamSpec makeDetectSpec(String component, String stream) {
+         final IdStreamSpec spec = new IdStreamSpec(component, stream);
+         return spec;
+     }
+
+     public GlobalStreamId getGlobalStreamId() {
+         return this._id;
+     }
+ }
+
+ /** Wraps {@code delegate} with no source arguments and no id stream spec. */
+ public CoordinatedBolt(IRichBolt delegate) {
+     this(delegate, (Map<String, SourceArgs>) null, (IdStreamSpec) null);
+ }
+
+ /** Wraps {@code delegate} with a single source component and its arguments. */
+ public CoordinatedBolt(IRichBolt delegate, String sourceComponent, SourceArgs sourceArgs, IdStreamSpec idStreamSpec) {
+     this(delegate, singleSourceArgs(sourceComponent, sourceArgs), idStreamSpec);
+ }
+
+ /**
+  * Wraps {@code delegate}; a {@code null} {@code sourceArgs} map is replaced by
+  * an empty one.
+  */
+ public CoordinatedBolt(IRichBolt delegate, Map<String, SourceArgs> sourceArgs, IdStreamSpec idStreamSpec) {
+     _delegate = delegate;
+     _idStreamSpec = idStreamSpec;
+     _sourceArgs = (sourceArgs != null) ? sourceArgs : new HashMap<String, SourceArgs>();
+ }
+
+ /**
+  * Creates the tracking cache (with an expiry callback only when the delegate
+  * implements {@link TimeoutCallback}), prepares the delegate with a counting
+  * collector, collects the tasks subscribed to the coordinated stream, and
+  * computes how many source reports to expect.
+  */
+ public void prepare(Map config, TopologyContext context, OutputCollector collector) {
+     final TimeCacheMap.ExpiredCallback<Object, TrackingInfo> onExpire =
+             (_delegate instanceof TimeoutCallback) ? new TimeoutItems() : null;
+     _tracked = new TimeCacheMap<Object, TrackingInfo>(context.maxTopologyMessageTimeout(), onExpire);
+     _collector = collector;
+
+     final CoordinatedOutputCollector counting = new CoordinatedOutputCollector(collector);
+     _delegate.prepare(config, context, new OutputCollector(counting));
+
+     final Map<String, Grouping> coordinatedTargets = Utils.get(
+             context.getThisTargets(),
+             Constants.COORDINATED_STREAM_ID,
+             new HashMap<String, Grouping>());
+     for (String component : coordinatedTargets.keySet()) {
+         _countOutTasks.addAll(context.getComponentTasks(component));
+     }
+
+     if (_sourceArgs.isEmpty()) {
+         return;
+     }
+     _numSourceReports = 0;
+     for (Entry<String, SourceArgs> source : _sourceArgs.entrySet()) {
+         // A single-count source reports once; any other source reports once per task.
+         _numSourceReports += source.getValue().singleCount
+                 ? 1
+                 : context.getComponentTasks(source.getKey()).size();
+     }
+ }
+
+ /**
+ * Re-evaluates the coordination state of the request id carried in {@code tup}'s first value.
+ * Runs entirely under the {@code _tracked} lock.
+ *
+ * @param tup tuple that triggered the check
+ * @param type how {@code tup} was classified (regular, id, or coord)
+ * @return {@code true} when the tracked id is flagged failed or a {@link FailedException} was caught
+ */
+ private boolean checkFinishId(Tuple tup, TupleType type) {
+ Object id = tup.getValue(0);
+ boolean failed = false;
+
+ synchronized(_tracked) {
+ TrackingInfo track = _tracked.get(id);
+ try {
+ if(track!=null) {
+ boolean delayed = false;
+ // Without an id stream the COORD tuples are held back; with one, the ID tuple is.
+ // Held tuples are acked or failed together once the outcome is known.
+ if(_idStreamSpec==null && type == TupleType.COORD || _idStreamSpec!=null && type==TupleType.ID) {
+ track.ackTuples.add(tup);
+ delayed = true;
+ }
+ if(track.failed) {
+ failed = true;
+ for(Tuple t: track.ackTuples) {
+ _collector.fail(t);
+ }
+ _tracked.remove(id);
+ // Finished when the id has been seen and either there are no sources, or every
+ // expected report arrived and the received count matches the expected count.
+ } else if(track.receivedId
+ && (_sourceArgs.isEmpty() ||
+ track.reportCount==_numSourceReports &&
+ track.expectedTupleCount == track.receivedTuples)){
+ if(_delegate instanceof FinishedCallback) {
+ ((FinishedCallback)_delegate).finishedId(id);
+ }
+ if(!(_sourceArgs.isEmpty() || type!=TupleType.REGULAR)) {
+ throw new IllegalStateException("Coordination condition met on a non-coordinating tuple. Should be impossible");
+ }
+ // Tell every task on the coordinated stream how many tuples it was sent for this id.
+ Iterator<Integer> outTasks = _countOutTasks.iterator();
+ while(outTasks.hasNext()) {
+ int task = outTasks.next();
+ int numTuples = get(track.taskEmittedTuples, task, 0);
+ _collector.emitDirect(task, Constants.COORDINATED_STREAM_ID, tup, new Values(id, numTuples));
+ }
+ for(Tuple t: track.ackTuples) {
+ _collector.ack(t);
+ }
+ track.finished = true;
+ _tracked.remove(id);
+ }
+ // ID/COORD tuples that were not held back are settled immediately.
+ if(!delayed && type!=TupleType.REGULAR) {
+ if(track.failed) {
+ _collector.fail(tup);
+ } else {
+ _collector.ack(tup);
+ }
+ }
+ } else {
+ // No tracking entry for this id: fail ID/COORD tuples; REGULAR tuples are left to the caller.
+ if(type!=TupleType.REGULAR) _collector.fail(tup);
+ }
+ } catch(FailedException e) {
+ LOG.error("Failed to finish batch", e);
+ // NOTE(review): track is dereferenced here without a null check; this relies on the
+ // untracked branch above never throwing FailedException - confirm.
+ for(Tuple t: track.ackTuples) {
+ _collector.fail(t);
+ }
+ _tracked.remove(id);
+ failed = true;
+ }
+ }
+ return failed;
+ }
+
+ public void execute(Tuple tuple) {
+ Object id = tuple.getValue(0);
+ TrackingInfo track;
+ TupleType type = getTupleType(tuple);
+ synchronized(_tracked) {
+ track = _tracked.get(id);
+ if(track==null) {
+ track = new TrackingInfo();
+ if(_idStreamSpec==null) track.receivedId = true;
+ _tracked.put(id, track);
+ }
+ }
+
+ if(type==TupleType.ID) {
+ synchronized(_tracked) {
+ track.receivedId = true;
+ }
+ checkFinishId(tuple, type);
+ } else if(type==TupleType.COORD) {
+ int count = (Integer) tuple.getValue(1);
+ synchronized(_tracked) {
+ track.reportCount++;
+ track.expectedTupleCount+=count;
+ }
+ checkFinishId(tuple, type);
+ } else {
+ synchronized(_tracked) {
+ _delegate.execute(tuple);
+ }
+ }
+ }
+
+ /** Cleans up the delegate bolt first, then the tracking cache. */
+ public void cleanup() {
+ _delegate.cleanup();
+ _tracked.cleanup();
+ }
+
+ /**
+  * Lets the delegate declare its outputs, then declares the coordinated stream
+  * with fields {@code id} and {@code count}, passing {@code true} as the
+  * declarer's second argument.
+  */
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+     _delegate.declareOutputFields(declarer);
+     final Fields coordinationFields = new Fields("id", "count");
+     declarer.declareStream(Constants.COORDINATED_STREAM_ID, true, coordinationFields);
+ }
+
+ /** @return the delegate bolt's component configuration, unchanged */
+ @Override
+ public Map<String, Object> getComponentConfiguration() {
+     final Map<String, Object> delegateConf = _delegate.getComponentConfiguration();
+     return delegateConf;
+ }
+
+ private static Map<String, SourceArgs> singleSourceArgs(String sourceComponent, SourceArgs sourceArgs) {
+ Map<String, SourceArgs> ret = new HashMap<String, SourceArgs>();
+ ret.put(sourceComponent, sourceArgs);
+ return ret;
+ }
+
+ /**
+  * Expiry callback for the tracking cache: flags the expired entry as failed and
+  * notifies the delegate's {@link TimeoutCallback} unless the entry had finished.
+  */
+ private class TimeoutItems implements TimeCacheMap.ExpiredCallback<Object, TrackingInfo> {
+     @Override
+     public void expire(Object id, TrackingInfo expired) {
+         synchronized (_tracked) {
+             // Holding the lock while reading the finished flag guarantees that an id
+             // which has already finished is never reported as timed out.
+             expired.failed = true;
+             if (expired.finished) {
+                 return;
+             }
+             ((TimeoutCallback) _delegate).timeoutId(id);
+         }
+     }
+ }
+
+ private TupleType getTupleType(Tuple tuple) {
+ if(_idStreamSpec!=null
+ && tuple.getSourceGlobalStreamid().equals(_idStreamSpec._id)) {
+ return TupleType.ID;
+ } else if(!_sourceArgs.isEmpty()
+ && tuple.getSourceStreamId().equals(Constants.COORDINATED_STREAM_ID)) {
+ return TupleType.COORD;
+ } else {
+ return TupleType.REGULAR;
+ }
+ }
+
+ /** Classification of an incoming tuple, as produced by {@code getTupleType}. */
+ enum TupleType {
+     REGULAR, ID, COORD
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/coordination/IBatchBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/coordination/IBatchBolt.java b/jstorm-core/src/main/java/backtype/storm/coordination/IBatchBolt.java
new file mode 100755
index 0000000..ee5d9bd
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/coordination/IBatchBolt.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.coordination;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.IComponent;
+import backtype.storm.tuple.Tuple;
+import java.io.Serializable;
+import java.util.Map;
+
+ /**
+  * Serializable component that is prepared with an id, executes tuples, and
+  * exposes a {@code finishBatch} hook.
+  *
+  * @param <T> type of the id passed to {@code prepare}
+  */
+ public interface IBatchBolt<T> extends IComponent, Serializable {
+     /** Receives the configuration, topology context, output collector and an id of type {@code T}. */
+     void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, T id);
+
+     /** Handles a single input tuple. */
+     void execute(Tuple tuple);
+
+     /** NOTE(review): presumably invoked once the batch's tuples have all been executed - confirm against the caller. */
+     void finishBatch();
+ }
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/daemon/Shutdownable.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/daemon/Shutdownable.java b/jstorm-core/src/main/java/backtype/storm/daemon/Shutdownable.java
new file mode 100755
index 0000000..b1d8ddf
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/daemon/Shutdownable.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.daemon;
+
+ /** Implemented by anything that can be asked to shut down. */
+ public interface Shutdownable {
+     void shutdown();
+ }
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/drpc/DRPCInvocationsClient.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/DRPCInvocationsClient.java b/jstorm-core/src/main/java/backtype/storm/drpc/DRPCInvocationsClient.java
new file mode 100755
index 0000000..624db3e
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/drpc/DRPCInvocationsClient.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.drpc;
+
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicReference;
+
+import backtype.storm.generated.DRPCRequest;
+import backtype.storm.generated.DistributedRPCInvocations;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.security.auth.ThriftClient;
+import backtype.storm.security.auth.ThriftConnectionType;
+import org.apache.thrift.transport.TTransportException;
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DRPCInvocationsClient extends ThriftClient implements DistributedRPCInvocations.Iface {
+ public static Logger LOG = LoggerFactory.getLogger(DRPCInvocationsClient.class);
+ private final AtomicReference<DistributedRPCInvocations.Client> client =
+ new AtomicReference<DistributedRPCInvocations.Client>();
+ private String host;
+ private int port;
+
+ public DRPCInvocationsClient(Map conf, String host, int port) throws TTransportException {
+ super(conf, ThriftConnectionType.DRPC_INVOCATIONS, host, port, null);
+ this.host = host;
+ this.port = port;
+ client.set(new DistributedRPCInvocations.Client(_protocol));
+ }
+
+ public String getHost() {
+ return host;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void reconnectClient() throws TException {
+ if (client.get() == null) {
+ reconnect();
+ client.set(new DistributedRPCInvocations.Client(_protocol));
+ }
+ }
+
+ public boolean isConnected() {
+ return client.get() != null;
+ }
+
+ public void result(String id, String result) throws TException, AuthorizationException {
+ DistributedRPCInvocations.Client c = client.get();
+ try {
+ if (c == null) {
+ throw new TException("Client is not connected...");
+ }
+ c.result(id, result);
+ } catch(AuthorizationException aze) {
+ throw aze;
+ } catch(TException e) {
+ client.compareAndSet(c, null);
+ throw e;
+ }
+ }
+
+ public DRPCRequest fetchRequest(String func) throws TException, AuthorizationException {
+ DistributedRPCInvocations.Client c = client.get();
+ try {
+ if (c == null) {
+ throw new TException("Client is not connected...");
+ }
+ return c.fetchRequest(func);
+ } catch(AuthorizationException aze) {
+ throw aze;
+ } catch(TException e) {
+ client.compareAndSet(c, null);
+ throw e;
+ }
+ }
+
+ public void failRequest(String id) throws TException, AuthorizationException {
+ DistributedRPCInvocations.Client c = client.get();
+ try {
+ if (c == null) {
+ throw new TException("Client is not connected...");
+ }
+ c.failRequest(id);
+ } catch(AuthorizationException aze) {
+ throw aze;
+ } catch(TException e) {
+ client.compareAndSet(c, null);
+ throw e;
+ }
+ }
+
+ public DistributedRPCInvocations.Client getClient() {
+ return client.get();
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/drpc/DRPCSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/DRPCSpout.java b/jstorm-core/src/main/java/backtype/storm/drpc/DRPCSpout.java
new file mode 100644
index 0000000..4ed24d4
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/drpc/DRPCSpout.java
@@ -0,0 +1,263 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.drpc;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.thrift.TException;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.alibaba.jstorm.utils.NetWorkUtils;
+
+import backtype.storm.Config;
+import backtype.storm.ILocalDRPC;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.DRPCRequest;
+import backtype.storm.generated.DistributedRPCInvocations;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.ExtendedThreadPoolExecutor;
+import backtype.storm.utils.ServiceRegistry;
+import backtype.storm.utils.Utils;
+
+public class DRPCSpout extends BaseRichSpout {
+ //ANY CHANGE TO THIS CODE MUST BE SERIALIZABLE COMPATIBLE OR THERE WILL BE PROBLEMS
+ static final long serialVersionUID = 2387848310969237877L;
+
+ public static Logger LOG = LoggerFactory.getLogger(DRPCSpout.class);
+
+ SpoutOutputCollector _collector;
+ List<DRPCInvocationsClient> _clients = new ArrayList<DRPCInvocationsClient>();
+ transient LinkedList<Future<Void>> _futures = null;
+ transient ExecutorService _backround = null;
+ String _function;
+ String _local_drpc_id = null;
+
+ /** Message id attached to emitted tuples: the DRPC request id plus an index into the client list. */
+ private static class DRPCMessageId {
+     String id;
+     int index;
+
+     public DRPCMessageId(String requestId, int clientIndex) {
+         id = requestId;
+         index = clientIndex;
+     }
+ }
+
+
+ /** Creates a spout that fetches requests for {@code function}; no local DRPC service id is set. */
+ public DRPCSpout(String function) {
+     _function = function;
+ }
+
+ /** Creates a spout for {@code function} bound to the service id reported by {@code drpc}. */
+ public DRPCSpout(String function, ILocalDRPC drpc) {
+     this(function);
+     _local_drpc_id = drpc.getServiceId();
+ }
+
+ /**
+  * Background task that opens a {@link DRPCInvocationsClient} to one server and
+  * appends it to {@code _clients} under that list's lock.
+  */
+ private class Adder implements Callable<Void> {
+     private final String server;
+     private final int port;
+     private final Map conf;
+
+     public Adder(String server, int port, Map conf) {
+         this.conf = conf;
+         this.port = port;
+         this.server = server;
+     }
+
+     @Override
+     public Void call() throws Exception {
+         final DRPCInvocationsClient connected = new DRPCInvocationsClient(conf, server, port);
+         synchronized (_clients) {
+             _clients.add(connected);
+         }
+         return null;
+     }
+ }
+
+ private void reconnect(final DRPCInvocationsClient c) {
+ _futures.add(_backround.submit(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ c.reconnectClient();
+ return null;
+ }
+ }));
+ }
+
+ /**
+  * Reaps finished background tasks. Each completed future is removed from
+  * {@code _futures} and queried so that a failure inside the task surfaces here
+  * as a {@link RuntimeException}. Futures that are still running are left in
+  * the list and checked again on the next call. Calling {@code get()} on them
+  * would block the caller until the pending connection attempt finishes.
+  *
+  * @throws RuntimeException wrapping whatever a completed task threw
+  */
+ private void checkFutures() {
+     Iterator<Future<Void>> i = _futures.iterator();
+     while (i.hasNext()) {
+         Future<Void> f = i.next();
+         if (!f.isDone()) {
+             continue;
+         }
+         i.remove();
+         try {
+             // The future is done, so get() returns (or throws) immediately.
+             f.get();
+         } catch (Exception e) {
+             throw new RuntimeException(e);
+         }
+     }
+ }
+
+
+
+ /**
+  * Stores the collector and, when no local DRPC id is set, starts the background
+  * executor and schedules connections to the configured DRPC servers: to every
+  * server when this component has fewer tasks than servers, otherwise to the one
+  * server selected by this task's index modulo the server count.
+  *
+  * @throws RuntimeException when the resolved server list is null or empty
+  */
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+     _collector = collector;
+     if (_local_drpc_id != null) {
+         return;
+     }
+
+     _backround = new ExtendedThreadPoolExecutor(0, Integer.MAX_VALUE,
+             60L, TimeUnit.SECONDS,
+             new SynchronousQueue<Runnable>());
+     _futures = new LinkedList<Future<Void>>();
+
+     final int taskCount = context.getComponentTasks(context.getThisComponentId()).size();
+     final int taskIndex = context.getThisTaskIndex();
+
+     final int port = Utils.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT));
+     final List<String> servers = NetWorkUtils.host2Ip((List<String>) conf.get(Config.DRPC_SERVERS));
+
+     if (servers == null || servers.isEmpty()) {
+         throw new RuntimeException("No DRPC servers configured for topology");
+     }
+
+     if (taskCount >= servers.size()) {
+         final String assigned = servers.get(taskIndex % servers.size());
+         _futures.add(_backround.submit(new Adder(assigned, port, conf)));
+         return;
+     }
+     for (String server : servers) {
+         _futures.add(_backround.submit(new Adder(server, port, conf)));
+     }
+ }
+
+ /**
+  * Stops the background executor (when one was created) and closes every
+  * client connection. The executor is stopped first so that pending connect or
+  * reconnect tasks are cancelled rather than left running, and the client list
+  * is walked under its lock because background tasks append to it.
+  */
+ @Override
+ public void close() {
+     if (_backround != null) {
+         // Null when open() was never called or a local DRPC id is in use.
+         _backround.shutdownNow();
+     }
+     synchronized (_clients) {
+         for (DRPCInvocationsClient client : _clients) {
+             client.close();
+         }
+     }
+ }
+
+ /**
+ * Polls for one DRPC request and emits it as a tuple of (function args, JSON return info).
+ * Without a local DRPC id, each connected remote client is tried in list order and polling stops at the
+ * first non-empty request; with a local DRPC id, the registered local service is polled once.
+ * Sleeps briefly (Utils.sleep(1)) when nothing was emitted.
+ */
+ @Override
+ public void nextTuple() {
+ boolean gotRequest = false;
+ if(_local_drpc_id==null) {
+ int size = 0;
+ synchronized (_clients) {
+ size = _clients.size(); //This will only ever grow, so no need to worry about falling off the end
+ }
+ for(int i=0; i<size; i++) {
+ DRPCInvocationsClient client;
+ synchronized (_clients) {
+ client = _clients.get(i);
+ }
+ // Clients whose thrift connection is currently dropped are skipped.
+ if (!client.isConnected()) {
+ continue;
+ }
+ try {
+ DRPCRequest req = client.fetchRequest(_function);
+ // An empty request id is treated as "nothing to fetch".
+ if(req.get_request_id().length() > 0) {
+ Map returnInfo = new HashMap();
+ returnInfo.put("id", req.get_request_id());
+ returnInfo.put("host", client.getHost());
+ returnInfo.put("port", client.getPort());
+ gotRequest = true;
+ // The message id records the client index so fail() can find the same client again.
+ _collector.emit(new Values(req.get_func_args(), JSONValue.toJSONString(returnInfo)), new DRPCMessageId(req.get_request_id(), i));
+ break;
+ }
+ } catch (AuthorizationException aze) {
+ reconnect(client);
+ LOG.error("Not authorized to fetch DRPC result from DRPC server", aze);
+ } catch (TException e) {
+ reconnect(client);
+ LOG.error("Failed to fetch DRPC result from DRPC server", e);
+ } catch (Exception e) {
+ LOG.error("Failed to fetch DRPC result from DRPC server", e);
+ }
+ }
+ // Surfaces failures from background connect/reconnect tasks.
+ checkFutures();
+ } else {
+ DistributedRPCInvocations.Iface drpc = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(_local_drpc_id);
+ if(drpc!=null) { // can happen during shutdown of drpc while topology is still up
+ try {
+ DRPCRequest req = drpc.fetchRequest(_function);
+ if(req.get_request_id().length() > 0) {
+ Map returnInfo = new HashMap();
+ returnInfo.put("id", req.get_request_id());
+ returnInfo.put("host", _local_drpc_id);
+ returnInfo.put("port", 0);
+ gotRequest = true;
+ _collector.emit(new Values(req.get_func_args(), JSONValue.toJSONString(returnInfo)), new DRPCMessageId(req.get_request_id(), 0));
+ }
+ } catch (AuthorizationException aze) {
+ throw new RuntimeException(aze);
+ } catch (TException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+ if(!gotRequest) {
+ Utils.sleep(1);
+ }
+ }
+
+ /** Does nothing when a tuple is acked; the body is intentionally empty. */
+ @Override
+ public void ack(Object msgId) {
+ }
+
+ /**
+  * Tells the DRPC server that the request behind {@code msgId} failed.
+  * In remote mode the client is looked up by the index stored in the message id,
+  * under the {@code _clients} lock because background tasks append to that list.
+  * In local mode the service is looked up in the registry; it may already be
+  * gone (for example while the local DRPC is shutting down), in which case the
+  * failure is logged and dropped instead of raising a NullPointerException.
+  *
+  * @param msgId the {@code DRPCMessageId} that was attached when the tuple was emitted
+  */
+ @Override
+ public void fail(Object msgId) {
+     DRPCMessageId did = (DRPCMessageId) msgId;
+     DistributedRPCInvocations.Iface client;
+
+     if (_local_drpc_id == null) {
+         synchronized (_clients) {
+             client = _clients.get(did.index);
+         }
+     } else {
+         client = (DistributedRPCInvocations.Iface) ServiceRegistry.getService(_local_drpc_id);
+     }
+     if (client == null) {
+         LOG.warn("No DRPC client available to fail request " + did.id);
+         return;
+     }
+     try {
+         client.failRequest(did.id);
+     } catch (AuthorizationException aze) {
+         LOG.error("Not authorized to failREquest from DRPC server", aze);
+     } catch (TException e) {
+         LOG.error("Failed to fail request", e);
+     }
+ }
+
+ /** Declares the two output fields {@code args} and {@code return-info}. */
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+     final Fields outputFields = new Fields("args", "return-info");
+     declarer.declare(outputFields);
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/drpc/JoinResult.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/JoinResult.java b/jstorm-core/src/main/java/backtype/storm/drpc/JoinResult.java
new file mode 100755
index 0000000..b74b97e
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/drpc/JoinResult.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.drpc;
+
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Bolt that joins, per request id, one tuple coming from the configured
+ * "return" component with one tuple coming from any other component.
+ *
+ * The request id is read from field 0 of every incoming tuple. Once both
+ * tuples for a request id have arrived, a single tuple
+ * {@code (result, return-info)} is emitted, anchored to both inputs, and both
+ * inputs are acked. Field 1 of the result tuple is emitted as a string;
+ * field 1 of the return tuple is passed through unchanged.
+ */
+public class JoinResult extends BaseRichBolt {
+    public static Logger LOG = LoggerFactory.getLogger(JoinResult.class);
+
+    /** Id of the component whose tuples are treated as "return" tuples. */
+    String returnComponent;
+    /** Pending tuples from {@link #returnComponent}, keyed by request id. */
+    Map<Object, Tuple> returns = new HashMap<Object, Tuple>();
+    /** Pending tuples from every other component, keyed by request id. */
+    Map<Object, Tuple> results = new HashMap<Object, Tuple>();
+    OutputCollector _collector;
+
+    /**
+     * @param returnComponent id of the component that delivers the return tuples
+     */
+    public JoinResult(String returnComponent) {
+        this.returnComponent = returnComponent;
+    }
+
+    /** Stores the collector used later for emitting and acking. */
+    @Override
+    public void prepare(Map map, TopologyContext context, OutputCollector collector) {
+        _collector = collector;
+    }
+
+    /**
+     * Buffers the tuple under its request id and, if its counterpart is
+     * already buffered, emits the joined tuple and acks both inputs.
+     *
+     * @param tuple incoming tuple; field 0 is used as the request id
+     */
+    @Override
+    public void execute(Tuple tuple) {
+        Object requestId = tuple.getValue(0);
+        if (tuple.getSourceComponent().equals(returnComponent)) {
+            returns.put(requestId, tuple);
+        } else {
+            results.put(requestId, tuple);
+        }
+
+        if (returns.containsKey(requestId) && results.containsKey(requestId)) {
+            Tuple result = results.remove(requestId);
+            Tuple returner = returns.remove(requestId);
+            // Parameterized logging: tolerates a null result value (the emit
+            // below already does via string concatenation) and skips the
+            // toString() call when debug logging is disabled.
+            LOG.debug("{}", result.getValue(1));
+            List<Tuple> anchors = new ArrayList<Tuple>();
+            anchors.add(result);
+            anchors.add(returner);
+            _collector.emit(anchors, new Values("" + result.getValue(1), returner.getValue(1)));
+            _collector.ack(result);
+            _collector.ack(returner);
+        }
+    }
+
+    /** Declares the output fields {@code "result"} and {@code "return-info"}. */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        declarer.declare(new Fields("result", "return-info"));
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/drpc/KeyedFairBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/KeyedFairBolt.java b/jstorm-core/src/main/java/backtype/storm/drpc/KeyedFairBolt.java
new file mode 100755
index 0000000..113163d
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/drpc/KeyedFairBolt.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.drpc;
+
+import backtype.storm.coordination.CoordinatedBolt.FinishedCallback;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.BasicBoltExecutor;
+import backtype.storm.topology.IBasicBolt;
+import backtype.storm.topology.IRichBolt;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.KeyedRoundRobinQueue;
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Bolt wrapper that queues incoming tuples in a {@link KeyedRoundRobinQueue},
+ * keyed by field 0 of each tuple, and feeds them to the wrapped bolt from a
+ * dedicated daemon thread.
+ *
+ * {@link #finishedId(Object)} is forwarded to the delegate when the delegate
+ * itself implements {@link FinishedCallback}.
+ */
+public class KeyedFairBolt implements IRichBolt, FinishedCallback {
+    IRichBolt _delegate;
+    KeyedRoundRobinQueue<Tuple> _rrQueue;
+    /** Thread that drains {@link #_rrQueue} into the delegate; created in prepare(). */
+    Thread _executor;
+    /** The delegate viewed as a FinishedCallback, or null if it is not one. */
+    FinishedCallback _callback;
+
+    /**
+     * @param delegate the bolt that actually processes the tuples
+     */
+    public KeyedFairBolt(IRichBolt delegate) {
+        _delegate = delegate;
+    }
+
+    /**
+     * @param delegate basic bolt to wrap; it is adapted through a {@link BasicBoltExecutor}
+     */
+    public KeyedFairBolt(IBasicBolt delegate) {
+        this(new BasicBoltExecutor(delegate));
+    }
+
+
+    /**
+     * Prepares the delegate, creates the queue and starts the daemon thread
+     * that takes tuples from the queue and passes them to the delegate.
+     */
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        if (_delegate instanceof FinishedCallback) {
+            _callback = (FinishedCallback) _delegate;
+        }
+        _delegate.prepare(stormConf, context, collector);
+        _rrQueue = new KeyedRoundRobinQueue<Tuple>();
+        _executor = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    while (true) {
+                        _delegate.execute(_rrQueue.take());
+                    }
+                } catch (InterruptedException e) {
+                    // cleanup() interrupts this thread to stop it. Restore the
+                    // interrupt status instead of silently dropping it, then exit.
+                    Thread.currentThread().interrupt();
+                }
+            }
+        });
+        _executor.setDaemon(true);
+        _executor.start();
+    }
+
+    /**
+     * Enqueues the tuple under the key found in its field 0; the delegate
+     * processes it later on the executor thread.
+     */
+    @Override
+    public void execute(Tuple input) {
+        Object key = input.getValue(0);
+        _rrQueue.add(key, input);
+    }
+
+    /** Stops the executor thread (if it was started) and cleans up the delegate. */
+    @Override
+    public void cleanup() {
+        // _executor is only assigned in prepare(); guard so the delegate is
+        // still cleaned up if prepare() never ran.
+        if (_executor != null) {
+            _executor.interrupt();
+        }
+        _delegate.cleanup();
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        _delegate.declareOutputFields(declarer);
+    }
+
+    /** Forwards the notification to the delegate when it is a {@link FinishedCallback}. */
+    @Override
+    public void finishedId(Object id) {
+        if (_callback != null) {
+            _callback.finishedId(id);
+        }
+    }
+
+    /** Returns a new, empty configuration map. */
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return new HashMap<String, Object>();
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCInputDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCInputDeclarer.java b/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCInputDeclarer.java
new file mode 100755
index 0000000..d03075e
--- /dev/null
+++ b/jstorm-core/src/main/java/backtype/storm/drpc/LinearDRPCInputDeclarer.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.drpc;
+
+import backtype.storm.grouping.CustomStreamGrouping;
+import backtype.storm.topology.ComponentConfigurationDeclarer;
+import backtype.storm.tuple.Fields;
+
+/**
+ * Declarer for the input groupings of a bolt in a linear DRPC topology.
+ *
+ * Each grouping comes in two overloads: one without a stream id and one that
+ * takes an explicit {@code streamId}. Every method returns a
+ * {@code LinearDRPCInputDeclarer}, so declarations can be chained.
+ */
+public interface LinearDRPCInputDeclarer extends ComponentConfigurationDeclarer<LinearDRPCInputDeclarer> {
+
+    // --- fields grouping ---
+    LinearDRPCInputDeclarer fieldsGrouping(Fields fields);
+
+    LinearDRPCInputDeclarer fieldsGrouping(String streamId, Fields fields);
+
+    // --- global grouping ---
+    LinearDRPCInputDeclarer globalGrouping();
+
+    LinearDRPCInputDeclarer globalGrouping(String streamId);
+
+    // --- shuffle grouping ---
+    LinearDRPCInputDeclarer shuffleGrouping();
+
+    LinearDRPCInputDeclarer shuffleGrouping(String streamId);
+
+    // --- local-or-shuffle grouping ---
+    LinearDRPCInputDeclarer localOrShuffleGrouping();
+
+    LinearDRPCInputDeclarer localOrShuffleGrouping(String streamId);
+
+    // --- none grouping ---
+    LinearDRPCInputDeclarer noneGrouping();
+
+    LinearDRPCInputDeclarer noneGrouping(String streamId);
+
+    // --- all grouping ---
+    LinearDRPCInputDeclarer allGrouping();
+
+    LinearDRPCInputDeclarer allGrouping(String streamId);
+
+    // --- direct grouping ---
+    LinearDRPCInputDeclarer directGrouping();
+
+    LinearDRPCInputDeclarer directGrouping(String streamId);
+
+    // --- partial-key grouping ---
+    LinearDRPCInputDeclarer partialKeyGrouping(Fields fields);
+
+    LinearDRPCInputDeclarer partialKeyGrouping(String streamId, Fields fields);
+
+    // --- custom grouping ---
+    LinearDRPCInputDeclarer customGrouping(CustomStreamGrouping grouping);
+
+    LinearDRPCInputDeclarer customGrouping(String streamId, CustomStreamGrouping grouping);
+
+}