You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/11/05 21:41:23 UTC
[44/60] [abbrv] [partial] storm git commit: Release 2.0.4-SNAPSHOT
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/StormSubmitter.java b/jstorm-client/src/main/java/backtype/storm/StormSubmitter.java
deleted file mode 100644
index 25a9b4c..0000000
--- a/jstorm-client/src/main/java/backtype/storm/StormSubmitter.java
+++ /dev/null
@@ -1,375 +0,0 @@
-package backtype.storm;
-
-import java.io.File;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.thrift7.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.ClusterSummary;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.Nimbus;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.generated.SubmitOptions;
-import backtype.storm.generated.TopologyAssignException;
-import backtype.storm.generated.TopologySummary;
-import backtype.storm.utils.BufferFileInputStream;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-/**
- * Use this class to submit topologies to run on the Storm cluster. You should
- * run your program with the "storm jar" command from the command-line, and then
- * use this class to submit your topologies.
- */
-public class StormSubmitter {
- public static Logger LOG = LoggerFactory.getLogger(StormSubmitter.class);
-
- private static Nimbus.Iface localNimbus = null;
-
- public static void setLocalNimbus(Nimbus.Iface localNimbusHandler) {
- StormSubmitter.localNimbus = localNimbusHandler;
- }
-
- /**
- * Submits a topology to run on the cluster. A topology runs forever or
- * until explicitly killed.
- *
- *
- * @param name
- * the name of the storm.
- * @param stormConf
- * the topology-specific configuration. See {@link Config}.
- * @param topology
- * the processing to execute.
- * @throws AlreadyAliveException
- * if a topology with this name is already running
- * @throws InvalidTopologyException
- * if an invalid topology was submitted
- */
- public static void submitTopology(String name, Map stormConf,
- StormTopology topology) throws AlreadyAliveException,
- InvalidTopologyException {
- submitTopology(name, stormConf, topology, null);
- }
-
- public static void submitTopology(String name, Map stormConf,
- StormTopology topology, SubmitOptions opts, List<File> jarFiles)
- throws AlreadyAliveException, InvalidTopologyException {
- if (jarFiles == null) {
- jarFiles = new ArrayList<File>();
- }
- Map<String, String> jars = new HashMap<String, String>(jarFiles.size());
- List<String> names = new ArrayList<String>(jarFiles.size());
-
- for (File f : jarFiles) {
- if (!f.exists()) {
- LOG.info(f.getName() + " is not existed: "
- + f.getAbsolutePath());
- continue;
- }
- jars.put(f.getName(), f.getAbsolutePath());
- names.add(f.getName());
- }
- LOG.info("Files: " + names + " will be loaded");
- stormConf.put(GenericOptionsParser.TOPOLOGY_LIB_PATH, jars);
- stormConf.put(GenericOptionsParser.TOPOLOGY_LIB_NAME, names);
- submitTopology(name, stormConf, topology, opts);
- }
-
- public static void submitTopology(String name, Map stormConf,
- StormTopology topology, SubmitOptions opts,
- ProgressListener listener) throws AlreadyAliveException,
- InvalidTopologyException {
- submitTopology(name, stormConf, topology, opts);
- }
-
- /**
- * Submits a topology to run on the cluster. A topology runs forever or
- * until explicitly killed.
- *
- *
- * @param name
- * the name of the storm.
- * @param stormConf
- * the topology-specific configuration. See {@link Config}.
- * @param topology
- * the processing to execute.
- * @param options
- * to manipulate the starting of the topology
- * @throws AlreadyAliveException
- * if a topology with this name is already running
- * @throws InvalidTopologyException
- * if an invalid topology was submitted
- */
- public static void submitTopology(String name, Map stormConf,
- StormTopology topology, SubmitOptions opts)
- throws AlreadyAliveException, InvalidTopologyException {
- if (!Utils.isValidConf(stormConf)) {
- throw new IllegalArgumentException(
- "Storm conf is not valid. Must be json-serializable");
- }
- stormConf = new HashMap(stormConf);
- stormConf.putAll(Utils.readCommandLineOpts());
- Map conf = Utils.readStormConfig();
- conf.putAll(stormConf);
- putUserInfo(conf, stormConf);
- try {
- String serConf = Utils.to_json(stormConf);
- if (localNimbus != null) {
- LOG.info("Submitting topology " + name + " in local mode");
- localNimbus.submitTopology(name, null, serConf, topology);
- } else {
- NimbusClient client = NimbusClient.getConfiguredClient(conf);
- if (topologyNameExists(conf, name)) {
- throw new RuntimeException("Topology with name `" + name
- + "` already exists on cluster");
- }
-
- submitJar(conf);
- try {
- LOG.info("Submitting topology " + name
- + " in distributed mode with conf " + serConf);
- if (opts != null) {
- client.getClient().submitTopologyWithOpts(name, path,
- serConf, topology, opts);
- } else {
- // this is for backwards compatibility
- client.getClient().submitTopology(name, path, serConf,
- topology);
- }
- } finally {
- client.close();
- }
- }
- LOG.info("Finished submitting topology: " + name);
- } catch (InvalidTopologyException e) {
- LOG.warn("Topology submission exception", e);
- throw e;
- } catch (AlreadyAliveException e) {
- LOG.warn("Topology already alive exception", e);
- throw e;
- } catch (TopologyAssignException e) {
- LOG.warn("Failed to assign " + e.get_msg(), e);
- throw new RuntimeException(e);
- } catch (TException e) {
- LOG.warn("Failed to assign ", e);
- throw new RuntimeException(e);
- }
- }
-
- /**
- * Submits a topology to run on the cluster with a progress bar. A topology
- * runs forever or until explicitly killed.
- *
- *
- * @param name
- * the name of the storm.
- * @param stormConf
- * the topology-specific configuration. See {@link Config}.
- * @param topology
- * the processing to execute.
- * @throws AlreadyAliveException
- * if a topology with this name is already running
- * @throws InvalidTopologyException
- * if an invalid topology was submitted
- * @throws TopologyAssignException
- */
-
- public static void submitTopologyWithProgressBar(String name,
- Map stormConf, StormTopology topology)
- throws AlreadyAliveException, InvalidTopologyException {
- submitTopologyWithProgressBar(name, stormConf, topology, null);
- }
-
- /**
- * Submits a topology to run on the cluster with a progress bar. A topology
- * runs forever or until explicitly killed.
- *
- *
- * @param name
- * the name of the storm.
- * @param stormConf
- * the topology-specific configuration. See {@link Config}.
- * @param topology
- * the processing to execute.
- * @param opts
- * to manipulate the starting of the topology
- * @throws AlreadyAliveException
- * if a topology with this name is already running
- * @throws InvalidTopologyException
- * if an invalid topology was submitted
- * @throws TopologyAssignException
- */
-
- public static void submitTopologyWithProgressBar(String name,
- Map stormConf, StormTopology topology, SubmitOptions opts)
- throws AlreadyAliveException, InvalidTopologyException {
-
- /**
- * remove progress bar in jstorm
- */
- submitTopology(name, stormConf, topology, opts);
- }
-
- private static boolean topologyNameExists(Map conf, String name) {
- NimbusClient client = NimbusClient.getConfiguredClient(conf);
- try {
- ClusterSummary summary = client.getClient().getClusterInfo();
- for (TopologySummary s : summary.get_topologies()) {
- if (s.get_name().equals(name)) {
- return true;
- }
- }
- return false;
-
- } catch (Exception e) {
- throw new RuntimeException(e);
- } finally {
- client.close();
- }
- }
-
- private static String submittedJar = null;
- private static String path = null;
-
- private static void submitJar(Map conf) {
- if (submittedJar == null) {
- NimbusClient client = NimbusClient.getConfiguredClient(conf);
- try {
- LOG.info("Jar not uploaded to master yet. Submitting jar...");
- String localJar = System.getProperty("storm.jar");
- path = client.getClient().beginFileUpload();
- String[] pathCache = path.split("/");
- String uploadLocation = path + "/stormjar-"
- + pathCache[pathCache.length - 1] + ".jar";
- List<String> lib = (List<String>) conf
- .get(GenericOptionsParser.TOPOLOGY_LIB_NAME);
- Map<String, String> libPath = (Map<String, String>) conf
- .get(GenericOptionsParser.TOPOLOGY_LIB_PATH);
- if (lib != null && lib.size() != 0) {
- for (String libName : lib) {
- String jarPath = path + "/lib/" + libName;
- client.getClient().beginLibUpload(jarPath);
- submitJar(conf, libPath.get(libName), jarPath, client);
- }
-
- } else {
- if (localJar == null) {
- // no lib, no client jar
- throw new RuntimeException("No client app jar, please upload it");
- }
- }
-
- if (localJar != null) {
- submittedJar = submitJar(conf, localJar,
- uploadLocation, client);
- }else {
- // no client jar, but with lib jar
- client.getClient().finishFileUpload(uploadLocation);
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- } finally {
- client.close();
- }
- } else {
- LOG.info("Jar already uploaded to master. Not submitting jar.");
- }
- }
-
- public static String submitJar(Map conf, String localJar,
- String uploadLocation, NimbusClient client) {
- if (localJar == null) {
- throw new RuntimeException(
- "Must submit topologies using the 'storm' client script so that StormSubmitter knows which jar to upload.");
- }
-
- try {
-
- LOG.info("Uploading topology jar " + localJar
- + " to assigned location: " + uploadLocation);
- int bufferSize = 512 * 1024;
- Object maxBufSizeObject = conf
- .get(Config.NIMBUS_THRIFT_MAX_BUFFER_SIZE);
- if (maxBufSizeObject != null) {
- bufferSize = Utils.getInt(maxBufSizeObject) / 2;
- }
-
- BufferFileInputStream is = new BufferFileInputStream(localJar,
- bufferSize);
- while (true) {
- byte[] toSubmit = is.read();
- if (toSubmit.length == 0)
- break;
- client.getClient().uploadChunk(uploadLocation,
- ByteBuffer.wrap(toSubmit));
- }
- client.getClient().finishFileUpload(uploadLocation);
- LOG.info("Successfully uploaded topology jar to assigned location: "
- + uploadLocation);
- return uploadLocation;
- } catch (Exception e) {
- throw new RuntimeException(e);
- } finally {
-
- }
- }
-
- private static void putUserInfo(Map conf, Map stormConf) {
- stormConf.put("user.group", conf.get("user.group"));
- stormConf.put("user.name", conf.get("user.name"));
- stormConf.put("user.password", conf.get("user.password"));
- }
-
- /**
- * Interface use to track progress of file upload
- */
- public interface ProgressListener {
- /**
- * called before file is uploaded
- *
- * @param srcFile
- * - jar file to be uploaded
- * @param targetFile
- * - destination file
- * @param totalBytes
- * - total number of bytes of the file
- */
- public void onStart(String srcFile, String targetFile, long totalBytes);
-
- /**
- * called whenever a chunk of bytes is uploaded
- *
- * @param srcFile
- * - jar file to be uploaded
- * @param targetFile
- * - destination file
- * @param bytesUploaded
- * - number of bytes transferred so far
- * @param totalBytes
- * - total number of bytes of the file
- */
- public void onProgress(String srcFile, String targetFile,
- long bytesUploaded, long totalBytes);
-
- /**
- * called when the file is uploaded
- *
- * @param srcFile
- * - jar file to be uploaded
- * @param targetFile
- * - destination file
- * @param totalBytes
- * - total number of bytes of the file
- */
- public void onCompleted(String srcFile, String targetFile,
- long totalBytes);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/Tool.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/Tool.java b/jstorm-client/src/main/java/backtype/storm/Tool.java
deleted file mode 100644
index 5fa7ccc..0000000
--- a/jstorm-client/src/main/java/backtype/storm/Tool.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package backtype.storm;
-
-/**
- * A tool abstract class that supports handling of generic
- * command-line options.
- *
- * <p>Here is how a typical <code>Tool</code> is implemented:</p>
- * <p><blockquote><pre>
- * public class TopologyApp extends Tool {
- * {@literal @}Override
- * public int run(String[] args) throws Exception {
- * // Config processed by ToolRunner
- * Config conf = getConf();
- *
- * // Other setups go here
- * String name = "topology";
- * StormTopology topology = buildTopology(args);
- * StormSubmitter.submitTopology(name, conf, topology);
- * return 0;
- * }
- *
- * StormTopology buildTopology(String[] args) { ... }
- *
- * public static void main(String[] args) throws Exception {
- * // Use ToolRunner to handle generic command-line options
- * ToolRunner.run(new TopologyApp(), args);
- * }
- * }
- * </pre></blockquote></p>
- *
- * @see GenericOptionsParser
- * @see ToolRunner
- */
-
-public abstract class Tool {
- Config config;
-
- public abstract int run(String[] args) throws Exception;
-
- public Config getConf() {
- return config;
- }
-
- public void setConf(Config config) {
- this.config = config;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/ToolRunner.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/ToolRunner.java b/jstorm-client/src/main/java/backtype/storm/ToolRunner.java
deleted file mode 100644
index 30940da..0000000
--- a/jstorm-client/src/main/java/backtype/storm/ToolRunner.java
+++ /dev/null
@@ -1,53 +0,0 @@
-package backtype.storm;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.cli.ParseException;
-
-import backtype.storm.utils.Utils;
-
-/**
- * A utility to help run {@link Tool}s
- *
- * <p><code>ToolRunner</code> can be used to run classes extending the
- * <code>Tool</code> abstract class. It works in conjunction with
- * {@link GenericOptionsParser} to parse the <a
- * href="{@docRoot}/backtype/storm/GenericOptionsParser.html#GenericOptions">
- * generic storm command line arguments</a> and modifies the
- * <code>Config</code> of the <code>Tool</code>. The
- * application-specific options are passed along without being
- * modified.
- *
- * @see Tool
- * @see GenericOptionsParser
- */
-
-public class ToolRunner {
- static final Logger LOG = LoggerFactory.getLogger(ToolRunner.class);
-
- public static void run(Tool tool, String[] args) {
- run(tool.getConf(), tool, args);
- }
-
- public static void run(Config conf, Tool tool, String[] args) {
- try {
- if (conf == null) {
- conf = new Config();
- conf.putAll(Utils.readStormConfig());
- }
-
- GenericOptionsParser parser = new GenericOptionsParser(conf, args);
- tool.setConf(conf);
-
- System.exit(tool.run(parser.getRemainingArgs()));
- } catch (ParseException e) {
- LOG.error("Error parsing generic options: {}", e.getMessage());
- GenericOptionsParser.printGenericCommandUsage(System.err);
- System.exit(2);
- } catch (Exception e) {
- LOG.error("Error running tool", e);
- System.exit(1);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/clojure/ClojureBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/clojure/ClojureBolt.java b/jstorm-client/src/main/java/backtype/storm/clojure/ClojureBolt.java
deleted file mode 100644
index bd3873f..0000000
--- a/jstorm-client/src/main/java/backtype/storm/clojure/ClojureBolt.java
+++ /dev/null
@@ -1,108 +0,0 @@
-package backtype.storm.clojure;
-
-import backtype.storm.coordination.CoordinatedBolt.FinishedCallback;
-import backtype.storm.generated.StreamInfo;
-import backtype.storm.task.IBolt;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.Utils;
-import clojure.lang.IFn;
-import clojure.lang.PersistentArrayMap;
-import clojure.lang.Keyword;
-import clojure.lang.Symbol;
-import clojure.lang.RT;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class ClojureBolt implements IRichBolt, FinishedCallback {
- Map<String, StreamInfo> _fields;
- List<String> _fnSpec;
- List<String> _confSpec;
- List<Object> _params;
-
- IBolt _bolt;
-
- public ClojureBolt(List fnSpec, List confSpec, List<Object> params,
- Map<String, StreamInfo> fields) {
- _fnSpec = fnSpec;
- _confSpec = confSpec;
- _params = params;
- _fields = fields;
- }
-
- @Override
- public void prepare(final Map stormConf, final TopologyContext context,
- final OutputCollector collector) {
- IFn hof = Utils.loadClojureFn(_fnSpec.get(0), _fnSpec.get(1));
- try {
- IFn preparer = (IFn) hof.applyTo(RT.seq(_params));
- final Map<Keyword, Object> collectorMap = new PersistentArrayMap(
- new Object[] {
- Keyword.intern(Symbol.create("output-collector")),
- collector,
- Keyword.intern(Symbol.create("context")), context });
- List<Object> args = new ArrayList<Object>() {
- {
- add(stormConf);
- add(context);
- add(collectorMap);
- }
- };
-
- _bolt = (IBolt) preparer.applyTo(RT.seq(args));
- // this is kind of unnecessary for clojure
- try {
- _bolt.prepare(stormConf, context, collector);
- } catch (AbstractMethodError ame) {
-
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void execute(Tuple input) {
- _bolt.execute(input);
- }
-
- @Override
- public void cleanup() {
- try {
- _bolt.cleanup();
- } catch (AbstractMethodError ame) {
-
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- for (String stream : _fields.keySet()) {
- StreamInfo info = _fields.get(stream);
- declarer.declareStream(stream, info.is_direct(),
- new Fields(info.get_output_fields()));
- }
- }
-
- @Override
- public void finishedId(Object id) {
- if (_bolt instanceof FinishedCallback) {
- ((FinishedCallback) _bolt).finishedId(id);
- }
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- IFn hof = Utils.loadClojureFn(_confSpec.get(0), _confSpec.get(1));
- try {
- return (Map) hof.applyTo(RT.seq(_params));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/clojure/ClojureSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/clojure/ClojureSpout.java b/jstorm-client/src/main/java/backtype/storm/clojure/ClojureSpout.java
deleted file mode 100644
index 3606252..0000000
--- a/jstorm-client/src/main/java/backtype/storm/clojure/ClojureSpout.java
+++ /dev/null
@@ -1,142 +0,0 @@
-package backtype.storm.clojure;
-
-import backtype.storm.generated.StreamInfo;
-import backtype.storm.spout.ISpout;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.utils.Utils;
-import clojure.lang.IFn;
-import clojure.lang.PersistentArrayMap;
-import clojure.lang.Keyword;
-import clojure.lang.Symbol;
-import clojure.lang.RT;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-public class ClojureSpout implements IRichSpout {
- Map<String, StreamInfo> _fields;
- List<String> _fnSpec;
- List<String> _confSpec;
- List<Object> _params;
-
- ISpout _spout;
-
- public ClojureSpout(List fnSpec, List confSpec, List<Object> params,
- Map<String, StreamInfo> fields) {
- _fnSpec = fnSpec;
- _confSpec = confSpec;
- _params = params;
- _fields = fields;
- }
-
- @Override
- public void open(final Map conf, final TopologyContext context,
- final SpoutOutputCollector collector) {
- IFn hof = Utils.loadClojureFn(_fnSpec.get(0), _fnSpec.get(1));
- try {
- IFn preparer = (IFn) hof.applyTo(RT.seq(_params));
- final Map<Keyword, Object> collectorMap = new PersistentArrayMap(
- new Object[] {
- Keyword.intern(Symbol.create("output-collector")),
- collector,
- Keyword.intern(Symbol.create("context")), context });
- List<Object> args = new ArrayList<Object>() {
- {
- add(conf);
- add(context);
- add(collectorMap);
- }
- };
-
- _spout = (ISpout) preparer.applyTo(RT.seq(args));
- // this is kind of unnecessary for clojure
- try {
- _spout.open(conf, context, collector);
- } catch (AbstractMethodError ame) {
-
- }
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void close() {
- try {
- _spout.close();
- } catch (AbstractMethodError ame) {
-
- }
- }
-
- @Override
- public void nextTuple() {
- try {
- _spout.nextTuple();
- } catch (AbstractMethodError ame) {
-
- }
-
- }
-
- @Override
- public void ack(Object msgId) {
- try {
- _spout.ack(msgId);
- } catch (AbstractMethodError ame) {
-
- }
-
- }
-
- @Override
- public void fail(Object msgId) {
- try {
- _spout.fail(msgId);
- } catch (AbstractMethodError ame) {
-
- }
-
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- for (String stream : _fields.keySet()) {
- StreamInfo info = _fields.get(stream);
- declarer.declareStream(stream, info.is_direct(),
- new Fields(info.get_output_fields()));
- }
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- IFn hof = Utils.loadClojureFn(_confSpec.get(0), _confSpec.get(1));
- try {
- return (Map) hof.applyTo(RT.seq(_params));
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void activate() {
- try {
- _spout.activate();
- } catch (AbstractMethodError ame) {
-
- }
- }
-
- @Override
- public void deactivate() {
- try {
- _spout.deactivate();
- } catch (AbstractMethodError ame) {
-
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/clojure/RichShellBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/clojure/RichShellBolt.java b/jstorm-client/src/main/java/backtype/storm/clojure/RichShellBolt.java
deleted file mode 100644
index 9289448..0000000
--- a/jstorm-client/src/main/java/backtype/storm/clojure/RichShellBolt.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package backtype.storm.clojure;
-
-import backtype.storm.generated.StreamInfo;
-import backtype.storm.task.ShellBolt;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import java.util.Map;
-
-public class RichShellBolt extends ShellBolt implements IRichBolt {
- private Map<String, StreamInfo> _outputs;
-
- public RichShellBolt(String[] command, Map<String, StreamInfo> outputs) {
- super(command);
- _outputs = outputs;
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- for (String stream : _outputs.keySet()) {
- StreamInfo def = _outputs.get(stream);
- if (def.is_direct()) {
- declarer.declareStream(stream, true,
- new Fields(def.get_output_fields()));
- } else {
- declarer.declareStream(stream,
- new Fields(def.get_output_fields()));
- }
- }
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/clojure/RichShellSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/clojure/RichShellSpout.java b/jstorm-client/src/main/java/backtype/storm/clojure/RichShellSpout.java
deleted file mode 100644
index 98763a5..0000000
--- a/jstorm-client/src/main/java/backtype/storm/clojure/RichShellSpout.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package backtype.storm.clojure;
-
-import backtype.storm.generated.StreamInfo;
-import backtype.storm.spout.ShellSpout;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import java.util.Map;
-
-public class RichShellSpout extends ShellSpout implements IRichSpout {
- private Map<String, StreamInfo> _outputs;
-
- public RichShellSpout(String[] command, Map<String, StreamInfo> outputs) {
- super(command);
- _outputs = outputs;
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- for (String stream : _outputs.keySet()) {
- StreamInfo def = _outputs.get(stream);
- if (def.is_direct()) {
- declarer.declareStream(stream, true,
- new Fields(def.get_output_fields()));
- } else {
- declarer.declareStream(stream,
- new Fields(def.get_output_fields()));
- }
- }
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/command/activate.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/command/activate.java b/jstorm-client/src/main/java/backtype/storm/command/activate.java
deleted file mode 100644
index ed8c33a..0000000
--- a/jstorm-client/src/main/java/backtype/storm/command/activate.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package backtype.storm.command;
-
-import java.security.InvalidParameterException;
-import java.util.Map;
-
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-/**
- * Activate topology
- *
- * @author longda
- *
- */
-public class activate {
-
- /**
- * @param args
- */
- public static void main(String[] args) {
- // TODO Auto-generated method stub
- if (args == null || args.length == 0) {
- throw new InvalidParameterException("Should input topology name");
- }
-
- String topologyName = args[0];
-
- NimbusClient client = null;
- try {
-
- Map conf = Utils.readStormConfig();
- client = NimbusClient.getConfiguredClient(conf);
-
- client.getClient().activate(topologyName);
-
- System.out.println("Successfully submit command activate "
- + topologyName);
- } catch (Exception e) {
- System.out.println(e.getMessage());
- e.printStackTrace();
- throw new RuntimeException(e);
- } finally {
- if (client != null) {
- client.close();
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/command/config_value.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/command/config_value.java b/jstorm-client/src/main/java/backtype/storm/command/config_value.java
deleted file mode 100644
index 90a3ed0..0000000
--- a/jstorm-client/src/main/java/backtype/storm/command/config_value.java
+++ /dev/null
@@ -1,33 +0,0 @@
-package backtype.storm.command;
-
-import java.security.InvalidParameterException;
-import java.util.Map;
-
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-/**
- * Get configuration
- *
- * @author longda
- *
- */
-public class config_value {
-
- /**
- * @param args
- */
- public static void main(String[] args) {
- // TODO Auto-generated method stub
- if (args == null || args.length == 0) {
- throw new InvalidParameterException("Should input key name");
- }
-
- String key = args[0];
-
- Map conf = Utils.readStormConfig();
-
- System.out.print("VALUE: " + String.valueOf(conf.get(key)));
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/command/deactivate.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/command/deactivate.java b/jstorm-client/src/main/java/backtype/storm/command/deactivate.java
deleted file mode 100644
index 845f456..0000000
--- a/jstorm-client/src/main/java/backtype/storm/command/deactivate.java
+++ /dev/null
@@ -1,49 +0,0 @@
-package backtype.storm.command;
-
-import java.security.InvalidParameterException;
-import java.util.Map;
-
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-/**
- * Deactivate topology
- *
- * @author longda
- *
- */
-public class deactivate {
-
- /**
- * @param args
- */
- public static void main(String[] args) {
- // TODO Auto-generated method stub
- if (args == null || args.length == 0) {
- throw new InvalidParameterException("Should input topology name");
- }
-
- String topologyName = args[0];
-
- NimbusClient client = null;
- try {
-
- Map conf = Utils.readStormConfig();
- client = NimbusClient.getConfiguredClient(conf);
-
- client.getClient().deactivate(topologyName);
-
- System.out.println("Successfully submit command deactivate "
- + topologyName);
- } catch (Exception e) {
- System.out.println(e.getMessage());
- e.printStackTrace();
- throw new RuntimeException(e);
- } finally {
- if (client != null) {
- client.close();
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/command/kill_topology.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/command/kill_topology.java b/jstorm-client/src/main/java/backtype/storm/command/kill_topology.java
deleted file mode 100644
index 0c950cb..0000000
--- a/jstorm-client/src/main/java/backtype/storm/command/kill_topology.java
+++ /dev/null
@@ -1,61 +0,0 @@
-package backtype.storm.command;
-
-import java.security.InvalidParameterException;
-import java.util.Map;
-
-import backtype.storm.generated.KillOptions;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-/**
- * Kill topology
- *
- * @author longda
- *
- */
-public class kill_topology {
-
- /**
- * @param args
- */
- public static void main(String[] args) {
- // TODO Auto-generated method stub
- if (args == null || args.length == 0) {
- throw new InvalidParameterException("Should input topology name");
- }
-
- String topologyName = args[0];
-
- NimbusClient client = null;
- try {
-
- Map conf = Utils.readStormConfig();
- client = NimbusClient.getConfiguredClient(conf);
-
- if (args.length == 1) {
-
- client.getClient().killTopology(topologyName);
- } else {
- int delaySeconds = Integer.parseInt(args[1]);
-
- KillOptions options = new KillOptions();
- options.set_wait_secs(delaySeconds);
-
- client.getClient().killTopologyWithOpts(topologyName, options);
-
- }
-
- System.out.println("Successfully submit command kill "
- + topologyName);
- } catch (Exception e) {
- System.out.println(e.getMessage());
- e.printStackTrace();
- throw new RuntimeException(e);
- } finally {
- if (client != null) {
- client.close();
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/command/list.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/command/list.java b/jstorm-client/src/main/java/backtype/storm/command/list.java
deleted file mode 100644
index b0e40bc..0000000
--- a/jstorm-client/src/main/java/backtype/storm/command/list.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package backtype.storm.command;
-
-import java.util.Map;
-
-import org.apache.commons.lang.StringUtils;
-
-import backtype.storm.generated.ClusterSummary;
-import backtype.storm.generated.TopologyInfo;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-/**
- * Activate topology
- *
- * @author longda
- *
- */
-public class list {
-
-
-
-
- /**
- * @param args
- */
- public static void main(String[] args) {
-
- NimbusClient client = null;
- try {
-
- Map conf = Utils.readStormConfig();
- client = NimbusClient.getConfiguredClient(conf);
-
- if (args.length > 0 && StringUtils.isBlank(args[0]) == false) {
- String topologyName = args[0];
- TopologyInfo info = client.getClient().getTopologyInfoByName(topologyName);
-
- System.out.println("Successfully get topology info \n"
- + Utils.toPrettyJsonString(info));
- }else {
- ClusterSummary clusterSummary = client.getClient().getClusterInfo();
-
- System.out.println("Successfully get cluster info \n"
- + Utils.toPrettyJsonString(clusterSummary));
- }
-
-
- } catch (Exception e) {
- System.out.println(e.getMessage());
- e.printStackTrace();
- throw new RuntimeException(e);
- } finally {
- if (client != null) {
- client.close();
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/command/metrics_monitor.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/command/metrics_monitor.java b/jstorm-client/src/main/java/backtype/storm/command/metrics_monitor.java
deleted file mode 100644
index bb339d4..0000000
--- a/jstorm-client/src/main/java/backtype/storm/command/metrics_monitor.java
+++ /dev/null
@@ -1,56 +0,0 @@
-package backtype.storm.command;
-
-import java.util.Map;
-import java.security.InvalidParameterException;
-
-import backtype.storm.generated.MonitorOptions;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-/**
- * Monitor topology
- *
- * @author Basti
- *
- */
-public class metrics_monitor {
-
- /**
- * @param args
- */
- public static void main(String[] args) {
- // TODO Auto-generated method stub
- if (args == null || args.length <= 1) {
- throw new InvalidParameterException("Should input topology name and enable flag");
- }
-
- String topologyName = args[0];
-
- NimbusClient client = null;
- try {
-
- Map conf = Utils.readStormConfig();
- client = NimbusClient.getConfiguredClient(conf);
-
- boolean isEnable = Boolean.valueOf(args[1]).booleanValue();
-
- MonitorOptions options = new MonitorOptions();
- options.set_isEnable(isEnable);
-
- client.getClient().metricMonitor(topologyName, options);
-
- String str = (isEnable) ? "enable" : "disable";
- System.out.println("Successfully submit command to " + str
- + " the monitor of " + topologyName);
- } catch (Exception e) {
- System.out.println(e.getMessage());
- e.printStackTrace();
- throw new RuntimeException(e);
- } finally {
- if (client != null) {
- client.close();
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/command/rebalance.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/command/rebalance.java b/jstorm-client/src/main/java/backtype/storm/command/rebalance.java
deleted file mode 100644
index 72c3f26..0000000
--- a/jstorm-client/src/main/java/backtype/storm/command/rebalance.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package backtype.storm.command;
-
-import java.security.InvalidParameterException;
-import java.util.Map;
-
-import backtype.storm.generated.RebalanceOptions;
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-/**
- * Active topology
- *
- * @author longda
- *
- */
-public class rebalance {
-
- /**
- * @param args
- */
- public static void main(String[] args) {
- // TODO Auto-generated method stub
- if (args == null || args.length == 0) {
- throw new InvalidParameterException("Should input topology name");
- }
-
- String topologyName = args[0];
-
- NimbusClient client = null;
- try {
-
- Map conf = Utils.readStormConfig();
- client = NimbusClient.getConfiguredClient(conf);
-
- if (args.length == 1) {
-
- client.getClient().rebalance(topologyName, null);
- } else {
- int delaySeconds = Integer.parseInt(args[1]);
-
- RebalanceOptions options = new RebalanceOptions();
- options.set_wait_secs(delaySeconds);
-
- client.getClient().rebalance(topologyName, options);
- }
-
- System.out.println("Successfully submit command rebalance "
- + topologyName);
- } catch (Exception e) {
- System.out.println(e.getMessage());
- e.printStackTrace();
- throw new RuntimeException(e);
- } finally {
- if (client != null) {
- client.close();
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/command/restart.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/command/restart.java b/jstorm-client/src/main/java/backtype/storm/command/restart.java
deleted file mode 100644
index 1eb577d..0000000
--- a/jstorm-client/src/main/java/backtype/storm/command/restart.java
+++ /dev/null
@@ -1,120 +0,0 @@
-package backtype.storm.command;
-
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.InputStream;
-import java.security.InvalidParameterException;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
-
-import org.yaml.snakeyaml.Yaml;
-
-import backtype.storm.utils.NimbusClient;
-import backtype.storm.utils.Utils;
-
-/**
- * Active topology
- *
- * @author basti
- *
- */
-public class restart {
- private static Map LoadProperty(String prop) {
- Map ret = new HashMap<Object, Object>();
- Properties properties = new Properties();
-
- try {
- InputStream stream = new FileInputStream(prop);
- properties.load(stream);
- if (properties.size() == 0) {
- System.out.println("WARN: Config file is empty");
- return null;
- } else {
- ret.putAll(properties);
- }
- } catch (FileNotFoundException e) {
- System.out.println("No such file " + prop);
- throw new RuntimeException(e.getMessage());
- } catch (Exception e1) {
- e1.printStackTrace();
- throw new RuntimeException(e1.getMessage());
- }
-
- return ret;
- }
-
- private static Map LoadYaml(String confPath) {
- Map ret = new HashMap<Object, Object>();
- Yaml yaml = new Yaml();
-
- try {
- InputStream stream = new FileInputStream(confPath);
- ret = (Map) yaml.load(stream);
- if (ret == null || ret.isEmpty() == true) {
- System.out.println("WARN: Config file is empty");
- return null;
- }
- } catch (FileNotFoundException e) {
- System.out.println("No such file " + confPath);
- throw new RuntimeException("No config file");
- } catch (Exception e1) {
- e1.printStackTrace();
- throw new RuntimeException("Failed to read config file");
- }
-
- return ret;
- }
-
- private static Map LoadConf(String arg) {
- Map ret = null;
- if (arg.endsWith("yaml")) {
- ret = LoadYaml(arg);
- } else {
- ret = LoadProperty(arg);
- }
- return ret;
- }
-
- /**
- * @param args
- */
- public static void main(String[] args) {
- // TODO Auto-generated method stub
- if (args == null || args.length == 0) {
- throw new InvalidParameterException("Should input topology name");
- }
-
- String topologyName = args[0];
-
- NimbusClient client = null;
- try {
- Map conf = Utils.readStormConfig();
- client = NimbusClient.getConfiguredClient(conf);
-
- System.out.println("It will take 15 ~ 100 seconds to restart, please wait patiently\n");
-
- if (args.length == 1) {
- client.getClient().restart(topologyName, null);
- } else {
- Map loadConf = LoadConf(args[1]);
- String jsonConf = Utils.to_json(loadConf);
- System.out.println("New configuration:\n" + jsonConf);
-
- client.getClient().restart(topologyName, jsonConf);
- }
-
- System.out.println("Successfully submit command restart "
- + topologyName);
- } catch (Exception e) {
- System.out.println(e.getMessage());
- e.printStackTrace();
- throw new RuntimeException(e);
- } finally {
- if (client != null) {
- client.close();
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/coordination/BatchBoltExecutor.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/coordination/BatchBoltExecutor.java b/jstorm-client/src/main/java/backtype/storm/coordination/BatchBoltExecutor.java
deleted file mode 100644
index 5502975..0000000
--- a/jstorm-client/src/main/java/backtype/storm/coordination/BatchBoltExecutor.java
+++ /dev/null
@@ -1,92 +0,0 @@
-package backtype.storm.coordination;
-
-import backtype.storm.coordination.CoordinatedBolt.FinishedCallback;
-import backtype.storm.coordination.CoordinatedBolt.TimeoutCallback;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.FailedException;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.Utils;
-import java.util.HashMap;
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class BatchBoltExecutor implements IRichBolt, FinishedCallback,
- TimeoutCallback {
- public static Logger LOG = LoggerFactory.getLogger(BatchBoltExecutor.class);
-
- byte[] _boltSer;
- Map<Object, IBatchBolt> _openTransactions;
- Map _conf;
- TopologyContext _context;
- BatchOutputCollectorImpl _collector;
-
- public BatchBoltExecutor(IBatchBolt bolt) {
- _boltSer = Utils.serialize(bolt);
- }
-
- @Override
- public void prepare(Map conf, TopologyContext context,
- OutputCollector collector) {
- _conf = conf;
- _context = context;
- _collector = new BatchOutputCollectorImpl(collector);
- _openTransactions = new HashMap<Object, IBatchBolt>();
- }
-
- @Override
- public void execute(Tuple input) {
- Object id = input.getValue(0);
- IBatchBolt bolt = getBatchBolt(id);
- try {
- bolt.execute(input);
- _collector.ack(input);
- } catch (FailedException e) {
- LOG.error("Failed to process tuple in batch", e);
- _collector.fail(input);
- }
- }
-
- @Override
- public void cleanup() {
- }
-
- @Override
- public void finishedId(Object id) {
- IBatchBolt bolt = getBatchBolt(id);
- _openTransactions.remove(id);
- bolt.finishBatch();
- }
-
- @Override
- public void timeoutId(Object attempt) {
- _openTransactions.remove(attempt);
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- newTransactionalBolt().declareOutputFields(declarer);
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return newTransactionalBolt().getComponentConfiguration();
- }
-
- private IBatchBolt getBatchBolt(Object id) {
- IBatchBolt bolt = _openTransactions.get(id);
- if (bolt == null) {
- bolt = newTransactionalBolt();
- bolt.prepare(_conf, _context, _collector, id);
- _openTransactions.put(id, bolt);
- }
- return bolt;
- }
-
- private IBatchBolt newTransactionalBolt() {
- return (IBatchBolt) Utils.deserialize(_boltSer);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/coordination/BatchOutputCollector.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/coordination/BatchOutputCollector.java b/jstorm-client/src/main/java/backtype/storm/coordination/BatchOutputCollector.java
deleted file mode 100644
index e087f5f..0000000
--- a/jstorm-client/src/main/java/backtype/storm/coordination/BatchOutputCollector.java
+++ /dev/null
@@ -1,31 +0,0 @@
-package backtype.storm.coordination;
-
-import backtype.storm.utils.Utils;
-import java.util.List;
-
-public abstract class BatchOutputCollector {
-
- /**
- * Emits a tuple to the default output stream.
- */
- public List<Integer> emit(List<Object> tuple) {
- return emit(Utils.DEFAULT_STREAM_ID, tuple);
- }
-
- public abstract List<Integer> emit(String streamId, List<Object> tuple);
-
- /**
- * Emits a tuple to the specified task on the default output stream. This
- * output stream must have been declared as a direct stream, and the
- * specified task must use a direct grouping on this stream to receive the
- * message.
- */
- public void emitDirect(int taskId, List<Object> tuple) {
- emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple);
- }
-
- public abstract void emitDirect(int taskId, String streamId,
- List<Object> tuple);
-
- public abstract void reportError(Throwable error);
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/coordination/BatchOutputCollectorImpl.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/coordination/BatchOutputCollectorImpl.java b/jstorm-client/src/main/java/backtype/storm/coordination/BatchOutputCollectorImpl.java
deleted file mode 100644
index 73eff73..0000000
--- a/jstorm-client/src/main/java/backtype/storm/coordination/BatchOutputCollectorImpl.java
+++ /dev/null
@@ -1,36 +0,0 @@
-package backtype.storm.coordination;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.tuple.Tuple;
-import java.util.List;
-
-public class BatchOutputCollectorImpl extends BatchOutputCollector {
- OutputCollector _collector;
-
- public BatchOutputCollectorImpl(OutputCollector collector) {
- _collector = collector;
- }
-
- @Override
- public List<Integer> emit(String streamId, List<Object> tuple) {
- return _collector.emit(streamId, tuple);
- }
-
- @Override
- public void emitDirect(int taskId, String streamId, List<Object> tuple) {
- _collector.emitDirect(taskId, streamId, tuple);
- }
-
- @Override
- public void reportError(Throwable error) {
- _collector.reportError(error);
- }
-
- public void ack(Tuple tup) {
- _collector.ack(tup);
- }
-
- public void fail(Tuple tup) {
- _collector.fail(tup);
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/coordination/BatchSubtopologyBuilder.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/coordination/BatchSubtopologyBuilder.java b/jstorm-client/src/main/java/backtype/storm/coordination/BatchSubtopologyBuilder.java
deleted file mode 100644
index eff05b4..0000000
--- a/jstorm-client/src/main/java/backtype/storm/coordination/BatchSubtopologyBuilder.java
+++ /dev/null
@@ -1,469 +0,0 @@
-package backtype.storm.coordination;
-
-import backtype.storm.Constants;
-import backtype.storm.coordination.CoordinatedBolt.SourceArgs;
-import backtype.storm.generated.GlobalStreamId;
-import backtype.storm.generated.Grouping;
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.topology.BaseConfigurationDeclarer;
-import backtype.storm.topology.BasicBoltExecutor;
-import backtype.storm.topology.BoltDeclarer;
-import backtype.storm.topology.IBasicBolt;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.InputDeclarer;
-import backtype.storm.topology.TopologyBuilder;
-import backtype.storm.tuple.Fields;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-
-public class BatchSubtopologyBuilder {
- Map<String, Component> _bolts = new HashMap<String, Component>();
- Component _masterBolt;
- String _masterId;
-
- public BatchSubtopologyBuilder(String masterBoltId, IBasicBolt masterBolt,
- Number boltParallelism) {
- Integer p = boltParallelism == null ? null : boltParallelism.intValue();
- _masterBolt = new Component(new BasicBoltExecutor(masterBolt), p);
- _masterId = masterBoltId;
- }
-
- public BatchSubtopologyBuilder(String masterBoltId, IBasicBolt masterBolt) {
- this(masterBoltId, masterBolt, null);
- }
-
- public BoltDeclarer getMasterDeclarer() {
- return new BoltDeclarerImpl(_masterBolt);
- }
-
- public BoltDeclarer setBolt(String id, IBatchBolt bolt) {
- return setBolt(id, bolt, null);
- }
-
- public BoltDeclarer setBolt(String id, IBatchBolt bolt, Number parallelism) {
- return setBolt(id, new BatchBoltExecutor(bolt), parallelism);
- }
-
- public BoltDeclarer setBolt(String id, IBasicBolt bolt) {
- return setBolt(id, bolt, null);
- }
-
- public BoltDeclarer setBolt(String id, IBasicBolt bolt, Number parallelism) {
- return setBolt(id, new BasicBoltExecutor(bolt), parallelism);
- }
-
- private BoltDeclarer setBolt(String id, IRichBolt bolt, Number parallelism) {
- Integer p = null;
- if (parallelism != null)
- p = parallelism.intValue();
- Component component = new Component(bolt, p);
- _bolts.put(id, component);
- return new BoltDeclarerImpl(component);
- }
-
- public void extendTopology(TopologyBuilder builder) {
- BoltDeclarer declarer = builder.setBolt(_masterId, new CoordinatedBolt(
- _masterBolt.bolt), _masterBolt.parallelism);
- for (InputDeclaration decl : _masterBolt.declarations) {
- decl.declare(declarer);
- }
- for (Map conf : _masterBolt.componentConfs) {
- declarer.addConfigurations(conf);
- }
- for (String id : _bolts.keySet()) {
- Component component = _bolts.get(id);
- Map<String, SourceArgs> coordinatedArgs = new HashMap<String, SourceArgs>();
- for (String c : componentBoltSubscriptions(component)) {
- SourceArgs source;
- if (c.equals(_masterId)) {
- source = SourceArgs.single();
- } else {
- source = SourceArgs.all();
- }
- coordinatedArgs.put(c, source);
- }
-
- BoltDeclarer input = builder.setBolt(id, new CoordinatedBolt(
- component.bolt, coordinatedArgs, null),
- component.parallelism);
- for (Map conf : component.componentConfs) {
- input.addConfigurations(conf);
- }
- for (String c : componentBoltSubscriptions(component)) {
- input.directGrouping(c, Constants.COORDINATED_STREAM_ID);
- }
- for (InputDeclaration d : component.declarations) {
- d.declare(input);
- }
- }
- }
-
- private Set<String> componentBoltSubscriptions(Component component) {
- Set<String> ret = new HashSet<String>();
- for (InputDeclaration d : component.declarations) {
- ret.add(d.getComponent());
- }
- return ret;
- }
-
- private static class Component {
- public IRichBolt bolt;
- public Integer parallelism;
- public List<InputDeclaration> declarations = new ArrayList<InputDeclaration>();
- public List<Map> componentConfs = new ArrayList<Map>();
-
- public Component(IRichBolt bolt, Integer parallelism) {
- this.bolt = bolt;
- this.parallelism = parallelism;
- }
- }
-
- private static interface InputDeclaration {
- void declare(InputDeclarer declarer);
-
- String getComponent();
- }
-
- private class BoltDeclarerImpl extends
- BaseConfigurationDeclarer<BoltDeclarer> implements BoltDeclarer {
- Component _component;
-
- public BoltDeclarerImpl(Component component) {
- _component = component;
- }
-
- @Override
- public BoltDeclarer fieldsGrouping(final String component,
- final Fields fields) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.fieldsGrouping(component, fields);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer fieldsGrouping(final String component,
- final String streamId, final Fields fields) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.fieldsGrouping(component, streamId, fields);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer globalGrouping(final String component) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.globalGrouping(component);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer globalGrouping(final String component,
- final String streamId) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.globalGrouping(component, streamId);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer shuffleGrouping(final String component) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.shuffleGrouping(component);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer shuffleGrouping(final String component,
- final String streamId) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.shuffleGrouping(component, streamId);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer localOrShuffleGrouping(final String component) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.localOrShuffleGrouping(component);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer localOrShuffleGrouping(final String component,
- final String streamId) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.localOrShuffleGrouping(component, streamId);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer localFirstGrouping(final String componentId) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.localFirstGrouping(componentId);
- }
-
- @Override
- public String getComponent() {
- return componentId;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer localFirstGrouping(final String component,
- final String streamId) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.localFirstGrouping(component, streamId);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer noneGrouping(final String component) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.noneGrouping(component);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer noneGrouping(final String component,
- final String streamId) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.noneGrouping(component, streamId);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer allGrouping(final String component) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.allGrouping(component);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer allGrouping(final String component,
- final String streamId) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.allGrouping(component, streamId);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer directGrouping(final String component) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.directGrouping(component);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer directGrouping(final String component,
- final String streamId) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.directGrouping(component, streamId);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer customGrouping(final String component,
- final CustomStreamGrouping grouping) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.customGrouping(component, grouping);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer customGrouping(final String component,
- final String streamId, final CustomStreamGrouping grouping) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.customGrouping(component, streamId, grouping);
- }
-
- @Override
- public String getComponent() {
- return component;
- }
- });
- return this;
- }
-
- @Override
- public BoltDeclarer grouping(final GlobalStreamId stream,
- final Grouping grouping) {
- addDeclaration(new InputDeclaration() {
- @Override
- public void declare(InputDeclarer declarer) {
- declarer.grouping(stream, grouping);
- }
-
- @Override
- public String getComponent() {
- return stream.get_componentId();
- }
- });
- return this;
- }
-
- private void addDeclaration(InputDeclaration declaration) {
- _component.declarations.add(declaration);
- }
-
- @Override
- public BoltDeclarer addConfigurations(Map conf) {
- _component.componentConfs.add(conf);
- return this;
- }
-
-
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/coordination/CoordinatedBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/coordination/CoordinatedBolt.java b/jstorm-client/src/main/java/backtype/storm/coordination/CoordinatedBolt.java
deleted file mode 100644
index 1e78932..0000000
--- a/jstorm-client/src/main/java/backtype/storm/coordination/CoordinatedBolt.java
+++ /dev/null
@@ -1,379 +0,0 @@
-package backtype.storm.coordination;
-
-import backtype.storm.topology.FailedException;
-import java.util.Map.Entry;
-import backtype.storm.tuple.Values;
-import backtype.storm.generated.GlobalStreamId;
-import java.util.Collection;
-import backtype.storm.Constants;
-import backtype.storm.generated.Grouping;
-import backtype.storm.task.IOutputCollector;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.TimeCacheMap;
-import backtype.storm.utils.Utils;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import static backtype.storm.utils.Utils.get;
-
-/**
- * Coordination requires the request ids to be globally unique for awhile. This
- * is so it doesn't get confused in the case of retries.
- */
-public class CoordinatedBolt implements IRichBolt {
- public static Logger LOG = LoggerFactory.getLogger(CoordinatedBolt.class);
-
- public static interface FinishedCallback {
- void finishedId(Object id);
- }
-
- public static interface TimeoutCallback {
- void timeoutId(Object id);
- }
-
- public static class SourceArgs implements Serializable {
- public boolean singleCount;
-
- protected SourceArgs(boolean singleCount) {
- this.singleCount = singleCount;
- }
-
- public static SourceArgs single() {
- return new SourceArgs(true);
- }
-
- public static SourceArgs all() {
- return new SourceArgs(false);
- }
-
- @Override
- public String toString() {
- return "<Single: " + singleCount + ">";
- }
- }
-
- public class CoordinatedOutputCollector implements IOutputCollector {
- IOutputCollector _delegate;
-
- public CoordinatedOutputCollector(IOutputCollector delegate) {
- _delegate = delegate;
- }
-
- public List<Integer> emit(String stream, Collection<Tuple> anchors,
- List<Object> tuple) {
- List<Integer> tasks = _delegate.emit(stream, anchors, tuple);
- updateTaskCounts(tuple.get(0), tasks);
- return tasks;
- }
-
- public void emitDirect(int task, String stream,
- Collection<Tuple> anchors, List<Object> tuple) {
- updateTaskCounts(tuple.get(0), Arrays.asList(task));
- _delegate.emitDirect(task, stream, anchors, tuple);
- }
-
- public void ack(Tuple tuple) {
- Object id = tuple.getValue(0);
- synchronized (_tracked) {
- TrackingInfo track = _tracked.get(id);
- if (track != null)
- track.receivedTuples++;
- }
- boolean failed = checkFinishId(tuple, TupleType.REGULAR);
- if (failed) {
- _delegate.fail(tuple);
- } else {
- _delegate.ack(tuple);
- }
- }
-
- public void fail(Tuple tuple) {
- Object id = tuple.getValue(0);
- synchronized (_tracked) {
- TrackingInfo track = _tracked.get(id);
- if (track != null)
- track.failed = true;
- }
- checkFinishId(tuple, TupleType.REGULAR);
- _delegate.fail(tuple);
- }
-
- public void reportError(Throwable error) {
- _delegate.reportError(error);
- }
-
- private void updateTaskCounts(Object id, List<Integer> tasks) {
- synchronized (_tracked) {
- TrackingInfo track = _tracked.get(id);
- if (track != null) {
- Map<Integer, Integer> taskEmittedTuples = track.taskEmittedTuples;
- for (Integer task : tasks) {
- int newCount = get(taskEmittedTuples, task, 0) + 1;
- taskEmittedTuples.put(task, newCount);
- }
- }
- }
- }
- }
-
- private Map<String, SourceArgs> _sourceArgs;
- private IdStreamSpec _idStreamSpec;
- private IRichBolt _delegate;
- private Integer _numSourceReports;
- private List<Integer> _countOutTasks = new ArrayList<Integer>();;
- private OutputCollector _collector;
- private TimeCacheMap<Object, TrackingInfo> _tracked;
-
- public static class TrackingInfo {
- int reportCount = 0;
- int expectedTupleCount = 0;
- int receivedTuples = 0;
- boolean failed = false;
- Map<Integer, Integer> taskEmittedTuples = new HashMap<Integer, Integer>();
- boolean receivedId = false;
- boolean finished = false;
- List<Tuple> ackTuples = new ArrayList<Tuple>();
-
- @Override
- public String toString() {
- return "reportCount: " + reportCount + "\n"
- + "expectedTupleCount: " + expectedTupleCount + "\n"
- + "receivedTuples: " + receivedTuples + "\n" + "failed: "
- + failed + "\n" + taskEmittedTuples.toString();
- }
- }
-
- public static class IdStreamSpec implements Serializable {
- GlobalStreamId _id;
-
- public GlobalStreamId getGlobalStreamId() {
- return _id;
- }
-
- public static IdStreamSpec makeDetectSpec(String component,
- String stream) {
- return new IdStreamSpec(component, stream);
- }
-
- protected IdStreamSpec(String component, String stream) {
- _id = new GlobalStreamId(component, stream);
- }
- }
-
- public CoordinatedBolt(IRichBolt delegate) {
- this(delegate, null, null);
- }
-
- public CoordinatedBolt(IRichBolt delegate, String sourceComponent,
- SourceArgs sourceArgs, IdStreamSpec idStreamSpec) {
- this(delegate, singleSourceArgs(sourceComponent, sourceArgs),
- idStreamSpec);
- }
-
- public CoordinatedBolt(IRichBolt delegate,
- Map<String, SourceArgs> sourceArgs, IdStreamSpec idStreamSpec) {
- _sourceArgs = sourceArgs;
- if (_sourceArgs == null)
- _sourceArgs = new HashMap<String, SourceArgs>();
- _delegate = delegate;
- _idStreamSpec = idStreamSpec;
- }
-
- public void prepare(Map config, TopologyContext context,
- OutputCollector collector) {
- TimeCacheMap.ExpiredCallback<Object, TrackingInfo> callback = null;
- if (_delegate instanceof TimeoutCallback) {
- callback = new TimeoutItems();
- }
- _tracked = new TimeCacheMap<Object, TrackingInfo>(
- context.maxTopologyMessageTimeout(), callback);
- _collector = collector;
- _delegate.prepare(config, context, new OutputCollector(
- new CoordinatedOutputCollector(collector)));
- for (String component : Utils.get(context.getThisTargets(),
- Constants.COORDINATED_STREAM_ID,
- new HashMap<String, Grouping>()).keySet()) {
- for (Integer task : context.getComponentTasks(component)) {
- _countOutTasks.add(task);
- }
- }
- if (!_sourceArgs.isEmpty()) {
- _numSourceReports = 0;
- for (Entry<String, SourceArgs> entry : _sourceArgs.entrySet()) {
- if (entry.getValue().singleCount) {
- _numSourceReports += 1;
- } else {
- _numSourceReports += context.getComponentTasks(
- entry.getKey()).size();
- }
- }
- }
- }
-
- private boolean checkFinishId(Tuple tup, TupleType type) {
- Object id = tup.getValue(0);
- boolean failed = false;
-
- synchronized (_tracked) {
- TrackingInfo track = _tracked.get(id);
- try {
- if (track != null) {
- boolean delayed = false;
- if (_idStreamSpec == null && type == TupleType.COORD
- || _idStreamSpec != null && type == TupleType.ID) {
- track.ackTuples.add(tup);
- delayed = true;
- }
- if (track.failed) {
- failed = true;
- for (Tuple t : track.ackTuples) {
- _collector.fail(t);
- }
- _tracked.remove(id);
- } else if (track.receivedId
- && (_sourceArgs.isEmpty() || track.reportCount == _numSourceReports
- && track.expectedTupleCount == track.receivedTuples)) {
- if (_delegate instanceof FinishedCallback) {
- ((FinishedCallback) _delegate).finishedId(id);
- }
- if (!(_sourceArgs.isEmpty() || type != TupleType.REGULAR)) {
- throw new IllegalStateException(
- "Coordination condition met on a non-coordinating tuple. Should be impossible");
- }
- Iterator<Integer> outTasks = _countOutTasks.iterator();
- while (outTasks.hasNext()) {
- int task = outTasks.next();
- int numTuples = get(track.taskEmittedTuples, task,
- 0);
- _collector.emitDirect(task,
- Constants.COORDINATED_STREAM_ID, tup,
- new Values(id, numTuples));
- }
- for (Tuple t : track.ackTuples) {
- _collector.ack(t);
- }
- track.finished = true;
- _tracked.remove(id);
- }
- if (!delayed && type != TupleType.REGULAR) {
- if (track.failed) {
- _collector.fail(tup);
- } else {
- _collector.ack(tup);
- }
- }
- } else {
- if (type != TupleType.REGULAR)
- _collector.fail(tup);
- }
- } catch (FailedException e) {
- LOG.error("Failed to finish batch", e);
- for (Tuple t : track.ackTuples) {
- _collector.fail(t);
- }
- _tracked.remove(id);
- failed = true;
- }
- }
- return failed;
- }
-
- public void execute(Tuple tuple) {
- Object id = tuple.getValue(0);
- TrackingInfo track;
- TupleType type = getTupleType(tuple);
- synchronized (_tracked) {
- track = _tracked.get(id);
- if (track == null) {
- track = new TrackingInfo();
- if (_idStreamSpec == null)
- track.receivedId = true;
- _tracked.put(id, track);
- }
- }
-
- if (type == TupleType.ID) {
- synchronized (_tracked) {
- track.receivedId = true;
- }
- checkFinishId(tuple, type);
- } else if (type == TupleType.COORD) {
- int count = (Integer) tuple.getValue(1);
- synchronized (_tracked) {
- track.reportCount++;
- track.expectedTupleCount += count;
- }
- checkFinishId(tuple, type);
- } else {
- synchronized (_tracked) {
- _delegate.execute(tuple);
- }
- }
- }
-
- public void cleanup() {
- _delegate.cleanup();
- _tracked.cleanup();
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- _delegate.declareOutputFields(declarer);
- declarer.declareStream(Constants.COORDINATED_STREAM_ID, true,
- new Fields("id", "count"));
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return _delegate.getComponentConfiguration();
- }
-
- private static Map<String, SourceArgs> singleSourceArgs(
- String sourceComponent, SourceArgs sourceArgs) {
- Map<String, SourceArgs> ret = new HashMap<String, SourceArgs>();
- ret.put(sourceComponent, sourceArgs);
- return ret;
- }
-
- private class TimeoutItems implements
- TimeCacheMap.ExpiredCallback<Object, TrackingInfo> {
- @Override
- public void expire(Object id, TrackingInfo val) {
- synchronized (_tracked) {
- // the combination of the lock and the finished flag ensure that
- // an id is never timed out if it has been finished
- val.failed = true;
- if (!val.finished) {
- ((TimeoutCallback) _delegate).timeoutId(id);
- }
- }
- }
- }
-
- private TupleType getTupleType(Tuple tuple) {
- if (_idStreamSpec != null
- && tuple.getSourceGlobalStreamid().equals(_idStreamSpec._id)) {
- return TupleType.ID;
- } else if (!_sourceArgs.isEmpty()
- && tuple.getSourceStreamId().equals(
- Constants.COORDINATED_STREAM_ID)) {
- return TupleType.COORD;
- } else {
- return TupleType.REGULAR;
- }
- }
-
- static enum TupleType {
- REGULAR, ID, COORD
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/coordination/IBatchBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/coordination/IBatchBolt.java b/jstorm-client/src/main/java/backtype/storm/coordination/IBatchBolt.java
deleted file mode 100644
index f8a9386..0000000
--- a/jstorm-client/src/main/java/backtype/storm/coordination/IBatchBolt.java
+++ /dev/null
@@ -1,16 +0,0 @@
-package backtype.storm.coordination;
-
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IComponent;
-import backtype.storm.tuple.Tuple;
-import java.io.Serializable;
-import java.util.Map;
-
-public interface IBatchBolt<T> extends Serializable, IComponent {
- void prepare(Map conf, TopologyContext context,
- BatchOutputCollector collector, T id);
-
- void execute(Tuple tuple);
-
- void finishBatch();
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/daemon/Shutdownable.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/daemon/Shutdownable.java b/jstorm-client/src/main/java/backtype/storm/daemon/Shutdownable.java
deleted file mode 100644
index 1e61512..0000000
--- a/jstorm-client/src/main/java/backtype/storm/daemon/Shutdownable.java
+++ /dev/null
@@ -1,5 +0,0 @@
-package backtype.storm.daemon;
-
-public interface Shutdownable {
- public void shutdown();
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/drpc/DRPCInvocationsClient.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/drpc/DRPCInvocationsClient.java b/jstorm-client/src/main/java/backtype/storm/drpc/DRPCInvocationsClient.java
deleted file mode 100644
index 756b8aa..0000000
--- a/jstorm-client/src/main/java/backtype/storm/drpc/DRPCInvocationsClient.java
+++ /dev/null
@@ -1,78 +0,0 @@
-package backtype.storm.drpc;
-
-import org.apache.thrift7.TException;
-import org.apache.thrift7.protocol.TBinaryProtocol;
-import org.apache.thrift7.transport.TFramedTransport;
-import org.apache.thrift7.transport.TSocket;
-import org.apache.thrift7.transport.TTransport;
-
-import backtype.storm.generated.DRPCRequest;
-import backtype.storm.generated.DistributedRPCInvocations;
-
-public class DRPCInvocationsClient implements DistributedRPCInvocations.Iface {
- private TTransport conn;
- private DistributedRPCInvocations.Client client;
- private String host;
- private int port;
-
- public DRPCInvocationsClient(String host, int port) {
- try {
- this.host = host;
- this.port = port;
- connect();
- } catch (TException e) {
- throw new RuntimeException(e);
- }
- }
-
- private void connect() throws TException {
- conn = new TFramedTransport(new TSocket(host, port));
- client = new DistributedRPCInvocations.Client(new TBinaryProtocol(conn));
- conn.open();
- }
-
- public String getHost() {
- return host;
- }
-
- public int getPort() {
- return port;
- }
-
- public void result(String id, String result) throws TException {
- try {
- if (client == null)
- connect();
- client.result(id, result);
- } catch (TException e) {
- client = null;
- throw e;
- }
- }
-
- public DRPCRequest fetchRequest(String func) throws TException {
- try {
- if (client == null)
- connect();
- return client.fetchRequest(func);
- } catch (TException e) {
- client = null;
- throw e;
- }
- }
-
- public void failRequest(String id) throws TException {
- try {
- if (client == null)
- connect();
- client.failRequest(id);
- } catch (TException e) {
- client = null;
- throw e;
- }
- }
-
- public void close() {
- conn.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/drpc/DRPCSpout.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/drpc/DRPCSpout.java b/jstorm-client/src/main/java/backtype/storm/drpc/DRPCSpout.java
deleted file mode 100644
index a68b008..0000000
--- a/jstorm-client/src/main/java/backtype/storm/drpc/DRPCSpout.java
+++ /dev/null
@@ -1,162 +0,0 @@
-package backtype.storm.drpc;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.thrift7.TException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import backtype.storm.Config;
-import backtype.storm.ILocalDRPC;
-import backtype.storm.generated.DRPCRequest;
-import backtype.storm.generated.DistributedRPCInvocations;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
-import backtype.storm.utils.ServiceRegistry;
-import backtype.storm.utils.Utils;
-
-public class DRPCSpout extends BaseRichSpout {
- public static Logger LOG = LoggerFactory.getLogger(DRPCSpout.class);
-
- SpoutOutputCollector _collector;
- List<DRPCInvocationsClient> _clients = new ArrayList<DRPCInvocationsClient>();
- String _function;
- String _local_drpc_id = null;
-
- private static class DRPCMessageId {
- String id;
- int index;
-
- public DRPCMessageId(String id, int index) {
- this.id = id;
- this.index = index;
- }
- }
-
- public DRPCSpout(String function) {
- _function = function;
- }
-
- public DRPCSpout(String function, ILocalDRPC drpc) {
- _function = function;
- _local_drpc_id = drpc.getServiceId();
- }
-
- @Override
- public void open(Map conf, TopologyContext context,
- SpoutOutputCollector collector) {
- _collector = collector;
- if (_local_drpc_id == null) {
- int numTasks = context.getComponentTasks(
- context.getThisComponentId()).size();
- int index = context.getThisTaskIndex();
-
- int port = Utils.getInt(conf.get(Config.DRPC_INVOCATIONS_PORT));
- List<String> servers = (List<String>) conf.get(Config.DRPC_SERVERS);
- if (servers == null || servers.isEmpty()) {
- throw new RuntimeException(
- "No DRPC servers configured for topology");
- }
- if (numTasks < servers.size()) {
- for (String s : servers) {
- _clients.add(new DRPCInvocationsClient(s, port));
- }
- } else {
- int i = index % servers.size();
- _clients.add(new DRPCInvocationsClient(servers.get(i), port));
- }
- }
-
- }
-
- @Override
- public void close() {
- for (DRPCInvocationsClient client : _clients) {
- client.close();
- }
- }
-
- @Override
- public void nextTuple() {
- boolean gotRequest = false;
- if (_local_drpc_id == null) {
- for (int i = 0; i < _clients.size(); i++) {
- DRPCInvocationsClient client = _clients.get(i);
- try {
- DRPCRequest req = client.fetchRequest(_function);
- if (req.get_request_id().length() > 0) {
- Map returnInfo = new HashMap();
- returnInfo.put("id", req.get_request_id());
- returnInfo.put("host", client.getHost());
- returnInfo.put("port", client.getPort());
- gotRequest = true;
- _collector.emit(new Values(req.get_func_args(),
- Utils.to_json(returnInfo)),
- new DRPCMessageId(req.get_request_id(), i));
- break;
- }
- } catch (TException e) {
- LOG.error("Failed to fetch DRPC result from DRPC server", e);
- }
- }
- } else {
- DistributedRPCInvocations.Iface drpc = (DistributedRPCInvocations.Iface) ServiceRegistry
- .getService(_local_drpc_id);
- if (drpc != null) { // can happen during shutdown of drpc while
- // topology is still up
- try {
- DRPCRequest req = drpc.fetchRequest(_function);
- if (req.get_request_id().length() > 0) {
- Map returnInfo = new HashMap();
- returnInfo.put("id", req.get_request_id());
- returnInfo.put("host", _local_drpc_id);
- returnInfo.put("port", 0);
- gotRequest = true;
- _collector.emit(new Values(req.get_func_args(),
- Utils.to_json(returnInfo)),
- new DRPCMessageId(req.get_request_id(), 0));
- }
- } catch (TException e) {
- throw new RuntimeException(e);
- }
- }
- }
- if (!gotRequest) {
- Utils.sleep(1);
- }
- }
-
- @Override
- public void ack(Object msgId) {
- }
-
- @Override
- public void fail(Object msgId) {
- DRPCMessageId did = (DRPCMessageId) msgId;
- DistributedRPCInvocations.Iface client;
-
- if (_local_drpc_id == null) {
- client = _clients.get(did.index);
- } else {
- client = (DistributedRPCInvocations.Iface) ServiceRegistry
- .getService(_local_drpc_id);
- }
- try {
- client.failRequest(did.id);
- } catch (TException e) {
- LOG.error("Failed to fail request", e);
- }
- }
-
- @Override
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("args", "return-info"));
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/drpc/JoinResult.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/drpc/JoinResult.java b/jstorm-client/src/main/java/backtype/storm/drpc/JoinResult.java
deleted file mode 100644
index b6733a3..0000000
--- a/jstorm-client/src/main/java/backtype/storm/drpc/JoinResult.java
+++ /dev/null
@@ -1,59 +0,0 @@
-package backtype.storm.drpc;
-
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichBolt;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.Values;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class JoinResult extends BaseRichBolt {
- public static Logger LOG = LoggerFactory.getLogger(JoinResult.class);
-
- String returnComponent;
- Map<Object, Tuple> returns = new HashMap<Object, Tuple>();
- Map<Object, Tuple> results = new HashMap<Object, Tuple>();
- OutputCollector _collector;
-
- public JoinResult(String returnComponent) {
- this.returnComponent = returnComponent;
- }
-
- public void prepare(Map map, TopologyContext context,
- OutputCollector collector) {
- _collector = collector;
- }
-
- public void execute(Tuple tuple) {
- Object requestId = tuple.getValue(0);
- if (tuple.getSourceComponent().equals(returnComponent)) {
- returns.put(requestId, tuple);
- } else {
- results.put(requestId, tuple);
- }
-
- if (returns.containsKey(requestId) && results.containsKey(requestId)) {
- Tuple result = results.remove(requestId);
- Tuple returner = returns.remove(requestId);
- LOG.debug(result.getValue(1).toString());
- List<Tuple> anchors = new ArrayList<Tuple>();
- anchors.add(result);
- anchors.add(returner);
- _collector.emit(anchors, new Values("" + result.getValue(1),
- returner.getValue(1)));
- _collector.ack(result);
- _collector.ack(returner);
- }
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- declarer.declare(new Fields("result", "return-info"));
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/drpc/KeyedFairBolt.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/drpc/KeyedFairBolt.java b/jstorm-client/src/main/java/backtype/storm/drpc/KeyedFairBolt.java
deleted file mode 100644
index 287168f..0000000
--- a/jstorm-client/src/main/java/backtype/storm/drpc/KeyedFairBolt.java
+++ /dev/null
@@ -1,75 +0,0 @@
-package backtype.storm.drpc;
-
-import backtype.storm.coordination.CoordinatedBolt.FinishedCallback;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.BasicBoltExecutor;
-import backtype.storm.topology.IBasicBolt;
-import backtype.storm.topology.IRichBolt;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.utils.KeyedRoundRobinQueue;
-import java.util.HashMap;
-import java.util.Map;
-
-public class KeyedFairBolt implements IRichBolt, FinishedCallback {
- IRichBolt _delegate;
- KeyedRoundRobinQueue<Tuple> _rrQueue;
- Thread _executor;
- FinishedCallback _callback;
-
- public KeyedFairBolt(IRichBolt delegate) {
- _delegate = delegate;
- }
-
- public KeyedFairBolt(IBasicBolt delegate) {
- this(new BasicBoltExecutor(delegate));
- }
-
- public void prepare(Map stormConf, TopologyContext context,
- OutputCollector collector) {
- if (_delegate instanceof FinishedCallback) {
- _callback = (FinishedCallback) _delegate;
- }
- _delegate.prepare(stormConf, context, collector);
- _rrQueue = new KeyedRoundRobinQueue<Tuple>();
- _executor = new Thread(new Runnable() {
- public void run() {
- try {
- while (true) {
- _delegate.execute(_rrQueue.take());
- }
- } catch (InterruptedException e) {
-
- }
- }
- });
- _executor.setDaemon(true);
- _executor.start();
- }
-
- public void execute(Tuple input) {
- Object key = input.getValue(0);
- _rrQueue.add(key, input);
- }
-
- public void cleanup() {
- _executor.interrupt();
- _delegate.cleanup();
- }
-
- public void declareOutputFields(OutputFieldsDeclarer declarer) {
- _delegate.declareOutputFields(declarer);
- }
-
- public void finishedId(Object id) {
- if (_callback != null) {
- _callback.finishedId(id);
- }
- }
-
- @Override
- public Map<String, Object> getComponentConfiguration() {
- return new HashMap<String, Object>();
- }
-}
http://git-wip-us.apache.org/repos/asf/storm/blob/e935da91/jstorm-client/src/main/java/backtype/storm/drpc/LinearDRPCInputDeclarer.java
----------------------------------------------------------------------
diff --git a/jstorm-client/src/main/java/backtype/storm/drpc/LinearDRPCInputDeclarer.java b/jstorm-client/src/main/java/backtype/storm/drpc/LinearDRPCInputDeclarer.java
deleted file mode 100644
index 5277cff..0000000
--- a/jstorm-client/src/main/java/backtype/storm/drpc/LinearDRPCInputDeclarer.java
+++ /dev/null
@@ -1,42 +0,0 @@
-package backtype.storm.drpc;
-
-import backtype.storm.grouping.CustomStreamGrouping;
-import backtype.storm.topology.ComponentConfigurationDeclarer;
-import backtype.storm.tuple.Fields;
-
-public interface LinearDRPCInputDeclarer extends
- ComponentConfigurationDeclarer<LinearDRPCInputDeclarer> {
- public LinearDRPCInputDeclarer fieldsGrouping(Fields fields);
-
- public LinearDRPCInputDeclarer fieldsGrouping(String streamId, Fields fields);
-
- public LinearDRPCInputDeclarer globalGrouping();
-
- public LinearDRPCInputDeclarer globalGrouping(String streamId);
-
- public LinearDRPCInputDeclarer shuffleGrouping();
-
- public LinearDRPCInputDeclarer shuffleGrouping(String streamId);
-
- public LinearDRPCInputDeclarer localOrShuffleGrouping();
-
- public LinearDRPCInputDeclarer localOrShuffleGrouping(String streamId);
-
- public LinearDRPCInputDeclarer noneGrouping();
-
- public LinearDRPCInputDeclarer noneGrouping(String streamId);
-
- public LinearDRPCInputDeclarer allGrouping();
-
- public LinearDRPCInputDeclarer allGrouping(String streamId);
-
- public LinearDRPCInputDeclarer directGrouping();
-
- public LinearDRPCInputDeclarer directGrouping(String streamId);
-
- public LinearDRPCInputDeclarer customGrouping(CustomStreamGrouping grouping);
-
- public LinearDRPCInputDeclarer customGrouping(String streamId,
- CustomStreamGrouping grouping);
-
-}