You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/05/19 23:52:27 UTC

[02/24] git commit: Moved ShellBolt, ShellProcess and ShellSpout back to their old packages. Updated dependencies.

Moved ShellBolt, ShellProcess and ShellSpout back to their old packages. Updated dependencies.


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/a7918f8e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/a7918f8e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/a7918f8e

Branch: refs/heads/master
Commit: a7918f8ef91b82b5b8fd91c91b36378a6c99caad
Parents: 1a19f3d
Author: John Gilmore <jg...@ml.sun.ac.za>
Authored: Mon Oct 7 09:33:53 2013 +0200
Committer: John Gilmore <jg...@ml.sun.ac.za>
Committed: Mon Oct 7 09:33:53 2013 +0200

----------------------------------------------------------------------
 storm-core/src/clj/backtype/storm/bootstrap.clj |   7 +-
 .../backtype/storm/clojure/RichShellBolt.java   |   2 +-
 .../backtype/storm/clojure/RichShellSpout.java  |   2 +-
 .../jvm/backtype/storm/multilang/ShellBolt.java | 207 ------------------
 .../backtype/storm/multilang/ShellProcess.java  | 101 ---------
 .../backtype/storm/multilang/ShellSpout.java    | 115 ----------
 .../jvm/backtype/storm/spout/ShellSpout.java    | 117 +++++++++++
 .../src/jvm/backtype/storm/task/ShellBolt.java  | 208 +++++++++++++++++++
 .../jvm/backtype/storm/utils/ShellProcess.java  | 106 ++++++++++
 9 files changed, 436 insertions(+), 429 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a7918f8e/storm-core/src/clj/backtype/storm/bootstrap.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/bootstrap.clj b/storm-core/src/clj/backtype/storm/bootstrap.clj
index 1075c66..9edbe8e 100644
--- a/storm-core/src/clj/backtype/storm/bootstrap.clj
+++ b/storm-core/src/clj/backtype/storm/bootstrap.clj
@@ -12,13 +12,12 @@
                      RegisteredGlobalState ThriftTopologyUtils DisruptorQueue
                      MutableObject MutableLong]))
      (import (quote [backtype.storm.serialization KryoTupleSerializer KryoTupleDeserializer]))
-     (import (quote [backtype.storm.spout ISpout SpoutOutputCollector ISpoutOutputCollector]))
-     (import (quote [backtype.storm.multilang ShellSpout ShellBolt]))
+     (import (quote [backtype.storm.spout ISpout SpoutOutputCollector ISpoutOutputCollector ShellSpout]))
      (import (quote [backtype.storm.tuple Tuple TupleImpl Fields MessageId]))
      (import (quote [backtype.storm.task IBolt IOutputCollector
-                     OutputCollector TopologyContext
+                     OutputCollector TopologyContext ShellBolt
                      GeneralTopologyContext WorkerTopologyContext]))
