You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/05/19 23:52:30 UTC

[05/24] git commit: multilang serializer is now handled by stormConf. Renamed Emission and Immission to ShellMsg and BoltMsg respectively. Documented ISerializer interface.

multilang serializer is now handled by stormConf. Renamed Emission and Immission to ShellMsg and BoltMsg respectively. Documented ISerializer interface.


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/ee0678c7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/ee0678c7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/ee0678c7

Branch: refs/heads/master
Commit: ee0678c7f8bed531a8514166c7254a2190f3c4a9
Parents: 3701bfc
Author: John Gilmore <jg...@ml.sun.ac.za>
Authored: Tue Oct 8 13:52:00 2013 +0200
Committer: John Gilmore <jg...@ml.sun.ac.za>
Committed: Tue Oct 8 13:53:32 2013 +0200

----------------------------------------------------------------------
 conf/defaults.yaml                              |   1 +
 storm-core/src/jvm/backtype/storm/Config.java   | 129 +++++++++--------
 .../jvm/backtype/storm/multilang/BoltMsg.java   |  63 +++++++++
 .../jvm/backtype/storm/multilang/Emission.java  |  79 -----------
 .../backtype/storm/multilang/ISerializer.java   |  59 ++++++--
 .../jvm/backtype/storm/multilang/Immission.java |  52 -------
 .../storm/multilang/JsonSerializer.java         | 138 ++++++++++---------
 .../storm/multilang/NoOutputException.java      |  19 ++-
 .../jvm/backtype/storm/multilang/ShellMsg.java  |  96 +++++++++++++
 .../jvm/backtype/storm/multilang/SpoutMsg.java  |  44 +++---
 .../jvm/backtype/storm/spout/ShellSpout.java    |  30 ++--
 .../src/jvm/backtype/storm/task/ShellBolt.java  |  65 ++++-----
 .../jvm/backtype/storm/utils/ShellProcess.java  |  95 ++++++++-----
 13 files changed, 489 insertions(+), 381 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ee0678c7/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index b17e445..92e65d6 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -19,6 +19,7 @@ storm.cluster.mode: "distributed" # can be distributed or local
 storm.local.mode.zmq: false
 storm.thrift.transport: "backtype.storm.security.auth.SimpleTransportPlugin"
 storm.messaging.transport: "backtype.storm.messaging.zmq"
+storm.multilang.serializer: "backtype.storm.multilang.JsonSerializer"
 
 ### nimbus.* configs are for the master
 nimbus.host: "localhost"

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ee0678c7/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index 564ad0d..fff1573 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -10,11 +10,11 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Topology configs are specified as a plain old map. This class provides a 
- * convenient way to create a topology config map by providing setter methods for 
- * all the configs that can be set. It also makes it easier to do things like add 
+ * Topology configs are specified as a plain old map. This class provides a
+ * convenient way to create a topology config map by providing setter methods for
+ * all the configs that can be set. It also makes it easier to do things like add
  * serializations.
- * 
+ *
  * <p>This class also provides constants for all the configurations possible on
  * a Storm cluster and Storm topology. Each constant is paired with a schema
  * that defines the validity criterion of the corresponding field. Default
@@ -22,11 +22,18 @@ import java.util.Map;
  *
  * <p>Note that you may put other configurations in any of the configs. Storm
  * will ignore anything it doesn't recognize, but your topologies are free to make
