You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/05/19 23:52:27 UTC
[02/24] git commit: Moved ShellBolt, ShellProcess and ShellSpout back to their old packages. Updated dependencies.
Moved ShellBolt, ShellProcess and ShellSpout back to their old packages. Updated dependencies.
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/a7918f8e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/a7918f8e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/a7918f8e
Branch: refs/heads/master
Commit: a7918f8ef91b82b5b8fd91c91b36378a6c99caad
Parents: 1a19f3d
Author: John Gilmore <jg...@ml.sun.ac.za>
Authored: Mon Oct 7 09:33:53 2013 +0200
Committer: John Gilmore <jg...@ml.sun.ac.za>
Committed: Mon Oct 7 09:33:53 2013 +0200
----------------------------------------------------------------------
storm-core/src/clj/backtype/storm/bootstrap.clj | 7 +-
.../backtype/storm/clojure/RichShellBolt.java | 2 +-
.../backtype/storm/clojure/RichShellSpout.java | 2 +-
.../jvm/backtype/storm/multilang/ShellBolt.java | 207 ------------------
.../backtype/storm/multilang/ShellProcess.java | 101 ---------
.../backtype/storm/multilang/ShellSpout.java | 115 ----------
.../jvm/backtype/storm/spout/ShellSpout.java | 117 +++++++++++
.../src/jvm/backtype/storm/task/ShellBolt.java | 208 +++++++++++++++++++
.../jvm/backtype/storm/utils/ShellProcess.java | 106 ++++++++++
9 files changed, 436 insertions(+), 429 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a7918f8e/storm-core/src/clj/backtype/storm/bootstrap.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/bootstrap.clj b/storm-core/src/clj/backtype/storm/bootstrap.clj
index 1075c66..9edbe8e 100644
--- a/storm-core/src/clj/backtype/storm/bootstrap.clj
+++ b/storm-core/src/clj/backtype/storm/bootstrap.clj
@@ -12,13 +12,12 @@
RegisteredGlobalState ThriftTopologyUtils DisruptorQueue
MutableObject MutableLong]))
(import (quote [backtype.storm.serialization KryoTupleSerializer KryoTupleDeserializer]))
- (import (quote [backtype.storm.spout ISpout SpoutOutputCollector ISpoutOutputCollector]))
- (import (quote [backtype.storm.multilang ShellSpout ShellBolt]))
+ (import (quote [backtype.storm.spout ISpout SpoutOutputCollector ISpoutOutputCollector ShellSpout]))
(import (quote [backtype.storm.tuple Tuple TupleImpl Fields MessageId]))
(import (quote [backtype.storm.task IBolt IOutputCollector
- OutputCollector TopologyContext
+ OutputCollector TopologyContext ShellBolt
GeneralTopologyContext WorkerTopologyContext]))
- (import (quote [backtype.storm.coordination CoordinatedBolt CoordinatedBolt$SourceArgs
+ (import (quote [backtype.storm.coordination CoordinatedBolt CoordinatedBolt$SourceArgs
IBatchBolt BatchBoltExecutor]))
(import (quote [backtype.storm.drpc KeyedFairBolt]))
(import (quote [backtype.storm.daemon Shutdownable]))
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a7918f8e/storm-core/src/jvm/backtype/storm/clojure/RichShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/clojure/RichShellBolt.java b/storm-core/src/jvm/backtype/storm/clojure/RichShellBolt.java
index 6ff669e..6be104e 100644
--- a/storm-core/src/jvm/backtype/storm/clojure/RichShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/clojure/RichShellBolt.java
@@ -1,7 +1,7 @@
package backtype.storm.clojure;
import backtype.storm.generated.StreamInfo;
-import backtype.storm.multilang.ShellBolt;
+import backtype.storm.task.ShellBolt;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a7918f8e/storm-core/src/jvm/backtype/storm/clojure/RichShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/clojure/RichShellSpout.java b/storm-core/src/jvm/backtype/storm/clojure/RichShellSpout.java
index 57410e1..cb5947f 100644
--- a/storm-core/src/jvm/backtype/storm/clojure/RichShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/clojure/RichShellSpout.java
@@ -1,7 +1,7 @@
package backtype.storm.clojure;
import backtype.storm.generated.StreamInfo;
-import backtype.storm.multilang.ShellSpout;
+import backtype.storm.spout.ShellSpout;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a7918f8e/storm-core/src/jvm/backtype/storm/multilang/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/multilang/ShellBolt.java b/storm-core/src/jvm/backtype/storm/multilang/ShellBolt.java
deleted file mode 100644
index acd60c9..0000000
--- a/storm-core/src/jvm/backtype/storm/multilang/ShellBolt.java
+++ /dev/null
@@ -1,207 +0,0 @@
-package backtype.storm.multilang;
-
-import backtype.storm.generated.ShellComponent;
-import backtype.storm.task.IBolt;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.multilang.ShellProcess;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import java.util.Map;
-import java.util.Random;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A bolt that shells out to another process to process tuples. ShellBolt
- * communicates with that process over stdio using a special protocol. An ~100
- * line library is required to implement that protocol, and adapter libraries
- * currently exist for Ruby and Python.
- *
- * <p>To run a ShellBolt on a cluster, the scripts that are shelled out to must be
- * in the resources directory within the jar submitted to the master.
- * During development/testing on a local machine, that resources directory just
- * needs to be on the classpath.</p>
- *
- * <p>When creating topologies using the Java API, subclass this bolt and implement
- * the IRichBolt interface to create components for the topology that use other languages. For example:
- * </p>
- *
- * <pre>
- * public class MyBolt extends ShellBolt implements IRichBolt {
- * public MyBolt() {
- * super("python", "mybolt.py");
- * }
- *
- * public void declareOutputFields(OutputFieldsDeclarer declarer) {
- * declarer.declare(new Fields("field1", "field2"));
- * }
- * }
- * </pre>
- */
-public class ShellBolt implements IBolt {
- public static Logger LOG = LoggerFactory.getLogger(ShellBolt.class);
- Process _subprocess;
- OutputCollector _collector;
- Map<String, Tuple> _inputs = new ConcurrentHashMap<String, Tuple>();
-
- private String[] _command;
- private ShellProcess _process;
- private volatile boolean _running = true;
- private volatile Throwable _exception;
- private LinkedBlockingQueue _pendingWrites = new LinkedBlockingQueue();
- private Random _rand;
-
- private Thread _readerThread;
- private Thread _writerThread;
-
- public ShellBolt(ShellComponent component) {
- this(component.get_execution_command(), component.get_script());
- _process = new ShellProcess(new JsonSerializer(), _command);
- }
-
- public ShellBolt(String... command) {
- _command = command;
- _process = new ShellProcess(new JsonSerializer(), _command);
- }
-
- public ShellBolt(ISerializer serializer, String... command) {
- _command = command;
- _process = new ShellProcess(serializer, _command);
- }
-
- public void prepare(Map stormConf, TopologyContext context,
- final OutputCollector collector) {
- _rand = new Random();
- _collector = collector;
-
- //subprocesses must send their pid first thing
- Number subpid = _process.launch(stormConf, context);
- LOG.info("Launched subprocess with pid " + subpid);
-
- // reader
- _readerThread = new Thread(new Runnable() {
- public void run() {
- while (_running) {
- try {
- Emission emission = _process.readEmission();
-
- String command = emission.getCommand();
- if(command.equals("ack")) {
- handleAck(emission.getId());
- } else if (command.equals("fail")) {
- handleFail(emission.getId());
- } else if (command.equals("error")) {
- handleError(emission.getMsg());
- } else if (command.equals("log")) {
- String msg = emission.getMsg();
- LOG.info("Shell msg: " + msg);
- } else if (command.equals("emit")) {
- handleEmit(emission);
- }
- } catch (InterruptedException e) {
- } catch (Throwable t) {
- die(t);
- }
- }
- }
- });
-
- _readerThread.start();
-
- _writerThread = new Thread(new Runnable() {
- public void run() {
- while (_running) {
- try {
- // FIXME: This can either be TaskIds or Immissions
- Object write = _pendingWrites.poll(1, SECONDS);
- if (write instanceof Immission) {
- _process.writeImmission((Immission)write);
- } else if (write instanceof List<?>) {
-
- }
- } catch (InterruptedException e) {
- } catch (Throwable t) {
- die(t);
- }
- }
- }
- });
-
- _writerThread.start();
- }
-
- public void execute(Tuple input) {
- if (_exception != null) {
- throw new RuntimeException(_exception);
- }
-
- //just need an id
- String genId = Long.toString(_rand.nextLong());
- _inputs.put(genId, input);
- try {
- Immission immission = new Immission();
- immission.setId(genId);
- immission.setComp(input.getSourceComponent());
- immission.setStream(input.getSourceStreamId());
- immission.setTask(input.getSourceTask());
- immission.setTuple(input.getValues());
-
- _pendingWrites.put(immission);
- } catch(InterruptedException e) {
- throw new RuntimeException("Error during multilang processing", e);
- }
- }
-
- public void cleanup() {
- _running = false;
- _process.destroy();
- _inputs.clear();
- }
-
- private void handleAck(String id) {
- Tuple acked = _inputs.remove(id);
- if(acked==null) {
- throw new RuntimeException("Acked a non-existent or already acked/failed id: " + id);
- }
- _collector.ack(acked);
- }
-
- private void handleFail(String id) {
- Tuple failed = _inputs.remove(id);
- if(failed==null) {
- throw new RuntimeException("Failed a non-existent or already acked/failed id: " + id);
- }
- _collector.fail(failed);
- }
-
- private void handleError(String msg) {
- _collector.reportError(new Exception("Shell Process Exception: " + msg));
- }
-
- private void handleEmit(Emission emission) throws InterruptedException {
- List<Tuple> anchors = new ArrayList<Tuple>();
- for (String anchor : emission.getAnchors()) {
- Tuple t = _inputs.get(anchor);
- if (t == null) {
- throw new RuntimeException("Anchored onto " + anchor + " after ack/fail");
- }
- anchors.add(t);
- }
-
- if(emission.getTask() == 0) {
- List<Integer> outtasks = _collector.emit(emission.getStream(), anchors, emission.getTuple());
- _pendingWrites.put(outtasks);
- } else {
- _collector.emitDirect((int)emission.getTask(), emission.getStream(), anchors, emission.getTuple());
- }
- }
-
- private void die(Throwable exception) {
- _exception = exception;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a7918f8e/storm-core/src/jvm/backtype/storm/multilang/ShellProcess.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/multilang/ShellProcess.java b/storm-core/src/jvm/backtype/storm/multilang/ShellProcess.java
deleted file mode 100644
index 12689d8..0000000
--- a/storm-core/src/jvm/backtype/storm/multilang/ShellProcess.java
+++ /dev/null
@@ -1,101 +0,0 @@
-package backtype.storm.multilang;
-
-import backtype.storm.task.TopologyContext;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.log4j.Logger;
-
-public class ShellProcess implements Serializable {
- public static Logger LOG = Logger.getLogger(ShellProcess.class);
- private Process _subprocess;
- private InputStream processErrorStream;
- private String[] command;
- public ISerializer serializer;
-
- public ShellProcess(ISerializer serializer, String[] command) {
- this.command = command;
- this.serializer = serializer;
- }
-
- public Number launch(Map conf, TopologyContext context) {
- ProcessBuilder builder = new ProcessBuilder(command);
- builder.directory(new File(context.getCodeDir()));
-
- Number pid;
- try {
- _subprocess = builder.start();
- processErrorStream = _subprocess.getErrorStream();
- serializer.initialize(_subprocess.getOutputStream(), _subprocess.getInputStream());
- pid = serializer.connect(conf, context);
- } catch (IOException e) {
- throw new RuntimeException("Error when launching multilang subprocess\n" + getErrorsString(), e);
- } catch (NoOutputException e) {
- throw new RuntimeException(e + getErrorsString() + "\n");
- }
- return pid;
- }
-
- public void destroy() {
- _subprocess.destroy();
- }
-
- public Emission readEmission() throws IOException {
- try {
- return serializer.readEmission();
- } catch (NoOutputException e) {
- throw new RuntimeException(e + getErrorsString() + "\n");
- }
- }
-
- public void writeImmission(Immission msg) throws IOException {
- serializer.writeImmission(msg);
- // drain the error stream to avoid dead lock because of full error stream buffer
- drainErrorStream();
- }
-
- public void writeSpoutMsg(SpoutMsg msg) throws IOException {
- serializer.writeSpoutMsg(msg);
- // drain the error stream to avoid dead lock because of full error stream buffer
- drainErrorStream();
- }
-
- public void writeTaskIds(List<Integer> taskIds) throws IOException {
- serializer.writeTaskIds(taskIds);
- // drain the error stream to avoid dead lock because of full error stream buffer
- drainErrorStream();
- }
-
- public void drainErrorStream()
- {
- try {
- while (processErrorStream.available() > 0)
- {
- int bufferSize = processErrorStream.available();
- byte[] errorReadingBuffer = new byte[bufferSize];
-
- processErrorStream.read(errorReadingBuffer, 0, bufferSize);
-
- LOG.info("Got error from shell process: " + new String(errorReadingBuffer));
- }
- } catch(Exception e) {
- }
- }
-
- public String getErrorsString() {
- if(processErrorStream!=null) {
- try {
- return IOUtils.toString(processErrorStream);
- } catch(IOException e) {
- return "(Unable to capture error stream)";
- }
- } else {
- return "";
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a7918f8e/storm-core/src/jvm/backtype/storm/multilang/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/multilang/ShellSpout.java b/storm-core/src/jvm/backtype/storm/multilang/ShellSpout.java
deleted file mode 100644
index 7895a00..0000000
--- a/storm-core/src/jvm/backtype/storm/multilang/ShellSpout.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package backtype.storm.multilang;
-
-import backtype.storm.generated.ShellComponent;
-import backtype.storm.spout.ISpout;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.multilang.ShellProcess;
-import backtype.storm.utils.Utils;
-import java.util.Map;
-import java.util.List;
-import java.io.IOException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class ShellSpout implements ISpout {
- public static Logger LOG = LoggerFactory.getLogger(ShellSpout.class);
-
- private SpoutOutputCollector _collector;
- private String[] _command;
- private ShellProcess _process;
- private SpoutMsg spoutMsg;
-
- public ShellSpout(ShellComponent component) {
- this(component.get_execution_command(), component.get_script());
- _process = new ShellProcess(new JsonSerializer(), _command);
- }
-
- public ShellSpout(String... command) {
- _command = command;
- _process = new ShellProcess(new JsonSerializer(), _command);
- }
-
- public ShellSpout(ISerializer serializer, String... command) {
- _command = command;
- _process = new ShellProcess(serializer, _command);
- }
-
- public void open(Map stormConf, TopologyContext context,
- SpoutOutputCollector collector) {
- _collector = collector;
-
- Number subpid = _process.launch(stormConf, context);
- LOG.info("Launched subprocess with pid " + subpid);
- }
-
- public void close() {
- _process.destroy();
- }
-
- public void nextTuple() {
- if (spoutMsg == null) {
- spoutMsg = new SpoutMsg();
- }
- spoutMsg.setCommand("next");
- spoutMsg.setId("");
- querySubprocess();
- }
-
- public void ack(Object msgId) {
- if (spoutMsg == null) {
- spoutMsg = new SpoutMsg();
- }
- spoutMsg.setCommand("ack");
- spoutMsg.setId(msgId.toString());
- querySubprocess();
- }
-
- public void fail(Object msgId) {
- if (spoutMsg == null) {
- spoutMsg = new SpoutMsg();
- }
- spoutMsg.setCommand("fail");
- spoutMsg.setId(msgId.toString());
- querySubprocess();
- }
-
- private void querySubprocess() {
- try {
- _process.writeSpoutMsg(spoutMsg);
-
- while (true) {
- Emission emission = _process.readEmission();
- String command = emission.getCommand();
- if (command.equals("sync")) {
- return;
- } else if (command.equals("log")) {
- String msg = emission.getMsg();
- LOG.info("Shell msg: " + msg);
- } else if (command.equals("emit")) {
- String stream = emission.getStream();
- Long task = emission.getTask();
- List<Object> tuple = emission.getTuple();
- Object messageId = emission.getId();
- if (task == 0) {
- List<Integer> outtasks = _collector.emit(stream, tuple, messageId);
- _process.writeTaskIds(outtasks);
- } else {
- _collector.emitDirect((int)task.longValue(), stream, tuple, messageId);
- }
- }
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void activate() {
- }
-
- @Override
- public void deactivate() {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a7918f8e/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
new file mode 100644
index 0000000..c9ef682
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -0,0 +1,117 @@
+package backtype.storm.spout;
+
+import backtype.storm.generated.ShellComponent;
+import backtype.storm.multilang.Emission;
+import backtype.storm.multilang.ISerializer;
+import backtype.storm.multilang.JsonSerializer;
+import backtype.storm.multilang.SpoutMsg;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.utils.ShellProcess;
+import backtype.storm.utils.Utils;
+import java.util.Map;
+import java.util.List;
+import java.io.IOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class ShellSpout implements ISpout {
+ public static Logger LOG = LoggerFactory.getLogger(ShellSpout.class);
+
+ private SpoutOutputCollector _collector;
+ private String[] _command;
+ private ShellProcess _process;
+ private SpoutMsg spoutMsg;
+
+ public ShellSpout(ShellComponent component) {
+ this(component.get_execution_command(), component.get_script());
+ _process = new ShellProcess(new JsonSerializer(), _command);
+ }
+
+ public ShellSpout(String... command) {
+ _command = command;
+ _process = new ShellProcess(new JsonSerializer(), _command);
+ }
+
+ public ShellSpout(ISerializer serializer, String... command) {
+ _command = command;
+ _process = new ShellProcess(serializer, _command);
+ }
+
+ public void open(Map stormConf, TopologyContext context,
+ SpoutOutputCollector collector) {
+ _collector = collector;
+
+ Number subpid = _process.launch(stormConf, context);
+ LOG.info("Launched subprocess with pid " + subpid);
+ }
+
+ public void close() {
+ _process.destroy();
+ }
+
+ public void nextTuple() {
+ if (spoutMsg == null) {
+ spoutMsg = new SpoutMsg();
+ }
+ spoutMsg.setCommand("next");
+ spoutMsg.setId("");
+ querySubprocess();
+ }
+
+ public void ack(Object msgId) {
+ if (spoutMsg == null) {
+ spoutMsg = new SpoutMsg();
+ }
+ spoutMsg.setCommand("ack");
+ spoutMsg.setId(msgId.toString());
+ querySubprocess();
+ }
+
+ public void fail(Object msgId) {
+ if (spoutMsg == null) {
+ spoutMsg = new SpoutMsg();
+ }
+ spoutMsg.setCommand("fail");
+ spoutMsg.setId(msgId.toString());
+ querySubprocess();
+ }
+
+ private void querySubprocess() {
+ try {
+ _process.writeSpoutMsg(spoutMsg);
+
+ while (true) {
+ Emission emission = _process.readEmission();
+ String command = emission.getCommand();
+ if (command.equals("sync")) {
+ return;
+ } else if (command.equals("log")) {
+ String msg = emission.getMsg();
+ LOG.info("Shell msg: " + msg);
+ } else if (command.equals("emit")) {
+ String stream = emission.getStream();
+ Long task = emission.getTask();
+ List<Object> tuple = emission.getTuple();
+ Object messageId = emission.getId();
+ if (task == 0) {
+ List<Integer> outtasks = _collector.emit(stream, tuple, messageId);
+ _process.writeTaskIds(outtasks);
+ } else {
+ _collector.emitDirect((int)task.longValue(), stream, tuple, messageId);
+ }
+ }
+ }
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void activate() {
+ }
+
+ @Override
+ public void deactivate() {
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a7918f8e/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
new file mode 100644
index 0000000..36585e3
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -0,0 +1,208 @@
+package backtype.storm.task;
+
+import backtype.storm.generated.ShellComponent;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.ShellProcess;
+import backtype.storm.multilang.Emission;
+import backtype.storm.multilang.ISerializer;
+import backtype.storm.multilang.Immission;
+import backtype.storm.multilang.JsonSerializer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import java.util.Map;
+import java.util.Random;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A bolt that shells out to another process to process tuples. ShellBolt
+ * communicates with that process over stdio using a special protocol. An ~100
+ * line library is required to implement that protocol, and adapter libraries
+ * currently exist for Ruby and Python.
+ *
+ * <p>To run a ShellBolt on a cluster, the scripts that are shelled out to must be
+ * in the resources directory within the jar submitted to the master.
+ * During development/testing on a local machine, that resources directory just
+ * needs to be on the classpath.</p>
+ *
+ * <p>When creating topologies using the Java API, subclass this bolt and implement
+ * the IRichBolt interface to create components for the topology that use other languages. For example:
+ * </p>
+ *
+ * <pre>
+ * public class MyBolt extends ShellBolt implements IRichBolt {
+ * public MyBolt() {
+ * super("python", "mybolt.py");
+ * }
+ *
+ * public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ * declarer.declare(new Fields("field1", "field2"));
+ * }
+ * }
+ * </pre>
+ */
+public class ShellBolt implements IBolt {
+ public static Logger LOG = LoggerFactory.getLogger(ShellBolt.class);
+ Process _subprocess;
+ OutputCollector _collector;
+ Map<String, Tuple> _inputs = new ConcurrentHashMap<String, Tuple>();
+
+ private String[] _command;
+ private ShellProcess _process;
+ private volatile boolean _running = true;
+ private volatile Throwable _exception;
+ private LinkedBlockingQueue _pendingWrites = new LinkedBlockingQueue();
+ private Random _rand;
+
+ private Thread _readerThread;
+ private Thread _writerThread;
+
+ public ShellBolt(ShellComponent component) {
+ this(component.get_execution_command(), component.get_script());
+ _process = new ShellProcess(new JsonSerializer(), _command);
+ }
+
+ public ShellBolt(String... command) {
+ _command = command;
+ _process = new ShellProcess(new JsonSerializer(), _command);
+ }
+
+ public ShellBolt(ISerializer serializer, String... command) {
+ _command = command;
+ _process = new ShellProcess(serializer, _command);
+ }
+
+ public void prepare(Map stormConf, TopologyContext context,
+ final OutputCollector collector) {
+ _rand = new Random();
+ _collector = collector;
+
+ //subprocesses must send their pid first thing
+ Number subpid = _process.launch(stormConf, context);
+ LOG.info("Launched subprocess with pid " + subpid);
+
+ // reader
+ _readerThread = new Thread(new Runnable() {
+ public void run() {
+ while (_running) {
+ try {
+ Emission emission = _process.readEmission();
+
+ String command = emission.getCommand();
+ if(command.equals("ack")) {
+ handleAck(emission.getId());
+ } else if (command.equals("fail")) {
+ handleFail(emission.getId());
+ } else if (command.equals("error")) {
+ handleError(emission.getMsg());
+ } else if (command.equals("log")) {
+ String msg = emission.getMsg();
+ LOG.info("Shell msg: " + msg);
+ } else if (command.equals("emit")) {
+ handleEmit(emission);
+ }
+ } catch (InterruptedException e) {
+ } catch (Throwable t) {
+ die(t);
+ }
+ }
+ }
+ });
+
+ _readerThread.start();
+
+ _writerThread = new Thread(new Runnable() {
+ public void run() {
+ while (_running) {
+ try {
+ // FIXME: This can either be TaskIds or Immissions
+ Object write = _pendingWrites.poll(1, SECONDS);
+ if (write instanceof Immission) {
+ _process.writeImmission((Immission)write);
+ } else if (write instanceof List<?>) {
+
+ }
+ } catch (InterruptedException e) {
+ } catch (Throwable t) {
+ die(t);
+ }
+ }
+ }
+ });
+
+ _writerThread.start();
+ }
+
+ public void execute(Tuple input) {
+ if (_exception != null) {
+ throw new RuntimeException(_exception);
+ }
+
+ //just need an id
+ String genId = Long.toString(_rand.nextLong());
+ _inputs.put(genId, input);
+ try {
+ Immission immission = new Immission();
+ immission.setId(genId);
+ immission.setComp(input.getSourceComponent());
+ immission.setStream(input.getSourceStreamId());
+ immission.setTask(input.getSourceTask());
+ immission.setTuple(input.getValues());
+
+ _pendingWrites.put(immission);
+ } catch(InterruptedException e) {
+ throw new RuntimeException("Error during multilang processing", e);
+ }
+ }
+
+ public void cleanup() {
+ _running = false;
+ _process.destroy();
+ _inputs.clear();
+ }
+
+ private void handleAck(String id) {
+ Tuple acked = _inputs.remove(id);
+ if(acked==null) {
+ throw new RuntimeException("Acked a non-existent or already acked/failed id: " + id);
+ }
+ _collector.ack(acked);
+ }
+
+ private void handleFail(String id) {
+ Tuple failed = _inputs.remove(id);
+ if(failed==null) {
+ throw new RuntimeException("Failed a non-existent or already acked/failed id: " + id);
+ }
+ _collector.fail(failed);
+ }
+
+ private void handleError(String msg) {
+ _collector.reportError(new Exception("Shell Process Exception: " + msg));
+ }
+
+ private void handleEmit(Emission emission) throws InterruptedException {
+ List<Tuple> anchors = new ArrayList<Tuple>();
+ for (String anchor : emission.getAnchors()) {
+ Tuple t = _inputs.get(anchor);
+ if (t == null) {
+ throw new RuntimeException("Anchored onto " + anchor + " after ack/fail");
+ }
+ anchors.add(t);
+ }
+
+ if(emission.getTask() == 0) {
+ List<Integer> outtasks = _collector.emit(emission.getStream(), anchors, emission.getTuple());
+ _pendingWrites.put(outtasks);
+ } else {
+ _collector.emitDirect((int)emission.getTask(), emission.getStream(), anchors, emission.getTuple());
+ }
+ }
+
+ private void die(Throwable exception) {
+ _exception = exception;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a7918f8e/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java b/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java
new file mode 100644
index 0000000..38976bc
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java
@@ -0,0 +1,106 @@
+package backtype.storm.utils;
+
+import backtype.storm.multilang.Emission;
+import backtype.storm.multilang.ISerializer;
+import backtype.storm.multilang.Immission;
+import backtype.storm.multilang.NoOutputException;
+import backtype.storm.multilang.SpoutMsg;
+import backtype.storm.task.TopologyContext;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.log4j.Logger;
+
+public class ShellProcess implements Serializable {
+ public static Logger LOG = Logger.getLogger(ShellProcess.class);
+ private Process _subprocess;
+ private InputStream processErrorStream;
+ private String[] command;
+ public ISerializer serializer;
+
+ public ShellProcess(ISerializer serializer, String[] command) {
+ this.command = command;
+ this.serializer = serializer;
+ }
+
+ public Number launch(Map conf, TopologyContext context) {
+ ProcessBuilder builder = new ProcessBuilder(command);
+ builder.directory(new File(context.getCodeDir()));
+
+ Number pid;
+ try {
+ _subprocess = builder.start();
+ processErrorStream = _subprocess.getErrorStream();
+ serializer.initialize(_subprocess.getOutputStream(), _subprocess.getInputStream());
+ pid = serializer.connect(conf, context);
+ } catch (IOException e) {
+ throw new RuntimeException("Error when launching multilang subprocess\n" + getErrorsString(), e);
+ } catch (NoOutputException e) {
+ throw new RuntimeException(e + getErrorsString() + "\n");
+ }
+ return pid;
+ }
+
+ public void destroy() {
+ _subprocess.destroy();
+ }
+
+ public Emission readEmission() throws IOException {
+ try {
+ return serializer.readEmission();
+ } catch (NoOutputException e) {
+ throw new RuntimeException(e + getErrorsString() + "\n");
+ }
+ }
+
+ public void writeImmission(Immission msg) throws IOException {
+ serializer.writeImmission(msg);
+ // drain the error stream to avoid dead lock because of full error stream buffer
+ drainErrorStream();
+ }
+
+ public void writeSpoutMsg(SpoutMsg msg) throws IOException {
+ serializer.writeSpoutMsg(msg);
+ // drain the error stream to avoid dead lock because of full error stream buffer
+ drainErrorStream();
+ }
+
+ public void writeTaskIds(List<Integer> taskIds) throws IOException {
+ serializer.writeTaskIds(taskIds);
+ // drain the error stream to avoid dead lock because of full error stream buffer
+ drainErrorStream();
+ }
+
+ public void drainErrorStream()
+ {
+ try {
+ while (processErrorStream.available() > 0)
+ {
+ int bufferSize = processErrorStream.available();
+ byte[] errorReadingBuffer = new byte[bufferSize];
+
+ processErrorStream.read(errorReadingBuffer, 0, bufferSize);
+
+ LOG.info("Got error from shell process: " + new String(errorReadingBuffer));
+ }
+ } catch(Exception e) {
+ }
+ }
+
+ public String getErrorsString() {
+ if(processErrorStream!=null) {
+ try {
+ return IOUtils.toString(processErrorStream);
+ } catch(IOException e) {
+ return "(Unable to capture error stream)";
+ }
+ } else {
+ return "";
+ }
+ }
+}