-     (import (quote [backtype.storm.coordination CoordinatedBolt CoordinatedBolt$SourceArgs 
+     (import (quote [backtype.storm.coordination CoordinatedBolt CoordinatedBolt$SourceArgs
                      IBatchBolt BatchBoltExecutor]))
      (import (quote [backtype.storm.drpc KeyedFairBolt]))
      (import (quote [backtype.storm.daemon Shutdownable]))

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a7918f8e/storm-core/src/jvm/backtype/storm/clojure/RichShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/clojure/RichShellBolt.java b/storm-core/src/jvm/backtype/storm/clojure/RichShellBolt.java
index 6ff669e..6be104e 100644
--- a/storm-core/src/jvm/backtype/storm/clojure/RichShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/clojure/RichShellBolt.java
@@ -1,7 +1,7 @@
 package backtype.storm.clojure;
 
 import backtype.storm.generated.StreamInfo;
-import backtype.storm.multilang.ShellBolt;
+import backtype.storm.task.ShellBolt;
 import backtype.storm.topology.IRichBolt;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a7918f8e/storm-core/src/jvm/backtype/storm/clojure/RichShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/clojure/RichShellSpout.java b/storm-core/src/jvm/backtype/storm/clojure/RichShellSpout.java
index 57410e1..cb5947f 100644
--- a/storm-core/src/jvm/backtype/storm/clojure/RichShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/clojure/RichShellSpout.java
@@ -1,7 +1,7 @@
 package backtype.storm.clojure;
 
 import backtype.storm.generated.StreamInfo;
-import backtype.storm.multilang.ShellSpout;
+import backtype.storm.spout.ShellSpout;
 import backtype.storm.topology.IRichSpout;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.tuple.Fields;

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a7918f8e/storm-core/src/jvm/backtype/storm/multilang/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/multilang/ShellBolt.java b/storm-core/src/jvm/backtype/storm/multilang/ShellBolt.java
deleted file mode 100644
index acd60c9..0000000
--- a/storm-core/src/jvm/backtype/storm/multilang/ShellBolt.java
+++ /dev/null
@@ -1,207 +0,0 @@
-package backtype.storm.multilang;
-
-import backtype.storm.generated.ShellComponent;
-import backtype.storm.task.IBolt;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.tuple.Tuple;
-import backtype.storm.multilang.ShellProcess;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import static java.util.concurrent.TimeUnit.SECONDS;
-import java.util.Map;
-import java.util.Random;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * A bolt that shells out to another process to process tuples. ShellBolt
- * communicates with that process over stdio using a special protocol. An ~100
- * line library is required to implement that protocol, and adapter libraries
- * currently exist for Ruby and Python.
- *
- * <p>To run a ShellBolt on a cluster, the scripts that are shelled out to must be
- * in the resources directory within the jar submitted to the master.
- * During development/testing on a local machine, that resources directory just
- * needs to be on the classpath.</p>
- *
- * <p>When creating topologies using the Java API, subclass this bolt and implement
- * the IRichBolt interface to create components for the topology that use other languages. For example:
- * </p>
- *
- * <pre>
- * public class MyBolt extends ShellBolt implements IRichBolt {
- *      public MyBolt() {
- *          super("python", "mybolt.py");
- *      }
- *
- *      public void declareOutputFields(OutputFieldsDeclarer declarer) {
- *          declarer.declare(new Fields("field1", "field2"));
- *      }
- * }
- * </pre>
- */
-public class ShellBolt implements IBolt {
-    public static Logger LOG = LoggerFactory.getLogger(ShellBolt.class);
-    Process _subprocess;
-    OutputCollector _collector;
-    Map<String, Tuple> _inputs = new ConcurrentHashMap<String, Tuple>();
-
-    private String[] _command;
-    private ShellProcess _process;
-    private volatile boolean _running = true;
-    private volatile Throwable _exception;
-    private LinkedBlockingQueue _pendingWrites = new LinkedBlockingQueue();
-    private Random _rand;
-
-    private Thread _readerThread;
-    private Thread _writerThread;
-
-    public ShellBolt(ShellComponent component) {
-        this(component.get_execution_command(), component.get_script());
-        _process = new ShellProcess(new JsonSerializer(), _command);
-    }
-
-    public ShellBolt(String... command) {
-        _command = command;
-        _process = new ShellProcess(new JsonSerializer(), _command);
-    }
-
-    public ShellBolt(ISerializer serializer, String... command) {
-        _command = command;
-        _process = new ShellProcess(serializer, _command);
-    }
-
-    public void prepare(Map stormConf, TopologyContext context,
-                        final OutputCollector collector) {
-        _rand = new Random();
-        _collector = collector;
-
-        //subprocesses must send their pid first thing
-        Number subpid = _process.launch(stormConf, context);
-        LOG.info("Launched subprocess with pid " + subpid);
-
-        // reader
-        _readerThread = new Thread(new Runnable() {
-            public void run() {
-                while (_running) {
-                    try {
-                        Emission emission = _process.readEmission();
-
-                        String command = emission.getCommand();
-                        if(command.equals("ack")) {
-                            handleAck(emission.getId());
-                        } else if (command.equals("fail")) {
-                            handleFail(emission.getId());
-                        } else if (command.equals("error")) {
-                            handleError(emission.getMsg());
-                        } else if (command.equals("log")) {
-                            String msg = emission.getMsg();
-                            LOG.info("Shell msg: " + msg);
-                        } else if (command.equals("emit")) {
-                            handleEmit(emission);
-                        }
-                    } catch (InterruptedException e) {
-                    } catch (Throwable t) {
-                        die(t);
-                    }
-                }
-            }
-        });
-
-        _readerThread.start();
-
-        _writerThread = new Thread(new Runnable() {
-            public void run() {
-                while (_running) {
-                    try {
-                    	// FIXME: This can either be TaskIds or Immissions
-                        Object write = _pendingWrites.poll(1, SECONDS);
-                        if (write instanceof Immission) {
-                            _process.writeImmission((Immission)write);
-                        } else if (write instanceof List<?>) {
-
-                        }
-                    } catch (InterruptedException e) {
-                    } catch (Throwable t) {
-                        die(t);
-                    }
-                }
-            }
-        });
-
-        _writerThread.start();
-    }
-
-    public void execute(Tuple input) {
-        if (_exception != null) {
-            throw new RuntimeException(_exception);
-        }
-
-        //just need an id
-        String genId = Long.toString(_rand.nextLong());
-        _inputs.put(genId, input);
-        try {
-            Immission immission = new Immission();
-            immission.setId(genId);
-            immission.setComp(input.getSourceComponent());
-            immission.setStream(input.getSourceStreamId());
-            immission.setTask(input.getSourceTask());
-            immission.setTuple(input.getValues());
-
-            _pendingWrites.put(immission);
-        } catch(InterruptedException e) {
-            throw new RuntimeException("Error during multilang processing", e);
-        }
-    }
-
-    public void cleanup() {
-        _running = false;
-        _process.destroy();
-        _inputs.clear();
-    }
-
-    private void handleAck(String id) {
-        Tuple acked = _inputs.remove(id);
-        if(acked==null) {
-            throw new RuntimeException("Acked a non-existent or already acked/failed id: " + id);
-        }
-        _collector.ack(acked);
-    }
-
-    private void handleFail(String id) {
-        Tuple failed = _inputs.remove(id);
-        if(failed==null) {
-            throw new RuntimeException("Failed a non-existent or already acked/failed id: " + id);
-        }
-        _collector.fail(failed);
-    }
-
-    private void handleError(String msg) {
-        _collector.reportError(new Exception("Shell Process Exception: " + msg));
-    }
-
-    private void handleEmit(Emission emission) throws InterruptedException {
-    	List<Tuple> anchors = new ArrayList<Tuple>();
-    	for (String anchor : emission.getAnchors()) {
-	    	Tuple t = _inputs.get(anchor);
-	        if (t == null) {
-	            throw new RuntimeException("Anchored onto " + anchor + " after ack/fail");
-	        }
-	        anchors.add(t);
-    	}
-
-        if(emission.getTask() == 0) {
-            List<Integer> outtasks = _collector.emit(emission.getStream(), anchors, emission.getTuple());
-            _pendingWrites.put(outtasks);
-        } else {
-            _collector.emitDirect((int)emission.getTask(), emission.getStream(), anchors, emission.getTuple());
-        }
-    }
-
-    private void die(Throwable exception) {
-        _exception = exception;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a7918f8e/storm-core/src/jvm/backtype/storm/multilang/ShellProcess.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/multilang/ShellProcess.java b/storm-core/src/jvm/backtype/storm/multilang/ShellProcess.java
deleted file mode 100644
index 12689d8..0000000
--- a/storm-core/src/jvm/backtype/storm/multilang/ShellProcess.java
+++ /dev/null
@@ -1,101 +0,0 @@
-package backtype.storm.multilang;
-
-import backtype.storm.task.TopologyContext;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.Serializable;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.log4j.Logger;
-
-public class ShellProcess implements Serializable {
-    public static Logger LOG = Logger.getLogger(ShellProcess.class);
-    private Process _subprocess;
-    private InputStream processErrorStream;
-    private String[] command;
-    public ISerializer serializer;
-
-    public ShellProcess(ISerializer serializer, String[] command) {
-        this.command = command;
-        this.serializer = serializer;
-    }
-
-    public Number launch(Map conf, TopologyContext context) {
-        ProcessBuilder builder = new ProcessBuilder(command);
-        builder.directory(new File(context.getCodeDir()));
-
-        Number pid;
-        try {
-	        _subprocess = builder.start();
-	        processErrorStream = _subprocess.getErrorStream();
-	        serializer.initialize(_subprocess.getOutputStream(), _subprocess.getInputStream());
-	        pid = serializer.connect(conf, context);
-        } catch (IOException e) {
-        	throw new RuntimeException("Error when launching multilang subprocess\n" + getErrorsString(), e);
-        } catch (NoOutputException e) {
-            throw new RuntimeException(e + getErrorsString() + "\n");
-    	}
-        return pid;
-    }
-
-    public void destroy() {
-        _subprocess.destroy();
-    }
-
-    public Emission readEmission() throws IOException {
-    	try  {
-    		return serializer.readEmission();
-    	} catch (NoOutputException e) {
-            throw new RuntimeException(e + getErrorsString() + "\n");
-    	}
-    }
-
-    public void writeImmission(Immission msg) throws IOException {
-    	serializer.writeImmission(msg);
-        // drain the error stream to avoid dead lock because of full error stream buffer
-        drainErrorStream();
-    }
-
-    public void writeSpoutMsg(SpoutMsg msg) throws IOException {
-    	serializer.writeSpoutMsg(msg);
-        // drain the error stream to avoid dead lock because of full error stream buffer
-        drainErrorStream();
-    }
-
-    public void writeTaskIds(List<Integer> taskIds) throws IOException {
-    	serializer.writeTaskIds(taskIds);
-        // drain the error stream to avoid dead lock because of full error stream buffer
-        drainErrorStream();
-    }
-
-    public void drainErrorStream()
-    {
-        try {
-            while (processErrorStream.available() > 0)
-            {
-                int bufferSize = processErrorStream.available();
-                byte[] errorReadingBuffer =  new byte[bufferSize];
-
-                processErrorStream.read(errorReadingBuffer, 0, bufferSize);
-
-                LOG.info("Got error from shell process: " + new String(errorReadingBuffer));
-            }
-        } catch(Exception e) {
-        }
-    }
-
-    public String getErrorsString() {
-        if(processErrorStream!=null) {
-            try {
-                return IOUtils.toString(processErrorStream);
-            } catch(IOException e) {
-                return "(Unable to capture error stream)";
-            }
-        } else {
-            return "";
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a7918f8e/storm-core/src/jvm/backtype/storm/multilang/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/multilang/ShellSpout.java b/storm-core/src/jvm/backtype/storm/multilang/ShellSpout.java
deleted file mode 100644
index 7895a00..0000000
--- a/storm-core/src/jvm/backtype/storm/multilang/ShellSpout.java
+++ /dev/null
@@ -1,115 +0,0 @@
-package backtype.storm.multilang;
-
-import backtype.storm.generated.ShellComponent;
-import backtype.storm.spout.ISpout;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.multilang.ShellProcess;
-import backtype.storm.utils.Utils;
-import java.util.Map;
-import java.util.List;
-import java.io.IOException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class ShellSpout implements ISpout {
-    public static Logger LOG = LoggerFactory.getLogger(ShellSpout.class);
-
-    private SpoutOutputCollector _collector;
-    private String[] _command;
-    private ShellProcess _process;
-    private SpoutMsg spoutMsg;
-
-    public ShellSpout(ShellComponent component) {
-        this(component.get_execution_command(), component.get_script());
-        _process = new ShellProcess(new JsonSerializer(), _command);
-    }
-
-    public ShellSpout(String... command) {
-        _command = command;
-        _process = new ShellProcess(new JsonSerializer(), _command);
-    }
-
-    public ShellSpout(ISerializer serializer, String... command) {
-        _command = command;
-        _process = new ShellProcess(serializer, _command);
-    }
-
-    public void open(Map stormConf, TopologyContext context,
-                     SpoutOutputCollector collector) {
-        _collector = collector;
-
-        Number subpid = _process.launch(stormConf, context);
-        LOG.info("Launched subprocess with pid " + subpid);
-    }
-
-    public void close() {
-        _process.destroy();
-    }
-
-    public void nextTuple() {
-        if (spoutMsg == null) {
-            spoutMsg = new SpoutMsg();
-        }
-        spoutMsg.setCommand("next");
-        spoutMsg.setId("");
-        querySubprocess();
-    }
-
-    public void ack(Object msgId) {
-    	if (spoutMsg == null) {
-            spoutMsg = new SpoutMsg();
-        }
-        spoutMsg.setCommand("ack");
-        spoutMsg.setId(msgId.toString());
-        querySubprocess();
-    }
-
-    public void fail(Object msgId) {
-    	if (spoutMsg == null) {
-            spoutMsg = new SpoutMsg();
-        }
-        spoutMsg.setCommand("fail");
-        spoutMsg.setId(msgId.toString());
-        querySubprocess();
-    }
-
-    private void querySubprocess() {
-        try {
-            _process.writeSpoutMsg(spoutMsg);
-
-            while (true) {
-                Emission emission = _process.readEmission();
-                String command = emission.getCommand();
-                if (command.equals("sync")) {
-                    return;
-                } else if (command.equals("log")) {
-                    String msg = emission.getMsg();
-                    LOG.info("Shell msg: " + msg);
-                } else if (command.equals("emit")) {
-                    String stream = emission.getStream();
-                    Long task = emission.getTask();
-                    List<Object> tuple = emission.getTuple();
-                    Object messageId = emission.getId();
-                    if (task == 0) {
-                        List<Integer> outtasks = _collector.emit(stream, tuple, messageId);
-                        _process.writeTaskIds(outtasks);
-                    } else {
-                        _collector.emitDirect((int)task.longValue(), stream, tuple, messageId);
-                    }
-                }
-            }
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    @Override
-    public void activate() {
-    }
-
-    @Override
-    public void deactivate() {
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a7918f8e/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
new file mode 100644
index 0000000..c9ef682
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -0,0 +1,139 @@
+package backtype.storm.spout;
+
+import backtype.storm.generated.ShellComponent;
+import backtype.storm.multilang.Emission;
+import backtype.storm.multilang.ISerializer;
+import backtype.storm.multilang.JsonSerializer;
+import backtype.storm.multilang.SpoutMsg;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.utils.ShellProcess;
+import backtype.storm.utils.Utils;
+import java.util.Map;
+import java.util.List;
+import java.io.IOException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A spout that delegates tuple generation to an external process, talking to
+ * it through a {@link ShellProcess}.
+ *
+ * <p>Each {@code nextTuple}, {@code ack} and {@code fail} call is forwarded to
+ * the subprocess as a {@link SpoutMsg}; the spout then services the replies
+ * until the subprocess answers {@code "sync"}.</p>
+ */
+public class ShellSpout implements ISpout {
+    public static Logger LOG = LoggerFactory.getLogger(ShellSpout.class);
+
+    private SpoutOutputCollector _collector;
+    private String[] _command;
+    private ShellProcess _process;
+    // Reused for every message sent to the subprocess; created on first use.
+    private SpoutMsg spoutMsg;
+
+    /**
+     * Builds a spout from a Thrift {@link ShellComponent}: the process command
+     * is its execution command followed by its script.
+     */
+    public ShellSpout(ShellComponent component) {
+        // The delegated constructor already creates _process (JSON serializer).
+        this(component.get_execution_command(), component.get_script());
+    }
+
+    /** Builds a spout that runs {@code command} and talks to it with a {@link JsonSerializer}. */
+    public ShellSpout(String... command) {
+        this(new JsonSerializer(), command);
+    }
+
+    /** Builds a spout that runs {@code command} and talks to it with {@code serializer}. */
+    public ShellSpout(ISerializer serializer, String... command) {
+        _command = command;
+        _process = new ShellProcess(serializer, _command);
+    }
+
+    /** Launches the subprocess and keeps {@code collector} for the tuples it emits. */
+    public void open(Map stormConf, TopologyContext context,
+                     SpoutOutputCollector collector) {
+        _collector = collector;
+
+        Number subpid = _process.launch(stormConf, context);
+        LOG.info("Launched subprocess with pid " + subpid);
+    }
+
+    /** Destroys the subprocess. */
+    public void close() {
+        _process.destroy();
+    }
+
+    /** Asks the subprocess for its next tuple(s). */
+    public void nextTuple() {
+        querySubprocess("next", "");
+    }
+
+    /** Forwards the ack of {@code msgId} to the subprocess. */
+    public void ack(Object msgId) {
+        querySubprocess("ack", msgId.toString());
+    }
+
+    /** Forwards the failure of {@code msgId} to the subprocess. */
+    public void fail(Object msgId) {
+        querySubprocess("fail", msgId.toString());
+    }
+
+    /**
+     * Sends one command to the subprocess, then handles its replies until it
+     * answers {@code "sync"}: {@code "log"} messages are logged and
+     * {@code "emit"} messages are emitted through the collector. Other replies
+     * are ignored.
+     *
+     * @param command spout command to send ({@code "next"}, {@code "ack"} or {@code "fail"})
+     * @param id      message id the command refers to ({@code ""} for {@code "next"})
+     * @throws RuntimeException wrapping any {@link IOException} from the subprocess pipes
+     */
+    private void querySubprocess(String command, String id) {
+        if (spoutMsg == null) {
+            spoutMsg = new SpoutMsg();
+        }
+        spoutMsg.setCommand(command);
+        spoutMsg.setId(id);
+
+        try {
+            _process.writeSpoutMsg(spoutMsg);
+
+            while (true) {
+                Emission emission = _process.readEmission();
+                String reply = emission.getCommand();
+                if (reply.equals("sync")) {
+                    return;
+                } else if (reply.equals("log")) {
+                    String msg = emission.getMsg();
+                    LOG.info("Shell msg: " + msg);
+                } else if (reply.equals("emit")) {
+                    String stream = emission.getStream();
+                    Long task = emission.getTask();
+                    List<Object> tuple = emission.getTuple();
+                    Object messageId = emission.getId();
+                    if (task == 0) {
+                        // Regular emit: report the chosen task ids back to the subprocess.
+                        List<Integer> outtasks = _collector.emit(stream, tuple, messageId);
+                        _process.writeTaskIds(outtasks);
+                    } else {
+                        _collector.emitDirect((int)task.longValue(), stream, tuple, messageId);
+                    }
+                }
+            }
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** No-op; activation is not forwarded to the subprocess. */
+    @Override
+    public void activate() {
+    }
+
+    /** No-op; deactivation is not forwarded to the subprocess. */
+    @Override
+    public void deactivate() {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a7918f8e/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
new file mode 100644
index 0000000..36585e3
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -0,0 +1,250 @@
+package backtype.storm.task;
+
+import backtype.storm.generated.ShellComponent;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.ShellProcess;
+import backtype.storm.multilang.Emission;
+import backtype.storm.multilang.ISerializer;
+import backtype.storm.multilang.Immission;
+import backtype.storm.multilang.JsonSerializer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import java.util.Map;
+import java.util.Random;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A bolt that shells out to another process to process tuples. ShellBolt
+ * communicates with that process over stdio using a special protocol. An ~100
+ * line library is required to implement that protocol, and adapter libraries
+ * currently exist for Ruby and Python.
+ *
+ * <p>To run a ShellBolt on a cluster, the scripts that are shelled out to must be
+ * in the resources directory within the jar submitted to the master.
+ * During development/testing on a local machine, that resources directory just
+ * needs to be on the classpath.</p>
+ *
+ * <p>When creating topologies using the Java API, subclass this bolt and implement
+ * the IRichBolt interface to create components for the topology that use other languages. For example:
+ * </p>
+ *
+ * <pre>
+ * public class MyBolt extends ShellBolt implements IRichBolt {
+ *      public MyBolt() {
+ *          super("python", "mybolt.py");
+ *      }
+ *
+ *      public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ *          declarer.declare(new Fields("field1", "field2"));
+ *      }
+ * }
+ * </pre>
+ */
+public class ShellBolt implements IBolt {
+    public static Logger LOG = LoggerFactory.getLogger(ShellBolt.class);
+    Process _subprocess;
+    OutputCollector _collector;
+    // Tuples sent to the subprocess and not yet acked/failed, keyed by the id sent with them.
+    Map<String, Tuple> _inputs = new ConcurrentHashMap<String, Tuple>();
+
+    private String[] _command;
+    private ShellProcess _process;
+    private volatile boolean _running = true;
+    // Last error recorded by the reader/writer threads; surfaced by execute().
+    private volatile Throwable _exception;
+    // Work for the writer thread: an Immission (tuple for the subprocess) or a
+    // List<Integer> of task ids answering a regular (non-direct) emit.
+    private LinkedBlockingQueue<Object> _pendingWrites = new LinkedBlockingQueue<Object>();
+    private Random _rand;
+
+    private Thread _readerThread;
+    private Thread _writerThread;
+
+    /**
+     * Builds a bolt from a Thrift {@link ShellComponent}: the process command
+     * is its execution command followed by its script.
+     */
+    public ShellBolt(ShellComponent component) {
+        // The delegated constructor already creates _process (JSON serializer).
+        this(component.get_execution_command(), component.get_script());
+    }
+
+    /** Builds a bolt that runs {@code command} and talks to it with a {@link JsonSerializer}. */
+    public ShellBolt(String... command) {
+        this(new JsonSerializer(), command);
+    }
+
+    /** Builds a bolt that runs {@code command} and talks to it with {@code serializer}. */
+    public ShellBolt(ISerializer serializer, String... command) {
+        _command = command;
+        _process = new ShellProcess(serializer, _command);
+    }
+
+    /**
+     * Launches the subprocess, then starts a reader thread that handles what it
+     * sends back and a writer thread that drains {@code _pendingWrites} into it.
+     *
+     * @param stormConf topology configuration, passed to {@link ShellProcess#launch}
+     * @param context   topology context, passed to {@link ShellProcess#launch}
+     * @param collector collector used for the subprocess's emits, acks, fails and errors
+     */
+    public void prepare(Map stormConf, TopologyContext context,
+                        final OutputCollector collector) {
+        _rand = new Random();
+        _collector = collector;
+
+        //subprocesses must send their pid first thing
+        Number subpid = _process.launch(stormConf, context);
+        LOG.info("Launched subprocess with pid " + subpid);
+
+        // reader
+        _readerThread = new Thread(new Runnable() {
+            public void run() {
+                while (_running) {
+                    try {
+                        Emission emission = _process.readEmission();
+
+                        String command = emission.getCommand();
+                        if(command.equals("ack")) {
+                            handleAck(emission.getId());
+                        } else if (command.equals("fail")) {
+                            handleFail(emission.getId());
+                        } else if (command.equals("error")) {
+                            handleError(emission.getMsg());
+                        } else if (command.equals("log")) {
+                            String msg = emission.getMsg();
+                            LOG.info("Shell msg: " + msg);
+                        } else if (command.equals("emit")) {
+                            handleEmit(emission);
+                        }
+                    } catch (InterruptedException e) {
+                        // interrupted while queueing task ids; the loop re-checks _running
+                    } catch (Throwable t) {
+                        die(t);
+                    }
+                }
+            }
+        });
+
+        _readerThread.start();
+
+        // writer
+        _writerThread = new Thread(new Runnable() {
+            public void run() {
+                while (_running) {
+                    try {
+                        // timed poll so the loop notices cleanup() clearing _running
+                        Object write = _pendingWrites.poll(1, SECONDS);
+                        if (write instanceof Immission) {
+                            _process.writeImmission((Immission)write);
+                        } else if (write instanceof List<?>) {
+                            // only handleEmit() enqueues lists, and they hold task ids
+                            @SuppressWarnings("unchecked")
+                            List<Integer> taskIds = (List<Integer>) write;
+                            _process.writeTaskIds(taskIds);
+                        }
+                    } catch (InterruptedException e) {
+                        // interrupted while waiting; the loop re-checks _running
+                    } catch (Throwable t) {
+                        die(t);
+                    }
+                }
+            }
+        });
+
+        _writerThread.start();
+    }
+
+    /**
+     * Queues {@code input} for the subprocess under a freshly generated id.
+     *
+     * @throws RuntimeException if a reader/writer thread has recorded an error,
+     *         or if interrupted while queueing
+     */
+    public void execute(Tuple input) {
+        if (_exception != null) {
+            throw new RuntimeException(_exception);
+        }
+
+        //just need an id
+        String genId = Long.toString(_rand.nextLong());
+        _inputs.put(genId, input);
+        try {
+            Immission immission = new Immission();
+            immission.setId(genId);
+            immission.setComp(input.getSourceComponent());
+            immission.setStream(input.getSourceStreamId());
+            immission.setTask(input.getSourceTask());
+            immission.setTuple(input.getValues());
+
+            _pendingWrites.put(immission);
+        } catch(InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Error during multilang processing", e);
+        }
+    }
+
+    /** Signals both threads to stop, destroys the subprocess and drops pending tuples. */
+    public void cleanup() {
+        _running = false;
+        _process.destroy();
+        _inputs.clear();
+    }
+
+    /** Acks the input tuple registered under {@code id}; throws if there is none. */
+    private void handleAck(String id) {
+        Tuple acked = _inputs.remove(id);
+        if(acked==null) {
+            throw new RuntimeException("Acked a non-existent or already acked/failed id: " + id);
+        }
+        _collector.ack(acked);
+    }
+
+    /** Fails the input tuple registered under {@code id}; throws if there is none. */
+    private void handleFail(String id) {
+        Tuple failed = _inputs.remove(id);
+        if(failed==null) {
+            throw new RuntimeException("Failed a non-existent or already acked/failed id: " + id);
+        }
+        _collector.fail(failed);
+    }
+
+    /** Reports an error message sent by the subprocess through the collector. */
+    private void handleError(String msg) {
+        _collector.reportError(new Exception("Shell Process Exception: " + msg));
+    }
+
+    /**
+     * Emits a tuple on behalf of the subprocess, anchored to the listed inputs.
+     * Task 0 means a regular emit whose target task ids are queued back to the
+     * subprocess; any other task is a direct emit to that task.
+     *
+     * @throws RuntimeException if an anchor id is unknown (already acked/failed)
+     */
+    private void handleEmit(Emission emission) throws InterruptedException {
+        List<Tuple> anchors = new ArrayList<Tuple>();
+        for (String anchor : emission.getAnchors()) {
+            Tuple t = _inputs.get(anchor);
+            if (t == null) {
+                throw new RuntimeException("Anchored onto " + anchor + " after ack/fail");
+            }
+            anchors.add(t);
+        }
+
+        if(emission.getTask() == 0) {
+            List<Integer> outtasks = _collector.emit(emission.getStream(), anchors, emission.getTuple());
+            _pendingWrites.put(outtasks);
+        } else {
+            _collector.emitDirect((int)emission.getTask(), emission.getStream(), anchors, emission.getTuple());
+        }
+    }
+
+    /** Records {@code exception} so the next execute() call rethrows it. */
+    private void die(Throwable exception) {
+        _exception = exception;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/a7918f8e/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java b/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java
new file mode 100644
index 0000000..38976bc
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java
@@ -0,0 +1,106 @@
+package backtype.storm.utils;
+
+import backtype.storm.multilang.Emission;
+import backtype.storm.multilang.ISerializer;
+import backtype.storm.multilang.Immission;
+import backtype.storm.multilang.NoOutputException;
+import backtype.storm.multilang.SpoutMsg;
+import backtype.storm.task.TopologyContext;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.log4j.Logger;
+
+/**
+ * Handle on a multilang subprocess.
+ *
+ * <p>It starts the process with a {@link ProcessBuilder} and hands the process's
+ * stdin/stdout to an {@link ISerializer} for message exchange. Anything the process
+ * writes to stderr is copied into the log.
+ */
+public class ShellProcess implements Serializable {
+    public static final Logger LOG = Logger.getLogger(ShellProcess.class);
+    // Process and InputStream are not Serializable and only exist after launch(),
+    // so they are excluded from Java serialization.
+    private transient Process _subprocess;
+    private transient InputStream processErrorStream;
+    private String[] command;
+    public ISerializer serializer;
+
+    /**
+     * @param serializer protocol implementation used to talk to the subprocess
+     * @param command    program and arguments used to start the subprocess
+     */
+    public ShellProcess(ISerializer serializer, String[] command) {
+        this.command = command;
+        this.serializer = serializer;
+    }
+
+    /**
+     * Starts the subprocess in the topology's code directory, binds the serializer to the
+     * process's stdin/stdout, and runs the serializer's connect step.
+     *
+     * @param conf    configuration passed through to {@code serializer.connect}
+     * @param context topology context; supplies the working directory and is passed to connect
+     * @return the value returned by {@code serializer.connect} (presumably the subprocess pid,
+     *         judging by how it was named; TODO confirm against ISerializer)
+     * @throws RuntimeException if the process cannot be started or produces no output. The
+     *         message includes the subprocess's stderr, and the original exception is the cause.
+     */
+    public Number launch(Map conf, TopologyContext context) {
+        ProcessBuilder builder = new ProcessBuilder(command);
+        builder.directory(new File(context.getCodeDir()));
+
+        try {
+            _subprocess = builder.start();
+            processErrorStream = _subprocess.getErrorStream();
+            serializer.initialize(_subprocess.getOutputStream(), _subprocess.getInputStream());
+            return serializer.connect(conf, context);
+        } catch (IOException e) {
+            throw new RuntimeException("Error when launching multilang subprocess\n" + getErrorsString(), e);
+        } catch (NoOutputException e) {
+            // Same message as before, but chain the cause so its stack trace is not lost.
+            throw new RuntimeException(e + getErrorsString() + "\n", e);
+        }
+    }
+
+    /**
+     * Kills the subprocess. Does nothing if {@link #launch} has not started one.
+     */
+    public void destroy() {
+        if (_subprocess != null) {
+            _subprocess.destroy();
+        }
+    }
+
+    /**
+     * Reads the next message from the subprocess via the serializer.
+     *
+     * @throws IOException if the serializer fails to read
+     * @throws RuntimeException if the subprocess produced no output. The message includes its
+     *         stderr, and the NoOutputException is the cause.
+     */
+    public Emission readEmission() throws IOException {
+        try {
+            return serializer.readEmission();
+        } catch (NoOutputException e) {
+            throw new RuntimeException(e + getErrorsString() + "\n", e);
+        }
+    }
+
+    /** Sends {@code msg} to the subprocess, then drains its stderr. */
+    public void writeImmission(Immission msg) throws IOException {
+        serializer.writeImmission(msg);
+        // drain the error stream to avoid dead lock because of full error stream buffer
+        drainErrorStream();
+    }
+
+    /** Sends a spout message to the subprocess, then drains its stderr. */
+    public void writeSpoutMsg(SpoutMsg msg) throws IOException {
+        serializer.writeSpoutMsg(msg);
+        // drain the error stream to avoid dead lock because of full error stream buffer
+        drainErrorStream();
+    }
+
+    /** Sends a list of task ids to the subprocess, then drains its stderr. */
+    public void writeTaskIds(List<Integer> taskIds) throws IOException {
+        serializer.writeTaskIds(taskIds);
+        // drain the error stream to avoid dead lock because of full error stream buffer
+        drainErrorStream();
+    }
+
+    /**
+     * Logs whatever is currently buffered on the subprocess's stderr.
+     *
+     * <p>Only bytes reported by {@code available()} are read. This is best effort: read
+     * failures are logged rather than thrown. Does nothing before {@link #launch}.
+     */
+    public void drainErrorStream()
+    {
+        if (processErrorStream == null) {
+            return;
+        }
+        try {
+            int available;
+            while ((available = processErrorStream.available()) > 0) {
+                byte[] errorReadingBuffer = new byte[available];
+                int bytesRead = processErrorStream.read(errorReadingBuffer, 0, available);
+                if (bytesRead <= 0) {
+                    break;
+                }
+                // read() may return fewer bytes than requested; log only what was actually read.
+                LOG.info("Got error from shell process: " + new String(errorReadingBuffer, 0, bytesRead));
+            }
+        } catch (IOException e) {
+            LOG.warn("Unable to drain error stream of shell process", e);
+        }
+    }
+
+    /**
+     * Returns the subprocess's stderr as a string, for inclusion in error messages.
+     *
+     * <p>NOTE(review): IOUtils.toString reads until end-of-stream, so this blocks while the
+     * subprocess keeps stderr open.
+     *
+     * @return the stderr text; "" if not launched; a placeholder if reading fails
+     */
+    public String getErrorsString() {
+        if (processErrorStream != null) {
+            try {
+                return IOUtils.toString(processErrorStream);
+            } catch (IOException e) {
+                return "(Unable to capture error stream)";
+            }
+        } else {
+            return "";
+        }
+    }
+}