- * use of them by reading them in the prepare method of Bolts or the open method of 
+ * use of them by reading them in the prepare method of Bolts or the open method of
  * Spouts.</p>
  */
 public class Config extends HashMap<String, Object> {
     /**
+     * The serializer for communication between shell components and non-JVM
+     * processes
+     */
+    public static final String STORM_MULTILANG_SERIALIZER = "storm.multilang.serializer";
+    public static final Object STORM_MULTILANG_SERIALIZER_SCHEMA = String.class;
+
+    /**
      * The transporter for communication among Storm tasks
      */
     public static final String STORM_MESSAGING_TRANSPORT = "storm.messaging.transport";
@@ -35,25 +42,25 @@ public class Config extends HashMap<String, Object> {
     /**
      * Netty based messaging: The buffer size for send/recv buffer
      */
-    public static final String STORM_MESSAGING_NETTY_BUFFER_SIZE = "storm.messaging.netty.buffer_size"; 
+    public static final String STORM_MESSAGING_NETTY_BUFFER_SIZE = "storm.messaging.netty.buffer_size";
     public static final Object STORM_MESSAGING_NETTY_BUFFER_SIZE_SCHEMA = Number.class;
 
     /**
      * Netty based messaging: The max # of retries that a peer will perform when a remote is not accessible
      */
-    public static final String STORM_MESSAGING_NETTY_MAX_RETRIES = "storm.messaging.netty.max_retries"; 
+    public static final String STORM_MESSAGING_NETTY_MAX_RETRIES = "storm.messaging.netty.max_retries";
     public static final Object STORM_MESSAGING_NETTY_MAX_RETRIES_SCHEMA = Number.class;
 
     /**
-     * Netty based messaging: The min # of milliseconds that a peer will wait. 
+     * Netty based messaging: The min # of milliseconds that a peer will wait.
      */
-    public static final String STORM_MESSAGING_NETTY_MIN_SLEEP_MS = "storm.messaging.netty.min_wait_ms"; 
+    public static final String STORM_MESSAGING_NETTY_MIN_SLEEP_MS = "storm.messaging.netty.min_wait_ms";
     public static final Object STORM_MESSAGING_NETTY_MIN_SLEEP_MS_SCHEMA = Number.class;
 
     /**
-     * Netty based messaging: The max # of milliseconds that a peer will wait. 
+     * Netty based messaging: The max # of milliseconds that a peer will wait.
      */
-    public static final String STORM_MESSAGING_NETTY_MAX_SLEEP_MS = "storm.messaging.netty.max_wait_ms"; 
+    public static final String STORM_MESSAGING_NETTY_MAX_SLEEP_MS = "storm.messaging.netty.max_wait_ms";
     public static final Object STORM_MESSAGING_NETTY_MAX_SLEEP_MS_SCHEMA = Number.class;
 
     /**
@@ -78,7 +85,7 @@ public class Config extends HashMap<String, Object> {
 
     /**
      * A global task scheduler used to assign topologies's tasks to supervisors' wokers.
-     * 
+     *
      * If this is not set, a default system scheduler will be used.
      */
     public static final String STORM_SCHEDULER = "storm.scheduler";
@@ -91,9 +98,9 @@ public class Config extends HashMap<String, Object> {
     public static final Object STORM_CLUSTER_MODE_SCHEMA = String.class;
 
     /**
-     * The hostname the supervisors/workers should report to nimbus. If unset, Storm will 
+     * The hostname the supervisors/workers should report to nimbus. If unset, Storm will
      * get the hostname to report by calling <code>InetAddress.getLocalHost().getCanonicalHostName()</code>.
-     * 
+     *
      * You should set this config when you dont have a DNS which supervisors/workers
      * can utilize to find each other based on hostname got from calls to
      * <code>InetAddress.getLocalHost().getCanonicalHostName()</code>.
@@ -108,16 +115,16 @@ public class Config extends HashMap<String, Object> {
     public static final Object STORM_THRIFT_TRANSPORT_PLUGIN_SCHEMA = String.class;
 
     /**
-     * The serializer class for ListDelegate (tuple payload). 
+     * The serializer class for ListDelegate (tuple payload).
      * The default serializer will be ListDelegateSerializer
      */
     public static final String TOPOLOGY_TUPLE_SERIALIZER = "topology.tuple.serializer";
     public static final Object TOPOLOGY_TUPLE_SERIALIZER_SCHEMA = String.class;
 
     /**
-     * Whether or not to use ZeroMQ for messaging in local mode. If this is set 
-     * to false, then Storm will use a pure-Java messaging system. The purpose 
-     * of this flag is to make it easy to run Storm in local mode by eliminating 
+     * Whether or not to use ZeroMQ for messaging in local mode. If this is set
+     * to false, then Storm will use a pure-Java messaging system. The purpose
+     * of this flag is to make it easy to run Storm in local mode by eliminating
      * the need for native dependencies, which can be difficult to install.
      *
      * Defaults to false.
@@ -255,7 +262,7 @@ public class Config extends HashMap<String, Object> {
     public static final Object NIMBUS_TASK_LAUNCH_SECS_SCHEMA = Number.class;
 
     /**
-     * Whether or not nimbus should reassign tasks if it detects that a task goes down. 
+     * Whether or not nimbus should reassign tasks if it detects that a task goes down.
      * Defaults to true, and it's not recommended to change this value.
      */
     public static final String NIMBUS_REASSIGN = "nimbus.reassign";
@@ -319,19 +326,19 @@ public class Config extends HashMap<String, Object> {
     public static final Object DRPC_PORT_SCHEMA = Number.class;
 
     /**
-     * DRPC thrift server worker threads 
+     * DRPC thrift server worker threads
      */
     public static final String DRPC_WORKER_THREADS = "drpc.worker.threads";
     public static final Object DRPC_WORKER_THREADS_SCHEMA = Number.class;
 
     /**
-     * DRPC thrift server queue size 
+     * DRPC thrift server queue size
      */
     public static final String DRPC_QUEUE_SIZE = "drpc.queue.size";
     public static final Object DRPC_QUEUE_SIZE_SCHEMA = Number.class;
 
     /**
-     * This port on Storm DRPC is used by DRPC topologies to receive function invocations and send results back. 
+     * This port on Storm DRPC is used by DRPC topologies to receive function invocations and send results back.
      */
     public static final String DRPC_INVOCATIONS_PORT = "drpc.invocations.port";
     public static final Object DRPC_INVOCATIONS_PORT_SCHEMA = Number.class;
@@ -479,8 +486,8 @@ public class Config extends HashMap<String, Object> {
     /**
      * How many instances to create for a spout/bolt. A task runs on a thread with zero or more
      * other tasks for the same spout/bolt. The number of tasks for a spout/bolt is always
-     * the same throughout the lifetime of a topology, but the number of executors (threads) for 
-     * a spout/bolt can change over time. This allows a topology to scale to more or less resources 
+     * the same throughout the lifetime of a topology, but the number of executors (threads) for
+     * a spout/bolt can change over time. This allows a topology to scale to more or less resources
      * without redeploying the topology or violating the constraints of Storm (such as a fields grouping
      * guaranteeing that the same value goes to the same task).
      */
@@ -519,8 +526,8 @@ public class Config extends HashMap<String, Object> {
 
     /**
      * A list of classes that customize storm's kryo instance during start-up.
-     * Each listed class name must implement IKryoDecorator. During start-up the 
-     * listed class is instantiated with 0 arguments, then its 'decorate' method 
+     * Each listed class name must implement IKryoDecorator. During start-up the
+     * listed class is instantiated with 0 arguments, then its 'decorate' method
      * is called with storm's kryo instance as the only argument.
      */
     public static final String TOPOLOGY_KRYO_DECORATORS = "topology.kryo.decorators";
@@ -550,7 +557,7 @@ public class Config extends HashMap<String, Object> {
 
     /*
      * A list of classes implementing IMetricsConsumer (See storm.yaml.example for exact config format).
-     * Each listed class will be routed all the metrics data generated by the storm metrics API. 
+     * Each listed class will be routed all the metrics data generated by the storm metrics API.
      * Each listed class maps 1:1 to a system bolt named __metrics_ClassName#N, and it's parallelism is configurable.
      */
     public static final String TOPOLOGY_METRICS_CONSUMER_REGISTER = "topology.metrics.consumer.register";
@@ -566,24 +573,24 @@ public class Config extends HashMap<String, Object> {
 
 
     /**
-     * The maximum number of tuples that can be pending on a spout task at any given time. 
-     * This config applies to individual tasks, not to spouts or topologies as a whole. 
-     * 
+     * The maximum number of tuples that can be pending on a spout task at any given time.
+     * This config applies to individual tasks, not to spouts or topologies as a whole.
+     *
      * A pending tuple is one that has been emitted from a spout but has not been acked or failed yet.
-     * Note that this config parameter has no effect for unreliable spouts that don't tag 
+     * Note that this config parameter has no effect for unreliable spouts that don't tag
      * their tuples with a message id.
      */
-    public static final String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending"; 
+    public static final String TOPOLOGY_MAX_SPOUT_PENDING="topology.max.spout.pending";
     public static final Object TOPOLOGY_MAX_SPOUT_PENDING_SCHEMA = Number.class;
 
     /**
      * A class that implements a strategy for what to do when a spout needs to wait. Waiting is
      * triggered in one of two conditions:
-     * 
+     *
      * 1. nextTuple emits no tuples
      * 2. The spout has hit maxSpoutPending and can't emit any more tuples
      */
-    public static final String TOPOLOGY_SPOUT_WAIT_STRATEGY="topology.spout.wait.strategy"; 
+    public static final String TOPOLOGY_SPOUT_WAIT_STRATEGY="topology.spout.wait.strategy";
     public static final Object TOPOLOGY_SPOUT_WAIT_STRATEGY_SCHEMA = String.class;
 
     /**
@@ -606,7 +613,7 @@ public class Config extends HashMap<String, Object> {
     public static final Object TOPOLOGY_STATS_SAMPLE_RATE_SCHEMA = Number.class;
 
     /**
-     * The time period that builtin metrics data in bucketed into. 
+     * The time period that builtin metrics data in bucketed into.
      */
     public static final String TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS="topology.builtin.metrics.bucket.size.secs";
     public static final Object TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS_SCHEMA = Number.class;
@@ -633,7 +640,7 @@ public class Config extends HashMap<String, Object> {
 
     /**
      * A list of task hooks that are automatically added to every spout and bolt in the topology. An example
-     * of when you'd do this is to add a hook that integrates with your internal 
+     * of when you'd do this is to add a hook that integrates with your internal
      * monitoring system. These hooks are instantiated using the zero-arg constructor.
      */
     public static final String TOPOLOGY_AUTO_TASK_HOOKS="topology.auto.task.hooks";
@@ -647,7 +654,7 @@ public class Config extends HashMap<String, Object> {
     public static final Object TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE_SCHEMA = ConfigValidation.PowerOf2Validator;
 
     /**
-     * The maximum number of messages to batch from the thread receiving off the network to the 
+     * The maximum number of messages to batch from the thread receiving off the network to the
      * executor queues. Must be a power of 2.
      */
     public static final String TOPOLOGY_RECEIVER_BUFFER_SIZE="topology.receiver.buffer.size";
@@ -681,14 +688,14 @@ public class Config extends HashMap<String, Object> {
     public static final Object TOPOLOGY_DISRUPTOR_WAIT_STRATEGY_SCHEMA = String.class;
 
    /**
-    * The size of the shared thread pool for worker tasks to make use of. The thread pool can be accessed 
+    * The size of the shared thread pool for worker tasks to make use of. The thread pool can be accessed
     * via the TopologyContext.
     */
     public static final String TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE="topology.worker.shared.thread.pool.size";
     public static final Object TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE_SCHEMA = Number.class;
 
     /**
-     * The interval in seconds to use for determining whether to throttle error reported to Zookeeper. For example, 
+     * The interval in seconds to use for determining whether to throttle error reported to Zookeeper. For example,
      * an interval of 10 seconds with topology.max.error.report.per.interval set to 5 will only allow 5 errors to be
      * reported to Zookeeper per task for every 10 second interval of time.
      */
@@ -757,9 +764,9 @@ public class Config extends HashMap<String, Object> {
 
     /**
      * This value is passed to spawned JVMs (e.g., Nimbus, Supervisor, and Workers)
-     * for the java.library.path value. java.library.path tells the JVM where 
+     * for the java.library.path value. java.library.path tells the JVM where
      * to look for native libraries. It is necessary to set this config correctly since
-     * Storm uses the ZeroMQ and JZMQ native libs. 
+     * Storm uses the ZeroMQ and JZMQ native libs.
      */
     public static final String JAVA_LIBRARY_PATH = "java.library.path";
     public static final Object JAVA_LIBRARY_PATH_SCHEMA = String.class;
@@ -781,17 +788,17 @@ public class Config extends HashMap<String, Object> {
 
     public static void setDebug(Map conf, boolean isOn) {
         conf.put(Config.TOPOLOGY_DEBUG, isOn);
-    } 
+    }
 
     public void setDebug(boolean isOn) {
         setDebug(this, isOn);
     }
-    
+
     @Deprecated
     public void setOptimize(boolean isOn) {
         put(Config.TOPOLOGY_OPTIMIZE, isOn);
-    } 
-    
+    }
+
     public static void setNumWorkers(Map conf, int workers) {
         conf.put(Config.TOPOLOGY_WORKERS, workers);
     }
@@ -807,7 +814,7 @@ public class Config extends HashMap<String, Object> {
     public void setNumAckers(int numExecutors) {
         setNumAckers(this, numExecutors);
     }
-    
+
     public static void setMessageTimeoutSecs(Map conf, int secs) {
         conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, secs);
     }
@@ -815,7 +822,7 @@ public class Config extends HashMap<String, Object> {
     public void setMessageTimeoutSecs(int secs) {
         setMessageTimeoutSecs(this, secs);
     }
-    
+
     public static void registerSerialization(Map conf, Class klass) {
         getRegisteredSerializations(conf).add(klass.getName());
     }
@@ -823,17 +830,17 @@ public class Config extends HashMap<String, Object> {
     public void registerSerialization(Class klass) {
         registerSerialization(this, klass);
     }
-    
+
     public static void registerSerialization(Map conf, Class klass, Class<? extends Serializer> serializerClass) {
         Map<String, String> register = new HashMap<String, String>();
         register.put(klass.getName(), serializerClass.getName());
-        getRegisteredSerializations(conf).add(register);        
+        getRegisteredSerializations(conf).add(register);
     }
 
     public void registerSerialization(Class klass, Class<? extends Serializer> serializerClass) {
         registerSerialization(this, klass, serializerClass);
     }
-    
+
     public void registerMetricsConsumer(Class klass, Object argument, long parallelismHint) {
         HashMap m = new HashMap();
         m.put("class", klass.getCanonicalName());
@@ -861,7 +868,7 @@ public class Config extends HashMap<String, Object> {
     public void registerDecorator(Class<? extends IKryoDecorator> klass) {
         registerDecorator(this, klass);
     }
-    
+
     public static void setKryoFactory(Map conf, Class<? extends IKryoFactory> klass) {
         conf.put(Config.TOPOLOGY_KRYO_FACTORY, klass.getName());
     }
@@ -877,7 +884,7 @@ public class Config extends HashMap<String, Object> {
     public void setSkipMissingKryoRegistrations(boolean skip) {
        setSkipMissingKryoRegistrations(this, skip);
     }
-    
+
     public static void setMaxTaskParallelism(Map conf, int max) {
         conf.put(Config.TOPOLOGY_MAX_TASK_PARALLELISM, max);
     }
@@ -885,7 +892,7 @@ public class Config extends HashMap<String, Object> {
     public void setMaxTaskParallelism(int max) {
         setMaxTaskParallelism(this, max);
     }
-    
+
     public static void setMaxSpoutPending(Map conf, int max) {
         conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, max);
     }
@@ -893,23 +900,23 @@ public class Config extends HashMap<String, Object> {
     public void setMaxSpoutPending(int max) {
         setMaxSpoutPending(this, max);
     }
-    
+
     public static void setStatsSampleRate(Map conf, double rate) {
         conf.put(Config.TOPOLOGY_STATS_SAMPLE_RATE, rate);
-    }    
+    }
 
     public void setStatsSampleRate(double rate) {
         setStatsSampleRate(this, rate);
-    }    
+    }
 
     public static void setFallBackOnJavaSerialization(Map conf, boolean fallback) {
         conf.put(Config.TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION, fallback);
-    }    
+    }
 
     public void setFallBackOnJavaSerialization(boolean fallback) {
         setFallBackOnJavaSerialization(this, fallback);
-    }    
-    
+    }
+
     private static List getRegisteredSerializations(Map conf) {
         List ret;
         if(!conf.containsKey(Config.TOPOLOGY_KRYO_REGISTER)) {
@@ -920,13 +927,13 @@ public class Config extends HashMap<String, Object> {
         conf.put(Config.TOPOLOGY_KRYO_REGISTER, ret);
         return ret;
     }
-    
+
     private static List getRegisteredDecorators(Map conf) {
         List ret;
         if(!conf.containsKey(Config.TOPOLOGY_KRYO_DECORATORS)) {
             ret = new ArrayList();
         } else {
-            ret = new ArrayList((List) conf.get(Config.TOPOLOGY_KRYO_DECORATORS));            
+            ret = new ArrayList((List) conf.get(Config.TOPOLOGY_KRYO_DECORATORS));
         }
         conf.put(Config.TOPOLOGY_KRYO_DECORATORS, ret);
         return ret;

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ee0678c7/storm-core/src/jvm/backtype/storm/multilang/BoltMsg.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/multilang/BoltMsg.java b/storm-core/src/jvm/backtype/storm/multilang/BoltMsg.java
new file mode 100644
index 0000000..1d6bd1d
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/multilang/BoltMsg.java
@@ -0,0 +1,63 @@
+package backtype.storm.multilang;
+
+import java.util.List;
+
+/**
+ * BoltMsg is an object that represents the data sent from a shell component to
+ * a bolt process that implements a multi-language protocol. It is the union of
+ * all data types that a bolt can receive from Storm.
+ *
+ * <p>
+ * BoltMsgs are objects sent to the ISerializer interface, for serialization
+ * according to the wire protocol implemented by the serializer. The BoltMsg
+ * class allows for a decoupling between the serialized representation of the
+ * data and the data itself.
+ * </p>
+ */
+public class BoltMsg {
+    private String id;
+    private String comp;
+    private String stream;
+    private long task;
+    private List<Object> tuple;
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public String getComp() {
+        return comp;
+    }
+
+    public void setComp(String comp) {
+        this.comp = comp;
+    }
+
+    public String getStream() {
+        return stream;
+    }
+
+    public void setStream(String stream) {
+        this.stream = stream;
+    }
+
+    public long getTask() {
+        return task;
+    }
+
+    public void setTask(long task) {
+        this.task = task;
+    }
+
+    public List<Object> getTuple() {
+        return tuple;
+    }
+
+    public void setTuple(List<Object> tuple) {
+        this.tuple = tuple;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ee0678c7/storm-core/src/jvm/backtype/storm/multilang/Emission.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/multilang/Emission.java b/storm-core/src/jvm/backtype/storm/multilang/Emission.java
deleted file mode 100644
index d477d8b..0000000
--- a/storm-core/src/jvm/backtype/storm/multilang/Emission.java
+++ /dev/null
@@ -1,79 +0,0 @@
-package backtype.storm.multilang;
-
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Emission is an object that represents the data sent to a shell component
- * from a process that implements a multi-language protocol. It is the union
- * of all data types that a component can send to Storm.
- *
- * <p> Emissions are objects received from the ISerializer interface, after
- * the serializer has deserialized the data from the underlying wire protocol.
- * The Emission class allows for a decoupling between the serialized
- * representation of the data and the data itself.</p>
- */
-public class Emission {
-	private String command;
-	private String id;
-	private List<String> anchors;
-	private String stream;
-	private long task;
-	private String msg;
-	private List<Object> tuple;
-
-	public String getCommand() {
-		return command;
-	}
-	public void setCommand(String command) {
-		this.command = command;
-	}
-	public String getId() {
-		return id;
-	}
-	public void setId(String id) {
-		this.id = id;
-	}
-	public List<String> getAnchors() {
-		return anchors;
-	}
-	public void setAnchors(List<String> anchors) {
-		this.anchors = anchors;
-	}
-	public void addAnchor(String anchor) {
-		if (anchors == null) {
-			anchors = new ArrayList<String>();
-		}
-		this.anchors.add(anchor);
-	}
-	public String getStream() {
-		return stream;
-	}
-	public void setStream(String stream) {
-		this.stream = stream;
-	}
-	public long getTask() {
-		return task;
-	}
-	public void setTask(long task) {
-		this.task = task;
-	}
-	public String getMsg() {
-		return msg;
-	}
-	public void setMsg(String msg) {
-		this.msg = msg;
-	}
-	public List<Object> getTuple() {
-		return tuple;
-	}
-	public void setTuple(List<Object> tuple) {
-		this.tuple = tuple;
-	}
-	public void addTuple(Object tuple) {
-		if (this.tuple == null) {
-			this.tuple = new ArrayList<Object>();
-		}
-		this.tuple.add(tuple);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ee0678c7/storm-core/src/jvm/backtype/storm/multilang/ISerializer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/multilang/ISerializer.java b/storm-core/src/jvm/backtype/storm/multilang/ISerializer.java
index 691b432..a6ccfd4 100644
--- a/storm-core/src/jvm/backtype/storm/multilang/ISerializer.java
+++ b/storm-core/src/jvm/backtype/storm/multilang/ISerializer.java
@@ -10,15 +10,56 @@ import java.util.Map;
 import backtype.storm.task.TopologyContext;
 
 /**
- * The ISerializer interface describes the methods that an object should implement
- * to provide serialization and de-serialization capabilities to non-JVM
- * language components.
+ * The ISerializer interface describes the methods that an object should
+ * implement to provide serialization and de-serialization capabilities to
+ * non-JVM language components.
  */
 public interface ISerializer extends Serializable {
-	void initialize (OutputStream processIn, InputStream processOut);
-	Number connect (Map conf, TopologyContext context) throws IOException, NoOutputException;
-	Emission readEmission() throws IOException, NoOutputException;
-	void writeImmission(Immission immission) throws IOException;
-	void writeSpoutMsg(SpoutMsg msg) throws IOException;
-	void writeTaskIds(List<Integer> taskIds) throws IOException;
+
+    /**
+     * This method sets the input and output streams of the serializer
+     *
+     * @param processIn output stream to non-JVM component
+     * @param processOut input stream from non-JVM component
+     */
+    void initialize(OutputStream processIn, InputStream processOut);
+
+    /**
+     * This method transmits the Storm config to the non-JVM process and
+     * receives its pid.
+     *
+     * @param conf storm configuration
+     * @param context topology context
+     * @return process pid
+     */
+    Number connect(Map conf, TopologyContext context) throws IOException,
+            NoOutputException;
+
+    /**
+     * This method receives a shell message from the non-JVM process
+     *
+     * @return shell message
+     */
+    ShellMsg readShellMsg() throws IOException, NoOutputException;
+
+    /**
+     * This method sends a bolt message to a non-JVM bolt process
+     *
+     * @param msg bolt message
+     */
+    void writeBoltMsg(BoltMsg msg) throws IOException;
+
+    /**
+     * This method sends a spout message to a non-JVM spout process
+     *
+     * @param msg spout message
+     */
+    void writeSpoutMsg(SpoutMsg msg) throws IOException;
+
+    /**
+     * This method sends a list of task IDs to a non-JVM bolt process
+     *
+     * @param taskIds list of task IDs
+     */
+    void writeTaskIds(List<Integer> taskIds) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ee0678c7/storm-core/src/jvm/backtype/storm/multilang/Immission.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/multilang/Immission.java b/storm-core/src/jvm/backtype/storm/multilang/Immission.java
deleted file mode 100644
index 968f728..0000000
--- a/storm-core/src/jvm/backtype/storm/multilang/Immission.java
+++ /dev/null
@@ -1,52 +0,0 @@
-package backtype.storm.multilang;
-
-import java.util.List;
-
-/**
- * Immission is an object that represents the data sent from a shell component
- * to a process that implements a multi-language protocol. It is the union
- * of all data types that a component can receive from Storm.
- *
- * <p> Immissions are objects sent to the ISerializer interface, for
- * serialization according to the wire protocol implemented by the serializer.
- * The Immission class allows for a decoupling between the serialized
- * representation of the data and the data itself.</p>
- */
-public class Immission {
-	private String id;
-	private String comp;
-	private String stream;
-	private long task;
-	private List<Object> tuple;
-
-	public String getId() {
-		return id;
-	}
-	public void setId(String id) {
-		this.id = id;
-	}
-	public String getComp() {
-		return comp;
-	}
-	public void setComp(String comp) {
-		this.comp = comp;
-	}
-	public String getStream() {
-		return stream;
-	}
-	public void setStream(String stream) {
-		this.stream = stream;
-	}
-	public long getTask() {
-		return task;
-	}
-	public void setTask(long task) {
-		this.task = task;
-	}
-	public List<Object> getTuple() {
-		return tuple;
-	}
-	public void setTuple(List<Object> tuple) {
-		this.tuple = tuple;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ee0678c7/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java b/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java
index ba574e5..ceb8cb3 100644
--- a/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java
+++ b/storm-core/src/jvm/backtype/storm/multilang/JsonSerializer.java
@@ -22,126 +22,128 @@ import backtype.storm.utils.Utils;
  * JsonSerializer implements the JSON multilang protocol.
  */
 public class JsonSerializer implements ISerializer {
-	private DataOutputStream processIn;
+    private DataOutputStream processIn;
     private BufferedReader processOut;
 
-	public void initialize (OutputStream processIn, InputStream processOut) {
-		this.processIn = new DataOutputStream(processIn);;
-		this.processOut = new BufferedReader(new InputStreamReader(processOut));
-	}
+    public void initialize(OutputStream processIn, InputStream processOut) {
+        this.processIn = new DataOutputStream(processIn);
+        this.processOut = new BufferedReader(new InputStreamReader(processOut));
+    }
 
-	public Number connect (Map conf, TopologyContext context) throws IOException, NoOutputException {
-		JSONObject setupInfo = new JSONObject();
+    public Number connect(Map conf, TopologyContext context)
+            throws IOException, NoOutputException {
+        JSONObject setupInfo = new JSONObject();
         setupInfo.put("pidDir", context.getPIDDir());
         setupInfo.put("conf", conf);
         setupInfo.put("context", context);
-    	writeMessage(setupInfo);
+        writeMessage(setupInfo);
 
-    	Number pid = (Number)((JSONObject)readMessage()).get("pid");
+        Number pid = (Number) ((JSONObject) readMessage()).get("pid");
         return pid;
-	}
-
-	public void writeImmission(Immission immission) throws IOException {
-		JSONObject obj = new JSONObject();
-        obj.put("id", immission.getId());
-        obj.put("comp", immission.getComp());
-        obj.put("stream", immission.getStream());
-        obj.put("task", immission.getTask());
-        obj.put("tuple", immission.getTuple());
+    }
+
+    public void writeBoltMsg(BoltMsg boltMsg) throws IOException {
+        JSONObject obj = new JSONObject();
+        obj.put("id", boltMsg.getId());
+        obj.put("comp", boltMsg.getComp());
+        obj.put("stream", boltMsg.getStream());
+        obj.put("task", boltMsg.getTask());
+        obj.put("tuple", boltMsg.getTuple());
         writeMessage(obj);
-	}
+    }
 
-	public void writeSpoutMsg(SpoutMsg msg) throws IOException {
-		JSONObject obj = new JSONObject();
+    public void writeSpoutMsg(SpoutMsg msg) throws IOException {
+        JSONObject obj = new JSONObject();
         obj.put("command", msg.getCommand());
         obj.put("id", msg.getId());
         writeMessage(obj);
-	}
+    }
 
-	public void writeTaskIds(List<Integer> taskIds) throws IOException {
+    public void writeTaskIds(List<Integer> taskIds) throws IOException {
         writeMessage(taskIds);
-	}
+    }
 
-	private void writeMessage(Object msg) throws IOException {
+    private void writeMessage(Object msg) throws IOException {
         writeString(JSONValue.toJSONString(msg));
     }
 
-	private void writeString(String str) throws IOException {
+    private void writeString(String str) throws IOException {
         byte[] strBytes = str.getBytes("UTF-8");
         processIn.write(strBytes, 0, strBytes.length);
         processIn.writeBytes("\nend\n");
         processIn.flush();
     }
 
-	public Emission readEmission() throws IOException, NoOutputException {
-        JSONObject msg = (JSONObject)readMessage();
-    	Emission emission = new Emission();
+    public ShellMsg readShellMsg() throws IOException, NoOutputException {
+        JSONObject msg = (JSONObject) readMessage();
+        ShellMsg shellMsg = new ShellMsg();
 
-    	String command = (String) msg.get("command");
-        emission.setCommand(command);
+        String command = (String) msg.get("command");
+        shellMsg.setCommand(command);
 
-    	String stream = (String) msg.get("stream");
-        if(stream == null) stream = Utils.DEFAULT_STREAM_ID;
-        emission.setStream(stream);
+        String stream = (String) msg.get("stream");
+        if (stream == null)
+            stream = Utils.DEFAULT_STREAM_ID;
+        shellMsg.setStream(stream);
 
         Object taskObj = msg.get("task");
         if (taskObj != null) {
-        	emission.setTask((Long) taskObj);
+            shellMsg.setTask((Long) taskObj);
         }
 
-        emission.setTuple((List) msg.get("tuple"));
+        shellMsg.setTuple((List) msg.get("tuple"));
 
         List<Tuple> anchors = new ArrayList<Tuple>();
         Object anchorObj = msg.get("anchors");
-        if(anchorObj!=null) {
-            if(anchorObj instanceof String) {
+        if (anchorObj != null) {
+            if (anchorObj instanceof String) {
                 anchorObj = Arrays.asList(anchorObj);
             }
-            for(Object o: (List) anchorObj) {
-                emission.addAnchor((String)o);
+            for (Object o : (List) anchorObj) {
+                shellMsg.addAnchor((String) o);
             }
         }
 
-        return emission;
+        return shellMsg;
     }
 
-	private Object readMessage() throws IOException, NoOutputException {
-		String string = readString();
+    private Object readMessage() throws IOException, NoOutputException {
+        String string = readString();
         Object msg = JSONValue.parse(string);
         if (msg != null) {
-        	return msg;
+            return msg;
         } else {
-        	throw new IOException("unable to parse: " + string);
+            throw new IOException("unable to parse: " + string);
         }
-	}
+    }
 
     private String readString() throws IOException, NoOutputException {
         StringBuilder line = new StringBuilder();
 
-        //synchronized (processOut) {
-            while (true) {
-                String subline = processOut.readLine();
-                if(subline==null) {
-                    StringBuilder errorMessage = new StringBuilder();
-                    errorMessage.append("Pipe to subprocess seems to be broken!");
-                    if (line.length() == 0) {
-                        errorMessage.append(" No output read.\n");
-                    }
-                    else {
-                        errorMessage.append(" Currently read output: " + line.toString() + "\n");
-                    }
-                    errorMessage.append("Serializer Exception:\n");
-                    throw new NoOutputException(errorMessage.toString());
-                }
-                if(subline.equals("end")) {
-                    break;
-                }
-                if(line.length()!=0) {
-                    line.append("\n");
+        // synchronized (processOut) {
+        while (true) {
+            String subline = processOut.readLine();
+            if (subline == null) {
+                StringBuilder errorMessage = new StringBuilder();
+                errorMessage.append("Pipe to subprocess seems to be broken!");
+                if (line.length() == 0) {
+                    errorMessage.append(" No output read.\n");
+                } else {
+                    errorMessage.append(" Currently read output: "
+                            + line.toString() + "\n");
                 }
-                line.append(subline);
+                errorMessage.append("Serializer Exception:\n");
+                throw new NoOutputException(errorMessage.toString());
             }
-        //}
+            if (subline.equals("end")) {
+                break;
+            }
+            if (line.length() != 0) {
+                line.append("\n");
+            }
+            line.append(subline);
+        }
+        // }
         return line.toString();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ee0678c7/storm-core/src/jvm/backtype/storm/multilang/NoOutputException.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/multilang/NoOutputException.java b/storm-core/src/jvm/backtype/storm/multilang/NoOutputException.java
index 8ffb7dd..fd5ef72 100644
--- a/storm-core/src/jvm/backtype/storm/multilang/NoOutputException.java
+++ b/storm-core/src/jvm/backtype/storm/multilang/NoOutputException.java
@@ -5,8 +5,19 @@ package backtype.storm.multilang;
  * non-JVM process.
  */
 public class NoOutputException extends Exception {
-	public NoOutputException() { super(); }
-	public NoOutputException(String message) { super(message); }
-	public NoOutputException(String message, Throwable cause) { super(message, cause); }
-	public NoOutputException(Throwable cause) { super(cause); }
+    public NoOutputException() {
+        super();
+    }
+
+    public NoOutputException(String message) {
+        super(message);
+    }
+
+    public NoOutputException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public NoOutputException(Throwable cause) {
+        super(cause);
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ee0678c7/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java b/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java
new file mode 100644
index 0000000..d56ba78
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/multilang/ShellMsg.java
@@ -0,0 +1,96 @@
+package backtype.storm.multilang;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * ShellMsg is an object that represents the data sent to a shell component from
+ * a process that implements a multi-language protocol. It is the union of all
+ * data types that a component can send to Storm.
+ *
+ * <p>
+ * ShellMsgs are objects received from the ISerializer interface, after the
+ * serializer has deserialized the data from the underlying wire protocol. The
+ * ShellMsg class allows for a decoupling between the serialized representation
+ * of the data and the data itself.
+ * </p>
+ */
+public class ShellMsg {
+    private String command;
+    private String id;
+    private List<String> anchors;
+    private String stream;
+    private long task;
+    private String msg;
+    private List<Object> tuple;
+
+    public String getCommand() {
+        return command;
+    }
+
+    public void setCommand(String command) {
+        this.command = command;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
+
+    public List<String> getAnchors() {
+        return anchors;
+    }
+
+    public void setAnchors(List<String> anchors) {
+        this.anchors = anchors;
+    }
+
+    public void addAnchor(String anchor) {
+        if (anchors == null) {
+            anchors = new ArrayList<String>();
+        }
+        this.anchors.add(anchor);
+    }
+
+    public String getStream() {
+        return stream;
+    }
+
+    public void setStream(String stream) {
+        this.stream = stream;
+    }
+
+    public long getTask() {
+        return task;
+    }
+
+    public void setTask(long task) {
+        this.task = task;
+    }
+
+    public String getMsg() {
+        return msg;
+    }
+
+    public void setMsg(String msg) {
+        this.msg = msg;
+    }
+
+    public List<Object> getTuple() {
+        return tuple;
+    }
+
+    public void setTuple(List<Object> tuple) {
+        this.tuple = tuple;
+    }
+
+    public void addTuple(Object tuple) {
+        if (this.tuple == null) {
+            this.tuple = new ArrayList<Object>();
+        }
+        this.tuple.add(tuple);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ee0678c7/storm-core/src/jvm/backtype/storm/multilang/SpoutMsg.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/multilang/SpoutMsg.java b/storm-core/src/jvm/backtype/storm/multilang/SpoutMsg.java
index c8063eb..9b5ffce 100644
--- a/storm-core/src/jvm/backtype/storm/multilang/SpoutMsg.java
+++ b/storm-core/src/jvm/backtype/storm/multilang/SpoutMsg.java
@@ -1,28 +1,34 @@
 package backtype.storm.multilang;
 
 /**
- * SpoutMsg is an object that represents the data sent from a shell spout
- * to a process that implements a multi-language spout. The SpoutMsg is used
- * to send a "next", "ack" or "fail" message to a spout.
+ * SpoutMsg is an object that represents the data sent from a shell spout to a
+ * process that implements a multi-language spout. The SpoutMsg is used to send
+ * a "next", "ack" or "fail" message to a spout.
  *
- * <p> Spout messages are objects sent to the ISerializer interface, for
+ * <p>
+ * Spout messages are objects sent to the ISerializer interface, for
  * serialization according to the wire protocol implemented by the serializer.
  * The SpoutMsg class allows for a decoupling between the serialized
- * representation of the data and the data itself.</p>
+ * representation of the data and the data itself.
+ * </p>
  */
 public class SpoutMsg {
-	private String command;
-	private String id;
-	public String getCommand() {
-		return command;
-	}
-	public void setCommand(String command) {
-		this.command = command;
-	}
-	public String getId() {
-		return id;
-	}
-	public void setId(String id) {
-		this.id = id;
-	}
+    private String command;
+    private String id;
+
+    public String getCommand() {
+        return command;
+    }
+
+    public void setCommand(String command) {
+        this.command = command;
+    }
+
+    public String getId() {
+        return id;
+    }
+
+    public void setId(String id) {
+        this.id = id;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ee0678c7/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
index c9ef682..72cebe2 100644
--- a/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
+++ b/storm-core/src/jvm/backtype/storm/spout/ShellSpout.java
@@ -1,13 +1,10 @@
 package backtype.storm.spout;
 
 import backtype.storm.generated.ShellComponent;
-import backtype.storm.multilang.Emission;
-import backtype.storm.multilang.ISerializer;
-import backtype.storm.multilang.JsonSerializer;
+import backtype.storm.multilang.ShellMsg;
 import backtype.storm.multilang.SpoutMsg;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.utils.ShellProcess;
-import backtype.storm.utils.Utils;
 import java.util.Map;
 import java.util.List;
 import java.io.IOException;
@@ -25,22 +22,16 @@ public class ShellSpout implements ISpout {
 
     public ShellSpout(ShellComponent component) {
         this(component.get_execution_command(), component.get_script());
-        _process = new ShellProcess(new JsonSerializer(), _command);
     }
 
     public ShellSpout(String... command) {
         _command = command;
-        _process = new ShellProcess(new JsonSerializer(), _command);
-    }
-
-    public ShellSpout(ISerializer serializer, String... command) {
-        _command = command;
-        _process = new ShellProcess(serializer, _command);
     }
 
     public void open(Map stormConf, TopologyContext context,
                      SpoutOutputCollector collector) {
         _collector = collector;
+        _process = new ShellProcess(_command);
 
         Number subpid = _process.launch(stormConf, context);
         LOG.info("Launched subprocess with pid " + subpid);
@@ -82,23 +73,24 @@ public class ShellSpout implements ISpout {
             _process.writeSpoutMsg(spoutMsg);
 
             while (true) {
-                Emission emission = _process.readEmission();
-                String command = emission.getCommand();
+                ShellMsg shellMsg = _process.readShellMsg();
+                String command = shellMsg.getCommand();
                 if (command.equals("sync")) {
                     return;
                 } else if (command.equals("log")) {
-                    String msg = emission.getMsg();
+                    String msg = shellMsg.getMsg();
                     LOG.info("Shell msg: " + msg);
                 } else if (command.equals("emit")) {
-                    String stream = emission.getStream();
-                    Long task = emission.getTask();
-                    List<Object> tuple = emission.getTuple();
-                    Object messageId = emission.getId();
+                    String stream = shellMsg.getStream();
+                    Long task = shellMsg.getTask();
+                    List<Object> tuple = shellMsg.getTuple();
+                    Object messageId = shellMsg.getId();
                     if (task == 0) {
                         List<Integer> outtasks = _collector.emit(stream, tuple, messageId);
                         _process.writeTaskIds(outtasks);
                     } else {
-                        _collector.emitDirect((int)task.longValue(), stream, tuple, messageId);
+                        _collector.emitDirect((int) task.longValue(), stream,
+                                tuple, messageId);
                     }
                 }
             }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ee0678c7/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index 36585e3..6a6c305 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -3,10 +3,9 @@ package backtype.storm.task;
 import backtype.storm.generated.ShellComponent;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.utils.ShellProcess;
-import backtype.storm.multilang.Emission;
-import backtype.storm.multilang.ISerializer;
-import backtype.storm.multilang.Immission;
-import backtype.storm.multilang.JsonSerializer;
+import backtype.storm.multilang.BoltMsg;
+import backtype.storm.multilang.ShellMsg;
+
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
@@ -62,23 +61,17 @@ public class ShellBolt implements IBolt {
 
     public ShellBolt(ShellComponent component) {
         this(component.get_execution_command(), component.get_script());
-        _process = new ShellProcess(new JsonSerializer(), _command);
     }
 
     public ShellBolt(String... command) {
         _command = command;
-        _process = new ShellProcess(new JsonSerializer(), _command);
-    }
-
-    public ShellBolt(ISerializer serializer, String... command) {
-        _command = command;
-        _process = new ShellProcess(serializer, _command);
     }
 
     public void prepare(Map stormConf, TopologyContext context,
                         final OutputCollector collector) {
         _rand = new Random();
         _collector = collector;
+        _process = new ShellProcess(_command);
 
         //subprocesses must send their pid first thing
         Number subpid = _process.launch(stormConf, context);
@@ -89,20 +82,20 @@ public class ShellBolt implements IBolt {
             public void run() {
                 while (_running) {
                     try {
-                        Emission emission = _process.readEmission();
+                        ShellMsg shellMsg = _process.readShellMsg();
 
-                        String command = emission.getCommand();
+                        String command = shellMsg.getCommand();
                         if(command.equals("ack")) {
-                            handleAck(emission.getId());
+                            handleAck(shellMsg.getId());
                         } else if (command.equals("fail")) {
-                            handleFail(emission.getId());
+                            handleFail(shellMsg.getId());
                         } else if (command.equals("error")) {
-                            handleError(emission.getMsg());
+                            handleError(shellMsg.getMsg());
                         } else if (command.equals("log")) {
-                            String msg = emission.getMsg();
+                            String msg = shellMsg.getMsg();
                             LOG.info("Shell msg: " + msg);
                         } else if (command.equals("emit")) {
-                            handleEmit(emission);
+                            handleEmit(shellMsg);
                         }
                     } catch (InterruptedException e) {
                     } catch (Throwable t) {
@@ -118,12 +111,13 @@ public class ShellBolt implements IBolt {
             public void run() {
                 while (_running) {
                     try {
-                    	// FIXME: This can either be TaskIds or Immissions
                         Object write = _pendingWrites.poll(1, SECONDS);
-                        if (write instanceof Immission) {
-                            _process.writeImmission((Immission)write);
+                        if (write instanceof BoltMsg) {
+                            _process.writeBoltMsg((BoltMsg)write);
                         } else if (write instanceof List<?>) {
-
+                            _process.writeTaskIds((List<Integer>)write);
+                        } else {
+                            throw new RuntimeException("Cannot write object to bolt:\n" + write.toString());
                         }
                     } catch (InterruptedException e) {
                     } catch (Throwable t) {
@@ -145,14 +139,14 @@ public class ShellBolt implements IBolt {
         String genId = Long.toString(_rand.nextLong());
         _inputs.put(genId, input);
         try {
-            Immission immission = new Immission();
-            immission.setId(genId);
-            immission.setComp(input.getSourceComponent());
-            immission.setStream(input.getSourceStreamId());
-            immission.setTask(input.getSourceTask());
-            immission.setTuple(input.getValues());
-
-            _pendingWrites.put(immission);
+            BoltMsg boltMsg = new BoltMsg();
+            boltMsg.setId(genId);
+            boltMsg.setComp(input.getSourceComponent());
+            boltMsg.setStream(input.getSourceStreamId());
+            boltMsg.setTask(input.getSourceTask());
+            boltMsg.setTuple(input.getValues());
+
+            _pendingWrites.put(boltMsg);
         } catch(InterruptedException e) {
             throw new RuntimeException("Error during multilang processing", e);
         }
@@ -184,9 +178,9 @@ public class ShellBolt implements IBolt {
         _collector.reportError(new Exception("Shell Process Exception: " + msg));
     }
 
-    private void handleEmit(Emission emission) throws InterruptedException {
+    private void handleEmit(ShellMsg shellMsg) throws InterruptedException {
     	List<Tuple> anchors = new ArrayList<Tuple>();
-    	for (String anchor : emission.getAnchors()) {
+    	for (String anchor : shellMsg.getAnchors()) {
 	    	Tuple t = _inputs.get(anchor);
 	        if (t == null) {
 	            throw new RuntimeException("Anchored onto " + anchor + " after ack/fail");
@@ -194,11 +188,12 @@ public class ShellBolt implements IBolt {
 	        anchors.add(t);
     	}
 
-        if(emission.getTask() == 0) {
-            List<Integer> outtasks = _collector.emit(emission.getStream(), anchors, emission.getTuple());
+        if(shellMsg.getTask() == 0) {
+            List<Integer> outtasks = _collector.emit(shellMsg.getStream(), anchors, shellMsg.getTuple());
             _pendingWrites.put(outtasks);
         } else {
-            _collector.emitDirect((int)emission.getTask(), emission.getStream(), anchors, emission.getTuple());
+            _collector.emitDirect((int) shellMsg.getTask(),
+                    shellMsg.getStream(), anchors, shellMsg.getTuple());
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/ee0678c7/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java b/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java
index 38976bc..8871a77 100644
--- a/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java
+++ b/storm-core/src/jvm/backtype/storm/utils/ShellProcess.java
@@ -1,9 +1,10 @@
 package backtype.storm.utils;
 
-import backtype.storm.multilang.Emission;
+import backtype.storm.Config;
 import backtype.storm.multilang.ISerializer;
-import backtype.storm.multilang.Immission;
+import backtype.storm.multilang.BoltMsg;
 import backtype.storm.multilang.NoOutputException;
+import backtype.storm.multilang.ShellMsg;
 import backtype.storm.multilang.SpoutMsg;
 import backtype.storm.task.TopologyContext;
 import java.io.File;
@@ -18,85 +19,109 @@ import org.apache.log4j.Logger;
 
 public class ShellProcess implements Serializable {
     public static Logger LOG = Logger.getLogger(ShellProcess.class);
-    private Process _subprocess;
-    private InputStream processErrorStream;
-    private String[] command;
-    public ISerializer serializer;
+    private Process      _subprocess;
+    private InputStream  processErrorStream;
+    private String[]     command;
+    public ISerializer   serializer;
 
-    public ShellProcess(ISerializer serializer, String[] command) {
+    public ShellProcess(String[] command) {
         this.command = command;
-        this.serializer = serializer;
     }
 
     public Number launch(Map conf, TopologyContext context) {
         ProcessBuilder builder = new ProcessBuilder(command);
         builder.directory(new File(context.getCodeDir()));
 
+        this.serializer = getSerializer(conf);
+
         Number pid;
         try {
-	        _subprocess = builder.start();
-	        processErrorStream = _subprocess.getErrorStream();
-	        serializer.initialize(_subprocess.getOutputStream(), _subprocess.getInputStream());
-	        pid = serializer.connect(conf, context);
+            _subprocess = builder.start();
+            processErrorStream = _subprocess.getErrorStream();
+            serializer.initialize(_subprocess.getOutputStream(),
+                    _subprocess.getInputStream());
+            pid = serializer.connect(conf, context);
         } catch (IOException e) {
-        	throw new RuntimeException("Error when launching multilang subprocess\n" + getErrorsString(), e);
+            throw new RuntimeException(
+                    "Error when launching multilang subprocess\n"
+                            + getErrorsString(), e);
         } catch (NoOutputException e) {
             throw new RuntimeException(e + getErrorsString() + "\n");
-    	}
+        }
         return pid;
     }
 
+    private ISerializer getSerializer(Map conf) {
+        //get factory class name
+        String serializer_className = (String)conf.get(Config.STORM_MULTILANG_SERIALIZER);
+        LOG.info("Storm multilang serializer:" + serializer_className);
+
+        ISerializer serializer = null;
+        try {
+            //create a factory class
+            Class klass = Class.forName(serializer_className);
+            //obtain a serializer object
+            Object obj = klass.newInstance();
+            serializer = (ISerializer)obj;
+        } catch(Exception e) {
+            throw new RuntimeException("Failed to construct multilang serializer from serializer " + serializer_className, e);
+        }
+        return serializer;
+    }
+
     public void destroy() {
         _subprocess.destroy();
     }
 
-    public Emission readEmission() throws IOException {
-    	try  {
-    		return serializer.readEmission();
-    	} catch (NoOutputException e) {
+    public ShellMsg readShellMsg() throws IOException {
+        try {
+            return serializer.readShellMsg();
+        } catch (NoOutputException e) {
             throw new RuntimeException(e + getErrorsString() + "\n");
-    	}
+        }
     }
 
-    public void writeImmission(Immission msg) throws IOException {
-    	serializer.writeImmission(msg);
-        // drain the error stream to avoid dead lock because of full error stream buffer
+    public void writeBoltMsg(BoltMsg msg) throws IOException {
+        serializer.writeBoltMsg(msg);
+        // drain the error stream to avoid dead lock because of full error
+        // stream buffer
         drainErrorStream();
     }
 
     public void writeSpoutMsg(SpoutMsg msg) throws IOException {
-    	serializer.writeSpoutMsg(msg);
-        // drain the error stream to avoid dead lock because of full error stream buffer
+        serializer.writeSpoutMsg(msg);
+        // drain the error stream to avoid dead lock because of full error
+        // stream buffer
         drainErrorStream();
     }
 
     public void writeTaskIds(List<Integer> taskIds) throws IOException {
-    	serializer.writeTaskIds(taskIds);
-        // drain the error stream to avoid dead lock because of full error stream buffer
+        serializer.writeTaskIds(taskIds);
+        // drain the error stream to avoid dead lock because of full error
+        // stream buffer
         drainErrorStream();
     }
 
-    public void drainErrorStream()
-    {
+    public void drainErrorStream() {
         try {
-            while (processErrorStream.available() > 0)
-            {
+            while (processErrorStream.available() > 0) {
                 int bufferSize = processErrorStream.available();
-                byte[] errorReadingBuffer =  new byte[bufferSize];
+                byte[] errorReadingBuffer = new byte[bufferSize];
 
                 processErrorStream.read(errorReadingBuffer, 0, bufferSize);
 
-                LOG.info("Got error from shell process: " + new String(errorReadingBuffer));
+                LOG.info("Got error from shell process: "
+                        + new String(errorReadingBuffer));
             }
-        } catch(Exception e) {
+        } catch (Exception e) {
         }
     }
 
     public String getErrorsString() {
-        if(processErrorStream!=null) {
+        if (processErrorStream != null) {
             try {
                 return IOUtils.toString(processErrorStream);
-            } catch(IOException e) {
+            } catch (IOException e) {
                 return "(Unable to capture error stream)";
             }
         } else {