You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/05/15 23:27:25 UTC

[05/27] storm git commit: repackage multi-lang resources

repackage multi-lang resources


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cb5afe32
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cb5afe32
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cb5afe32

Branch: refs/heads/0.10.x-branch
Commit: cb5afe3262b3cc793cc4d3a9dd2ea7b5b89e37cd
Parents: 1247f6d
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Mon Apr 6 15:54:24 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:12:17 2015 -0400

----------------------------------------------------------------------
 pom.xml                                         |   3 +
 storm-core/pom.xml                              |  35 ++
 storm-core/src/dev/resources/storm.js           | 373 -------------------
 storm-core/src/dev/resources/storm.py           | 260 -------------
 storm-core/src/dev/resources/storm.rb           | 236 ------------
 storm-multilang/multilang-javascript/pom.xml    |  32 ++
 .../src/main/resources/resources/storm.js       | 366 ++++++++++++++++++
 storm-multilang/multilang-python/pom.xml        |  32 ++
 .../src/main/resources/resources/storm.py       | 260 +++++++++++++
 storm-multilang/multilang-ruby/pom.xml          |  32 ++
 .../src/main/resources/resources/storm.rb       | 236 ++++++++++++
 11 files changed, 996 insertions(+), 869 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 236e20f..4d7fbd2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -157,6 +157,9 @@
     <modules>
         <module>storm-buildtools/maven-shade-clojure-transformer</module>
         <module>storm-buildtools/storm-maven-plugins</module>
+        <module>storm-multilang/multilang-javascript</module>
+        <module>storm-multilang/multilang-python</module>
+        <module>storm-multilang/multilang-ruby</module>
         <module>storm-core</module>
         <module>examples/storm-starter</module>
         <module>external/storm-kafka</module>

http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 17e1a15..db54481 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -35,6 +35,12 @@
     </properties>
 
     <dependencies>
+        <!-- multi-lang resources -->
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>multilang-ruby</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <!--clojure-->
         <dependency>
             <groupId>org.clojure</groupId>
@@ -510,6 +516,35 @@
                             <includeScope>runtime</includeScope>
                         </configuration>
                     </execution>
+                    <!-- multi-lang resources -->
+                               <execution>
+                                 <id>unpack</id>
+                                 <phase>process-test-resources</phase>
+                                 <goals>
+                                   <goal>unpack</goal>
+                                 </goals>
+                                 <configuration>
+                                   <artifactItems>
+                                     <artifactItem>
+                                       <groupId>org.apache.storm</groupId>
+                                       <artifactId>multilang-ruby</artifactId>
+                                       <version>${project.version}</version>
+                                     </artifactItem>
+                                     <artifactItem>
+                                        <groupId>org.apache.storm</groupId>
+                                        <artifactId>multilang-python</artifactId>
+                                        <version>${project.version}</version>
+                                      </artifactItem>
+                                      <artifactItem>
+                                         <groupId>org.apache.storm</groupId>
+                                         <artifactId>multilang-javascript</artifactId>
+                                         <version>${project.version}</version>
+                                       </artifactItem>
+                                   </artifactItems>
+                                   <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
+                                 </configuration>
+                               </execution>
+
                 </executions>
             </plugin>
             <plugin>

http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/storm-core/src/dev/resources/storm.js
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/resources/storm.js b/storm-core/src/dev/resources/storm.js
deleted file mode 100755
index 8827cd3..0000000
--- a/storm-core/src/dev/resources/storm.js
+++ /dev/null
@@ -1,373 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Base classes in node-js for storm Bolt and Spout.
- * Implements the storm multilang protocol for nodejs.
- */
-
-
-var fs = require('fs');
-
-function Storm() {
-    this.messagePart = "";
-    this.taskIdsCallbacks = [];
-    this.isFirstMessage = true;
-    this.separator = '\nend\n';
-}
-
-Storm.prototype.sendMsgToParent = function(msg) {
-    var str = JSON.stringify(msg);
-    process.stdout.write(str + this.separator);
-}
-
-Storm.prototype.sync = function() {
-    this.sendMsgToParent({"command":"sync"});
-}
-
-Storm.prototype.sendPid = function(heartbeatdir) {
-    var pid = process.pid;
-    fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w"));
-    this.sendMsgToParent({"pid": pid})
-}
-
-Storm.prototype.log = function(msg) {
-    this.sendMsgToParent({"command": "log", "msg": msg});
-}
-
-Storm.prototype.initSetupInfo = function(setupInfo) {
-    var self = this;
-    var callback = function() {
-        self.sendPid(setupInfo['pidDir']);
-    }
-    this.initialize(setupInfo['conf'], setupInfo['context'], callback);
-}
-
-Storm.prototype.startReadingInput = function() {
-    var self = this;
-    process.stdin.on('readable', function() {
-        var chunk = process.stdin.read();
-        var messages = self.handleNewChunk(chunk);
-        messages.forEach(function(message) {
-            self.handleNewMessage(message);
-        })
-
-    });
-}
-
-/**
- * receives a new string chunk and returns a list of new messages with the separator removed
- * stores state in this.messagePart
- * @param chunk
- */
-Storm.prototype.handleNewChunk = function(chunk) {
-    //invariant: this.messagePart has no separator otherwise we would have parsed it already
-    var messages = [];
-    if (chunk && chunk.length !== 0) {
-        //"{}".split("\nend\n")           ==> ['{}']
-        //"\nend\n".split("\nend\n")      ==> [''  , '']
-        //"{}\nend\n".split("\nend\n")    ==> ['{}', '']
-        //"\nend\n{}".split("\nend\n")    ==> [''  , '{}']
-        // "{}\nend\n{}".split("\nend\n") ==> ['{}', '{}' ]
-        this.messagePart = this.messagePart + chunk;
-        var newMessageParts = this.messagePart.split(this.separator);
-        while (newMessageParts.length > 0) {
-            var potentialMessage = newMessageParts.shift();
-            var anotherMessageAhead = newMessageParts.length > 0;
-            if  (!anotherMessageAhead) {
-                this.messagePart = potentialMessage;
-            }
-            else if (potentialMessage.length > 0) {
-                messages.push(potentialMessage);
-            }
-        }
-    }
-    return messages;
- }
-
-Storm.prototype.isTaskIds = function(msg) {
-    return (msg instanceof Array);
-}
-
-Storm.prototype.handleNewMessage = function(msg) {
-    var parsedMsg = JSON.parse(msg);
-
-    if (this.isFirstMessage) {
-        this.initSetupInfo(parsedMsg);
-        this.isFirstMessage = false;
-    } else if (this.isTaskIds(parsedMsg)) {
-        this.handleNewTaskId(parsedMsg);
-    } else {
-        this.handleNewCommand(parsedMsg);
-    }
-}
-
-Storm.prototype.handleNewTaskId = function(taskIds) {
-    //When new list of task ids arrives, the callback that was passed with the corresponding emit should be called.
-    //Storm assures that the task ids will be sent in the same order as their corresponding emits so it we can simply
-    //take the first callback in the list and be sure it is the right one.
-
-    var callback = this.taskIdsCallbacks.shift();
-    if (callback) {
-        callback(taskIds);
-    } else {
-        throw new Error('Something went wrong, we off the split of task id callbacks');
-    }
-}
-
-
-
-/**
- *
- * @param messageDetails json with the emit details.
- *
- * For bolt, the json must contain the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
- * tuple and return ack when all components successfully finished to process it.
- * and may contain the optional fields:
- * - stream (if empty - emit to default stream)
- *
- * For spout, the json must contain the required fields:
- * - tuple - the value to emit
- *
- * and may contain the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- *
- * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received).
- */
-Storm.prototype.emit = function(messageDetails, onTaskIds) {
-    //Every emit triggers a response - list of task ids to which the tuple was emitted. The task ids are accessible
-    //through the callback (will be called when the response arrives). The callback is stored in a list until the
-    //corresponding task id list arrives.
-    if (messageDetails.task) {
-        throw new Error('Illegal input - task. To emit to specific task use emit direct!');
-    }
-
-    if (!onTaskIds) {
-        throw new Error('You must pass a onTaskIds callback when using emit!')
-    }
-
-    this.taskIdsCallbacks.push(onTaskIds);
-    this.__emit(messageDetails);;
-}
-
-
-/**
- * Emit message to specific task.
- * @param messageDetails json with the emit details.
- *
- * For bolt, the json must contain the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
- * tuple and return ack when all components successfully finished to process it.
- * - task - indicate the task to send the tuple to.
- * and may contain the optional fields:
- * - stream (if empty - emit to default stream)
- *
- * For spout, the json must contain the required fields:
- * - tuple - the value to emit
- * - task - indicate the task to send the tuple to.
- * and may contain the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- *
- * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received).
- */
-Storm.prototype.emitDirect = function(commandDetails) {
-    if (!commandDetails.task) {
-        throw new Error("Emit direct must receive task id!")
-    }
-    this.__emit(commandDetails);
-}
-
-/**
- * Initialize storm component according to the configuration received.
- * @param conf configuration object accrding to storm protocol.
- * @param context context object according to storm protocol.
- * @param done callback. Call this method when finished initializing.
- */
-Storm.prototype.initialize = function(conf, context, done) {
-    done();
-}
-
-Storm.prototype.run = function() {
-    process.stdout.setEncoding('utf8');
-    process.stdin.setEncoding('utf8');
-    this.startReadingInput();
-}
-
-function Tuple(id, component, stream, task, values) {
-    this.id = id;
-    this.component = component;
-    this.stream = stream;
-    this.task = task;
-    this.values = values;
-}
-
-/**
- * Base class for storm bolt.
- * To create a bolt implement 'process' method.
- * You may also implement initialize method to
- */
-function BasicBolt() {
-    Storm.call(this);
-    this.anchorTuple = null;
-};
-
-BasicBolt.prototype = Object.create(Storm.prototype);
-BasicBolt.prototype.constructor = BasicBolt;
-
-/**
- * Emit message.
- * @param commandDetails json with the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
- * tuple and return ack when all components successfully finished to process it.
- * and the optional fields:
- * - stream (if empty - emit to default stream)
- * - task (pass only to emit to specific task)
- */
-BasicBolt.prototype.__emit = function(commandDetails) {
-    var self = this;
-
-    var message = {
-        command: "emit",
-        tuple: commandDetails.tuple,
-        stream: commandDetails.stream,
-        task: commandDetails.task,
-        anchors: [commandDetails.anchorTupleId]
-    };
-
-    this.sendMsgToParent(message);
-}
-
-BasicBolt.prototype.handleNewCommand = function(command) {
-    var self = this;
-    var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]);
-
-    if (tup.task === -1 && tup.stream === "__heartbeat") {
-        self.sync();
-        return;
-    }
-
-    var callback = function(err) {
-          if (err) {
-              self.fail(tup, err);
-              return;
-          }
-          self.ack(tup);
-      }
-    this.process(tup, callback);
-}
-
-/**
- * Implement this method when creating a bolt. This is the main method that provides the logic of the bolt (what
- * should it do?).
- * @param tuple the input of the bolt - what to process.
- * @param done call this method when done processing.
- */
-BasicBolt.prototype.process = function(tuple, done) {};
-
-BasicBolt.prototype.ack = function(tup) {
-    this.sendMsgToParent({"command": "ack", "id": tup.id});
-}
-
-BasicBolt.prototype.fail = function(tup, err) {
-    this.sendMsgToParent({"command": "fail", "id": tup.id});
-}
-
-
-/**
- * Base class for storm spout.
- * To create a spout implement the following methods: nextTuple, ack and fail (nextTuple - mandatory, ack and fail
- * can stay empty).
- * You may also implement initialize method.
- *
- */
-function Spout() {
-    Storm.call(this);
-};
-
-Spout.prototype = Object.create(Storm.prototype);
-
-Spout.prototype.constructor = Spout;
-
-/**
- * This method will be called when an ack is received for preciously sent tuple. One may implement it.
- * @param id The id of the tuple.
- * @param done Call this method when finished and ready to receive more tuples.
- */
-Spout.prototype.ack = function(id, done) {};
-
-/**
- * This method will be called when an fail is received for preciously sent tuple. One may implement it (for example -
- * log the failure or send the tuple again).
- * @param id The id of the tuple.
- * @param done Call this method when finished and ready to receive more tuples.
- */
-Spout.prototype.fail = function(id, done) {};
-
-/**
- * Method the indicates its time to emit the next tuple.
- * @param done call this method when done sending the output.
- */
-Spout.prototype.nextTuple = function(done) {};
-
-Spout.prototype.handleNewCommand = function(command) {
-    var self = this;
-    var callback = function() {
-        self.sync();
-    }
-
-    if (command["command"] === "next") {
-        this.nextTuple(callback);
-    }
-
-    if (command["command"] === "ack") {
-        this.ack(command["id"], callback);
-    }
-
-    if (command["command"] === "fail") {
-        this.fail(command["id"], callback);
-    }
-}
-
-/**
- * @param commandDetails json with the required fields:
- * - tuple - the value to emit.
- * and the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- * - task - pass only to emit to specific task.
- */
-Spout.prototype.__emit = function(commandDetails) {
-    var message = {
-        command: "emit",
-        tuple: commandDetails.tuple,
-        id: commandDetails.id,
-        stream: commandDetails.stream,
-        task: commandDetails.task
-    };
-
-    this.sendMsgToParent(message);
-}
-
-module.exports.BasicBolt = BasicBolt;
-module.exports.Spout = Spout;

http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/storm-core/src/dev/resources/storm.py
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/resources/storm.py b/storm-core/src/dev/resources/storm.py
deleted file mode 100755
index 642c393..0000000
--- a/storm-core/src/dev/resources/storm.py
+++ /dev/null
@@ -1,260 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import sys
-import os
-import traceback
-from collections import deque
-
-try:
-    import simplejson as json
-except ImportError:
-    import json
-
-json_encode = lambda x: json.dumps(x)
-json_decode = lambda x: json.loads(x)
-
-#reads lines and reconstructs newlines appropriately
-def readMsg():
-    msg = ""
-    while True:
-        line = sys.stdin.readline()
-        if not line:
-            raise Exception('Read EOF from stdin')
-        if line[0:-1] == "end":
-            break
-        msg = msg + line
-    return json_decode(msg[0:-1])
-
-MODE = None
-ANCHOR_TUPLE = None
-
-#queue up commands we read while trying to read taskids
-pending_commands = deque()
-
-def readTaskIds():
-    if pending_taskids:
-        return pending_taskids.popleft()
-    else:
-        msg = readMsg()
-        while type(msg) is not list:
-            pending_commands.append(msg)
-            msg = readMsg()
-        return msg
-
-#queue up taskids we read while trying to read commands/tuples
-pending_taskids = deque()
-
-def readCommand():
-    if pending_commands:
-        return pending_commands.popleft()
-    else:
-        msg = readMsg()
-        while type(msg) is list:
-            pending_taskids.append(msg)
-            msg = readMsg()
-        return msg
-
-def readTuple():
-    cmd = readCommand()
-    return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"])
-
-def sendMsgToParent(msg):
-    print json_encode(msg)
-    print "end"
-    sys.stdout.flush()
-
-def sync():
-    sendMsgToParent({'command':'sync'})
-
-def sendpid(heartbeatdir):
-    pid = os.getpid()
-    sendMsgToParent({'pid':pid})
-    open(heartbeatdir + "/" + str(pid), "w").close()
-
-def emit(*args, **kwargs):
-    __emit(*args, **kwargs)
-    return readTaskIds()
-
-def emitDirect(task, *args, **kwargs):
-    kwargs["directTask"] = task
-    __emit(*args, **kwargs)
-
-def __emit(*args, **kwargs):
-    global MODE
-    if MODE == Bolt:
-        emitBolt(*args, **kwargs)
-    elif MODE == Spout:
-        emitSpout(*args, **kwargs)
-
-def emitBolt(tup, stream=None, anchors = [], directTask=None):
-    global ANCHOR_TUPLE
-    if ANCHOR_TUPLE is not None:
-        anchors = [ANCHOR_TUPLE]
-    m = {"command": "emit"}
-    if stream is not None:
-        m["stream"] = stream
-    m["anchors"] = map(lambda a: a.id, anchors)
-    if directTask is not None:
-        m["task"] = directTask
-    m["tuple"] = tup
-    sendMsgToParent(m)
-
-def emitSpout(tup, stream=None, id=None, directTask=None):
-    m = {"command": "emit"}
-    if id is not None:
-        m["id"] = id
-    if stream is not None:
-        m["stream"] = stream
-    if directTask is not None:
-        m["task"] = directTask
-    m["tuple"] = tup
-    sendMsgToParent(m)
-
-def ack(tup):
-    sendMsgToParent({"command": "ack", "id": tup.id})
-
-def fail(tup):
-    sendMsgToParent({"command": "fail", "id": tup.id})
-
-def reportError(msg):
-    sendMsgToParent({"command": "error", "msg": msg})
-
-def log(msg, level=2):
-    sendMsgToParent({"command": "log", "msg": msg, "level":level})
-
-def logTrace(msg):
-    log(msg, 0)
-
-def logDebug(msg):
-    log(msg, 1)
-
-def logInfo(msg):
-    log(msg, 2)
-
-def logWarn(msg):
-    log(msg, 3)
-
-def logError(msg):
-    log(msg, 4)
-
-def rpcMetrics(name, params):
-    sendMsgToParent({"command": "metrics", "name": name, "params": params})
-
-def initComponent():
-    setupInfo = readMsg()
-    sendpid(setupInfo['pidDir'])
-    return [setupInfo['conf'], setupInfo['context']]
-
-class Tuple(object):
-    def __init__(self, id, component, stream, task, values):
-        self.id = id
-        self.component = component
-        self.stream = stream
-        self.task = task
-        self.values = values
-
-    def __repr__(self):
-        return '<%s%s>' % (
-            self.__class__.__name__,
-            ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys())))
-
-    def is_heartbeat_tuple(self):
-        return self.task == -1 and self.stream == "__heartbeat"
-
-class Bolt(object):
-    def initialize(self, stormconf, context):
-        pass
-
-    def process(self, tuple):
-        pass
-
-    def run(self):
-        global MODE
-        MODE = Bolt
-        conf, context = initComponent()
-        try:
-            self.initialize(conf, context)
-            while True:
-                tup = readTuple()
-                if tup.is_heartbeat_tuple():
-                    sync()
-                else:
-                    self.process(tup)
-        except Exception, e:
-            reportError(traceback.format_exc(e))
-
-class BasicBolt(object):
-    def initialize(self, stormconf, context):
-        pass
-
-    def process(self, tuple):
-        pass
-
-    def run(self):
-        global MODE
-        MODE = Bolt
-        global ANCHOR_TUPLE
-        conf, context = initComponent()
-        try:
-            self.initialize(conf, context)
-            while True:
-                tup = readTuple()
-                if tup.is_heartbeat_tuple():
-                    sync()
-                else:
-                    ANCHOR_TUPLE = tup
-                    try:
-                        self.process(tup)
-                        ack(tup)
-                    except Exception, e:
-                        reportError(traceback.format_exc(e))
-                        fail(tup)
-        except Exception, e:
-            reportError(traceback.format_exc(e))
-
-class Spout(object):
-    def initialize(self, conf, context):
-        pass
-
-    def ack(self, id):
-        pass
-
-    def fail(self, id):
-        pass
-
-    def nextTuple(self):
-        pass
-
-    def run(self):
-        global MODE
-        MODE = Spout
-        conf, context = initComponent()
-        try:
-            self.initialize(conf, context)
-            while True:
-                msg = readCommand()
-                if msg["command"] == "next":
-                    self.nextTuple()
-                if msg["command"] == "ack":
-                    self.ack(msg["id"])
-                if msg["command"] == "fail":
-                    self.fail(msg["id"])
-                sync()
-        except Exception, e:
-            reportError(traceback.format_exc(e))

http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/storm-core/src/dev/resources/storm.rb
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/resources/storm.rb b/storm-core/src/dev/resources/storm.rb
deleted file mode 100644
index 816694e..0000000
--- a/storm-core/src/dev/resources/storm.rb
+++ /dev/null
@@ -1,236 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-require "rubygems"
-require "json"
-
-module Storm
-  module Protocol
-    class << self
-      attr_accessor :mode, :pending_taskids, :pending_commands
-    end
-
-    self.pending_taskids = []
-    self.pending_commands = []
-
-    def read_message
-      msg = ""
-      loop do
-        line = STDIN.readline.chomp
-        break if line == "end"
-        msg << line
-        msg << "\n"
-      end
-      JSON.parse msg.chomp
-    end
-
-    def read_task_ids
-      Storm::Protocol.pending_taskids.shift ||
-          begin
-            msg = read_message
-            until msg.is_a? Array
-              Storm::Protocol.pending_commands.push(msg)
-              msg = read_message
-            end
-            msg
-          end
-    end
-
-    def read_command
-      Storm::Protocol.pending_commands.shift ||
-          begin
-            msg = read_message
-            while msg.is_a? Array
-              Storm::Protocol.pending_taskids.push(msg)
-              msg = read_message
-            end
-            msg
-          end
-    end
-
-    def send_msg_to_parent(msg)
-      puts msg.to_json
-      puts "end"
-      STDOUT.flush
-    end
-
-    def sync
-      send_msg_to_parent({'command' => 'sync'})
-    end
-
-    def send_pid(heartbeat_dir)
-      pid = Process.pid
-      send_msg_to_parent({'pid' => pid})
-      File.open("#{heartbeat_dir}/#{pid}", "w").close
-    end
-
-    def emit_bolt(tup, args = {})
-      stream = args[:stream]
-      anchors = args[:anchors] || args[:anchor] || []
-      anchors = [anchors] unless anchors.is_a? Enumerable
-      direct = args[:direct_task]
-      m = {:command => :emit, :anchors => anchors.map(&:id), :tuple => tup}
-      m[:stream] = stream if stream
-      m[:task] = direct if direct
-      send_msg_to_parent m
-      read_task_ids unless direct
-    end
-
-    def emit_spout(tup, args = {})
-      stream = args[:stream]
-      id = args[:id]
-      direct = args[:direct_task]
-      m = {:command => :emit, :tuple => tup}
-      m[:id] = id if id
-      m[:stream] = stream if stream
-      m[:task] = direct if direct
-      send_msg_to_parent m
-      read_task_ids unless direct
-    end
-
-    def emit(*args)
-      case Storm::Protocol.mode
-        when 'spout'
-          emit_spout(*args)
-        when 'bolt'
-          emit_bolt(*args)
-      end
-    end
-
-    def ack(tup)
-      send_msg_to_parent :command => :ack, :id => tup.id
-    end
-
-    def fail(tup)
-      send_msg_to_parent :command => :fail, :id => tup.id
-    end
-
-    def reportError(msg)
-      send_msg_to_parent :command => :error, :msg => msg.to_s
-    end
-
-    def log(msg, level=2)
-      send_msg_to_parent :command => :log, :msg => msg.to_s, :level => level
-    end
-
-    def logTrace(msg)
-      log(msg, 0)
-    end
-
-    def logDebug(msg)
-      log(msg, 1)
-    end
-
-    def logInfo(msg)
-      log(msg, 2)
-    end
-
-    def logWarn(msg)
-      log(msg, 3)
-    end
-
-    def logError(msg)
-      log(msg, 4)
-    end
-
-    def handshake
-      setup_info = read_message
-      send_pid setup_info['pidDir']
-      [setup_info['conf'], setup_info['context']]
-    end
-  end
-
-  class Tuple
-    attr_accessor :id, :component, :stream, :task, :values
-
-    def initialize(id, component, stream, task, values)
-      @id = id
-      @component = component
-      @stream = stream
-      @task = task
-      @values = values
-    end
-
-    def self.from_hash(hash)
-      Tuple.new(*hash.values_at("id", "comp", "stream", "task", "tuple"))
-    end
-
-    def is_heartbeat
-      task == -1 and stream == '__heartbeat'
-    end
-  end
-
-  class Bolt
-    include Storm::Protocol
-
-    def prepare(conf, context); end
-
-    def process(tuple); end
-
-    def run
-      Storm::Protocol.mode = 'bolt'
-      prepare(*handshake)
-      begin
-        while true
-          tuple = Tuple.from_hash(read_command)
-          if tuple.is_heartbeat
-            sync
-          else
-            process tuple
-          end
-        end
-      rescue Exception => e
-        reportError 'Exception in bolt: ' + e.message + ' - ' + e.backtrace.join('\n')
-      end
-    end
-  end
-
-  class Spout
-    include Storm::Protocol
-
-    def open(conf, context); end
-
-    def nextTuple; end
-
-    def ack(id); end
-
-    def fail(id); end
-
-    def run
-      Storm::Protocol.mode = 'spout'
-      open(*handshake)
-
-      begin
-        while true
-          msg = read_command
-          case msg['command']
-            when 'next'
-              nextTuple
-            when 'ack'
-              ack(msg['id'])
-            when 'fail'
-              fail(msg['id'])
-          end
-          sync
-        end
-      rescue Exception => e
-        reportError 'Exception in spout: ' + e.message + ' - ' + e.backtrace.join('\n')
-      end
-    end
-  end
-end

http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/storm-multilang/multilang-javascript/pom.xml
----------------------------------------------------------------------
diff --git a/storm-multilang/multilang-javascript/pom.xml b/storm-multilang/multilang-javascript/pom.xml
new file mode 100644
index 0000000..e1cb993
--- /dev/null
+++ b/storm-multilang/multilang-javascript/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>0.11.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <groupId>org.apache.storm</groupId>
+    <artifactId>multilang-javascript</artifactId>
+    <packaging>jar</packaging>
+    <name>multilang-javascript</name>
+
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/storm-multilang/multilang-javascript/src/main/resources/resources/storm.js
----------------------------------------------------------------------
diff --git a/storm-multilang/multilang-javascript/src/main/resources/resources/storm.js b/storm-multilang/multilang-javascript/src/main/resources/resources/storm.js
new file mode 100755
index 0000000..f5dcad2
--- /dev/null
+++ b/storm-multilang/multilang-javascript/src/main/resources/resources/storm.js
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ /**
+ * Base classes in node-js for storm Bolt and Spout.
+ * Implements the storm multilang protocol for nodejs.
+ */
+
+
+var fs = require('fs');
+
+function Storm() {
+    this.messagePart = "";
+    this.taskIdsCallbacks = [];
+    this.isFirstMessage = true;
+    this.separator = '\nend\n';
+}
+
+Storm.prototype.sendMsgToParent = function(msg) {
+    var str = JSON.stringify(msg);
+    process.stdout.write(str + this.separator);
+}
+
+Storm.prototype.sync = function() {
+    this.sendMsgToParent({"command":"sync"});
+}
+
+Storm.prototype.sendPid = function(heartbeatdir) {
+    var pid = process.pid;
+    fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w"));
+    this.sendMsgToParent({"pid": pid})
+}
+
+Storm.prototype.log = function(msg) {
+    this.sendMsgToParent({"command": "log", "msg": msg});
+}
+
+Storm.prototype.initSetupInfo = function(setupInfo) {
+    var self = this;
+    var callback = function() {
+        self.sendPid(setupInfo['pidDir']);
+    }
+    this.initialize(setupInfo['conf'], setupInfo['context'], callback);
+}
+
+Storm.prototype.startReadingInput = function() {
+    var self = this;
+    process.stdin.on('readable', function() {
+        var chunk = process.stdin.read();
+        var messages = self.handleNewChunk(chunk);
+        messages.forEach(function(message) {
+            self.handleNewMessage(message);
+        })
+
+    });
+}
+
+/**
+ * receives a new string chunk and returns a list of new messages with the separator removed
+ * stores state in this.messagePart
+ * @param chunk
+ */
+Storm.prototype.handleNewChunk = function(chunk) {
+    //invariant: this.messagePart has no separator otherwise we would have parsed it already
+    var messages = [];
+    if (chunk && chunk.length !== 0) {
+        //"{}".split("\nend\n")           ==> ['{}']
+        //"\nend\n".split("\nend\n")      ==> [''  , '']
+        //"{}\nend\n".split("\nend\n")    ==> ['{}', '']
+        //"\nend\n{}".split("\nend\n")    ==> [''  , '{}']
+        // "{}\nend\n{}".split("\nend\n") ==> ['{}', '{}' ]
+        this.messagePart = this.messagePart + chunk;
+        var newMessageParts = this.messagePart.split(this.separator);
+        while (newMessageParts.length > 0) {
+            var potentialMessage = newMessageParts.shift();
+            var anotherMessageAhead = newMessageParts.length > 0;
+            if  (!anotherMessageAhead) {
+                this.messagePart = potentialMessage;
+            }
+            else if (potentialMessage.length > 0) {
+                messages.push(potentialMessage);
+            }
+        }
+    }
+    return messages;
+ }
+
+Storm.prototype.isTaskIds = function(msg) {
+    return (msg instanceof Array);
+}
+
+Storm.prototype.handleNewMessage = function(msg) {
+    var parsedMsg = JSON.parse(msg);
+
+    if (this.isFirstMessage) {
+        this.initSetupInfo(parsedMsg);
+        this.isFirstMessage = false;
+    } else if (this.isTaskIds(parsedMsg)) {
+        this.handleNewTaskId(parsedMsg);
+    } else {
+        this.handleNewCommand(parsedMsg);
+    }
+}
+
+Storm.prototype.handleNewTaskId = function(taskIds) {
+    //When new list of task ids arrives, the callback that was passed with the corresponding emit should be called.
+    //Storm assures that the task ids will be sent in the same order as their corresponding emits so it we can simply
+    //take the first callback in the list and be sure it is the right one.
+
+    var callback = this.taskIdsCallbacks.shift();
+    if (callback) {
+        callback(taskIds);
+    } else {
+        throw new Error('Something went wrong, we off the split of task id callbacks');
+    }
+}
+
+
+
+/**
+ *
+ * @param messageDetails json with the emit details.
+ *
+ * For bolt, the json must contain the required fields:
+ * - tuple - the value to emit
+ * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
+ * tuple and return ack when all components successfully finished to process it.
+ * and may contain the optional fields:
+ * - stream (if empty - emit to default stream)
+ *
+ * For spout, the json must contain the required fields:
+ * - tuple - the value to emit
+ *
+ * and may contain the optional fields:
+ * - id - pass id for reliable emit (and receive ack/fail later).
+ * - stream - if empty - emit to default stream.
+ *
+ * @param onTaskIds function that will be called with the list of task ids the message was emitted to (when received).
+ */
+Storm.prototype.emit = function(messageDetails, onTaskIds) {
+    //Every emit triggers a response - list of task ids to which the tuple was emitted. The task ids are accessible
+    //through the callback (will be called when the response arrives). The callback is stored in a list until the
+    //corresponding task id list arrives.
+    if (messageDetails.task) {
+        throw new Error('Illegal input - task. To emit to specific task use emit direct!');
+    }
+
+    if (!onTaskIds) {
+        throw new Error('You must pass a onTaskIds callback when using emit!')
+    }
+
+    this.taskIdsCallbacks.push(onTaskIds);
+    this.__emit(messageDetails);;
+}
+
+
+/**
+ * Emit message to specific task.
+ * @param commandDetails json with the emit details.
+ *
+ * For bolt, the json must contain the required fields:
+ * - tuple - the value to emit
+ * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
+ * tuple and return ack when all components successfully finished to process it.
+ * - task - indicate the task to send the tuple to.
+ * and may contain the optional fields:
+ * - stream (if empty - emit to default stream)
+ *
+ * For spout, the json must contain the required fields:
+ * - tuple - the value to emit
+ * - task - indicate the task to send the tuple to.
+ * and may contain the optional fields:
+ * - id - pass id for reliable emit (and receive ack/fail later).
+ * - stream - if empty - emit to default stream.
+ *
+ * Unlike emit, emitDirect takes no onTaskIds callback (the function accepts only the details object).
+ */
+Storm.prototype.emitDirect = function(commandDetails) {
+    if (!commandDetails.task) {
+        throw new Error("Emit direct must receive task id!")
+    }
+    this.__emit(commandDetails);
+}
+
+/**
+ * Initialize storm component according to the configuration received.
+ * @param conf configuration object according to storm protocol.
+ * @param context context object according to storm protocol.
+ * @param done callback. Call this method when finished initializing.
+ */
+Storm.prototype.initialize = function(conf, context, done) {
+    done();
+}
+
+Storm.prototype.run = function() {
+    process.stdout.setEncoding('utf8');
+    process.stdin.setEncoding('utf8');
+    this.startReadingInput();
+}
+
+function Tuple(id, component, stream, task, values) {
+    this.id = id;
+    this.component = component;
+    this.stream = stream;
+    this.task = task;
+    this.values = values;
+}
+
+/**
+ * Base class for storm bolt.
+ * To create a bolt implement 'process' method.
+ * You may also implement initialize method to
+ */
+function BasicBolt() {
+    Storm.call(this);
+    this.anchorTuple = null;
+};
+
+BasicBolt.prototype = Object.create(Storm.prototype);
+BasicBolt.prototype.constructor = BasicBolt;
+
+/**
+ * Emit message.
+ * @param commandDetails json with the required fields:
+ * - tuple - the value to emit
+ * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
+ * tuple and return ack when all components successfully finished to process it.
+ * and the optional fields:
+ * - stream (if empty - emit to default stream)
+ * - task (pass only to emit to specific task)
+ */
+BasicBolt.prototype.__emit = function(commandDetails) {
+    var self = this;
+
+    var message = {
+        command: "emit",
+        tuple: commandDetails.tuple,
+        stream: commandDetails.stream,
+        task: commandDetails.task,
+        anchors: [commandDetails.anchorTupleId]
+    };
+
+    this.sendMsgToParent(message);
+}
+
+BasicBolt.prototype.handleNewCommand = function(command) { // builds a Tuple from the command and hands it to process()
+    var self = this;
+    var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]);
+    if (tup.task === -1 && tup.stream === '__heartbeat') { // heartbeat: reply with sync, as storm.py and storm.rb do
+        self.sync();
+        return;
+    }
+    var callback = function(err) { // 'done' callback given to process(): fail the tuple on error, otherwise ack it
+        if (err) { self.fail(tup, err); } else { self.ack(tup); }
+    };
+    this.process(tup, callback);
+}
+
+/**
+ * Implement this method when creating a bolt. This is the main method that provides the logic of the bolt (what
+ * should it do?).
+ * @param tuple the input of the bolt - what to process.
+ * @param done call this method when done processing.
+ */
+BasicBolt.prototype.process = function(tuple, done) {};
+
+BasicBolt.prototype.ack = function(tup) {
+    this.sendMsgToParent({"command": "ack", "id": tup.id});
+}
+
+BasicBolt.prototype.fail = function(tup, err) {
+    this.sendMsgToParent({"command": "fail", "id": tup.id});
+}
+
+
+/**
+ * Base class for storm spout.
+ * To create a spout implement the following methods: nextTuple, ack and fail (nextTuple - mandatory, ack and fail
+ * can stay empty).
+ * You may also implement initialize method.
+ *
+ */
+function Spout() {
+    Storm.call(this);
+};
+
+Spout.prototype = Object.create(Storm.prototype);
+
+Spout.prototype.constructor = Spout;
+
+/**
+ * This method will be called when an ack is received for a previously sent tuple. One may implement it.
+ * @param id The id of the tuple.
+ * @param done Call this method when finished and ready to receive more tuples.
+ */
+Spout.prototype.ack = function(id, done) {};
+
+/**
+ * This method will be called when a fail is received for a previously sent tuple. One may implement it (for example -
+ * log the failure or send the tuple again).
+ * @param id The id of the tuple.
+ * @param done Call this method when finished and ready to receive more tuples.
+ */
+Spout.prototype.fail = function(id, done) {};
+
+/**
+ * Method that indicates it is time to emit the next tuple.
+ * @param done call this method when done sending the output.
+ */
+Spout.prototype.nextTuple = function(done) {};
+
+Spout.prototype.handleNewCommand = function(command) {
+    var self = this;
+    var callback = function() {
+        self.sync();
+    }
+
+    if (command["command"] === "next") {
+        this.nextTuple(callback);
+    }
+
+    if (command["command"] === "ack") {
+        this.ack(command["id"], callback);
+    }
+
+    if (command["command"] === "fail") {
+        this.fail(command["id"], callback);
+    }
+}
+
+/**
+ * @param commandDetails json with the required fields:
+ * - tuple - the value to emit.
+ * and the optional fields:
+ * - id - pass id for reliable emit (and receive ack/fail later).
+ * - stream - if empty - emit to default stream.
+ * - task - pass only to emit to specific task.
+ */
+Spout.prototype.__emit = function(commandDetails) {
+    var message = {
+        command: "emit",
+        tuple: commandDetails.tuple,
+        id: commandDetails.id,
+        stream: commandDetails.stream,
+        task: commandDetails.task
+    };
+
+    this.sendMsgToParent(message);
+}
+
+module.exports.BasicBolt = BasicBolt;
+module.exports.Spout = Spout;

http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/storm-multilang/multilang-python/pom.xml
----------------------------------------------------------------------
diff --git a/storm-multilang/multilang-python/pom.xml b/storm-multilang/multilang-python/pom.xml
new file mode 100644
index 0000000..379c0bc
--- /dev/null
+++ b/storm-multilang/multilang-python/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>0.11.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <groupId>org.apache.storm</groupId>
+    <artifactId>multilang-python</artifactId>
+    <packaging>jar</packaging>
+    <name>multilang-python</name>
+
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/storm-multilang/multilang-python/src/main/resources/resources/storm.py
----------------------------------------------------------------------
diff --git a/storm-multilang/multilang-python/src/main/resources/resources/storm.py b/storm-multilang/multilang-python/src/main/resources/resources/storm.py
new file mode 100755
index 0000000..642c393
--- /dev/null
+++ b/storm-multilang/multilang-python/src/main/resources/resources/storm.py
@@ -0,0 +1,260 @@
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+import os
+import traceback
+from collections import deque
+
+try:
+    import simplejson as json
+except ImportError:
+    import json
+
+json_encode = lambda x: json.dumps(x)
+json_decode = lambda x: json.loads(x)
+
+#reads lines and reconstructs newlines appropriately
+def readMsg():
+    msg = ""
+    while True:
+        line = sys.stdin.readline()
+        if not line:
+            raise Exception('Read EOF from stdin')
+        if line[0:-1] == "end":
+            break
+        msg = msg + line
+    return json_decode(msg[0:-1])
+
+MODE = None
+ANCHOR_TUPLE = None
+
+#queue up commands we read while trying to read taskids
+pending_commands = deque()
+
+def readTaskIds():
+    if pending_taskids:
+        return pending_taskids.popleft()
+    else:
+        msg = readMsg()
+        while type(msg) is not list:
+            pending_commands.append(msg)
+            msg = readMsg()
+        return msg
+
+#queue up taskids we read while trying to read commands/tuples
+pending_taskids = deque()
+
+def readCommand():
+    if pending_commands:
+        return pending_commands.popleft()
+    else:
+        msg = readMsg()
+        while type(msg) is list:
+            pending_taskids.append(msg)
+            msg = readMsg()
+        return msg
+
+def readTuple():
+    cmd = readCommand()
+    return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"])
+
+def sendMsgToParent(msg):
+    print json_encode(msg)
+    print "end"
+    sys.stdout.flush()
+
+def sync():
+    sendMsgToParent({'command':'sync'})
+
+def sendpid(heartbeatdir):
+    pid = os.getpid()
+    sendMsgToParent({'pid':pid})
+    open(heartbeatdir + "/" + str(pid), "w").close()
+
+def emit(*args, **kwargs):
+    __emit(*args, **kwargs)
+    return readTaskIds()
+
+def emitDirect(task, *args, **kwargs):
+    kwargs["directTask"] = task
+    __emit(*args, **kwargs)
+
+def __emit(*args, **kwargs):
+    global MODE
+    if MODE == Bolt:
+        emitBolt(*args, **kwargs)
+    elif MODE == Spout:
+        emitSpout(*args, **kwargs)
+
+def emitBolt(tup, stream=None, anchors = [], directTask=None):
+    global ANCHOR_TUPLE
+    if ANCHOR_TUPLE is not None:
+        anchors = [ANCHOR_TUPLE]
+    m = {"command": "emit"}
+    if stream is not None:
+        m["stream"] = stream
+    m["anchors"] = map(lambda a: a.id, anchors)
+    if directTask is not None:
+        m["task"] = directTask
+    m["tuple"] = tup
+    sendMsgToParent(m)
+
+def emitSpout(tup, stream=None, id=None, directTask=None):
+    m = {"command": "emit"}
+    if id is not None:
+        m["id"] = id
+    if stream is not None:
+        m["stream"] = stream
+    if directTask is not None:
+        m["task"] = directTask
+    m["tuple"] = tup
+    sendMsgToParent(m)
+
+def ack(tup):
+    sendMsgToParent({"command": "ack", "id": tup.id})
+
+def fail(tup):
+    sendMsgToParent({"command": "fail", "id": tup.id})
+
+def reportError(msg):
+    sendMsgToParent({"command": "error", "msg": msg})
+
+def log(msg, level=2):
+    sendMsgToParent({"command": "log", "msg": msg, "level":level})
+
+def logTrace(msg):
+    log(msg, 0)
+
+def logDebug(msg):
+    log(msg, 1)
+
+def logInfo(msg):
+    log(msg, 2)
+
+def logWarn(msg):
+    log(msg, 3)
+
+def logError(msg):
+    log(msg, 4)
+
+def rpcMetrics(name, params):
+    sendMsgToParent({"command": "metrics", "name": name, "params": params})
+
+def initComponent():
+    setupInfo = readMsg()
+    sendpid(setupInfo['pidDir'])
+    return [setupInfo['conf'], setupInfo['context']]
+
+class Tuple(object):
+    def __init__(self, id, component, stream, task, values):
+        self.id = id
+        self.component = component
+        self.stream = stream
+        self.task = task
+        self.values = values
+
+    def __repr__(self):
+        return '<%s%s>' % (
+            self.__class__.__name__,
+            ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys())))
+
+    def is_heartbeat_tuple(self):
+        return self.task == -1 and self.stream == "__heartbeat"
+
+class Bolt(object):  # base bolt: override initialize/process, then call run()
+    def initialize(self, stormconf, context):  # hook called once with the conf and context returned by initComponent()
+        pass
+
+    def process(self, tuple):  # hook called for every non-heartbeat tuple
+        pass
+
+    def run(self):  # loops reading tuples until an exception is raised, which is reported to the parent
+        global MODE
+        MODE = Bolt  # makes __emit dispatch to emitBolt
+        conf, context = initComponent()
+        try:
+            self.initialize(conf, context)
+            while True:
+                tup = readTuple()
+                if tup.is_heartbeat_tuple():
+                    sync()  # heartbeats are answered with a sync, not processed
+                else:
+                    self.process(tup)
+        except Exception:
+            reportError(traceback.format_exc())  # format_exc() formats the exception being handled; its argument is a depth limit
+
+class BasicBolt(object):  # bolt that anchors emits to the current tuple and acks/fails it automatically
+    def initialize(self, stormconf, context):  # hook called once with the conf and context returned by initComponent()
+        pass
+
+    def process(self, tuple):  # hook called for every non-heartbeat tuple
+        pass
+
+    def run(self):  # loops reading tuples; acks each tuple whose process() returns, fails it when process() raises
+        global MODE
+        MODE = Bolt  # makes __emit dispatch to emitBolt
+        global ANCHOR_TUPLE
+        conf, context = initComponent()
+        try:
+            self.initialize(conf, context)
+            while True:
+                tup = readTuple()
+                if tup.is_heartbeat_tuple():
+                    sync()  # heartbeats are answered with a sync, not processed
+                else:
+                    ANCHOR_TUPLE = tup  # emitBolt anchors to this tuple while it is set
+                    try:
+                        self.process(tup)
+                        ack(tup)
+                    except Exception:
+                        reportError(traceback.format_exc())  # the argument of format_exc is a depth limit, not the exception
+                        fail(tup)
+        except Exception:
+            reportError(traceback.format_exc())  # errors outside process() (e.g. in initialize or reading) end the loop
+
+class Spout(object):  # base spout: override initialize/nextTuple/ack/fail, then call run()
+    def initialize(self, conf, context):  # hook called once with the conf and context returned by initComponent()
+        pass
+
+    def ack(self, id):  # hook called with msg["id"] of an "ack" command
+        pass
+
+    def fail(self, id):  # hook called with msg["id"] of a "fail" command
+        pass
+
+    def nextTuple(self):  # hook called on every "next" command
+        pass
+
+    def run(self):  # loops reading commands until an exception is raised, which is reported to the parent
+        global MODE
+        MODE = Spout  # makes __emit dispatch to emitSpout
+        conf, context = initComponent()
+        try:
+            self.initialize(conf, context)
+            while True:
+                msg = readCommand()
+                if msg["command"] == "next":
+                    self.nextTuple()
+                if msg["command"] == "ack":
+                    self.ack(msg["id"])
+                if msg["command"] == "fail":
+                    self.fail(msg["id"])
+                sync()  # sent after every command, whichever branch ran
+        except Exception:
+            reportError(traceback.format_exc())  # format_exc() formats the exception being handled; its argument is a depth limit

http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/storm-multilang/multilang-ruby/pom.xml
----------------------------------------------------------------------
diff --git a/storm-multilang/multilang-ruby/pom.xml b/storm-multilang/multilang-ruby/pom.xml
new file mode 100644
index 0000000..6b5dd0c
--- /dev/null
+++ b/storm-multilang/multilang-ruby/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>0.11.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <groupId>org.apache.storm</groupId>
+    <artifactId>multilang-ruby</artifactId>
+    <packaging>jar</packaging>
+    <name>multilang-ruby</name>
+    
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/storm-multilang/multilang-ruby/src/main/resources/resources/storm.rb
----------------------------------------------------------------------
diff --git a/storm-multilang/multilang-ruby/src/main/resources/resources/storm.rb b/storm-multilang/multilang-ruby/src/main/resources/resources/storm.rb
new file mode 100644
index 0000000..816694e
--- /dev/null
+++ b/storm-multilang/multilang-ruby/src/main/resources/resources/storm.rb
@@ -0,0 +1,236 @@
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require "rubygems"
+require "json"
+
+module Storm
+  module Protocol
+    class << self
+      attr_accessor :mode, :pending_taskids, :pending_commands
+    end
+
+    self.pending_taskids = []
+    self.pending_commands = []
+
+    def read_message
+      msg = ""
+      loop do
+        line = STDIN.readline.chomp
+        break if line == "end"
+        msg << line
+        msg << "\n"
+      end
+      JSON.parse msg.chomp
+    end
+
+    def read_task_ids
+      Storm::Protocol.pending_taskids.shift ||
+          begin
+            msg = read_message
+            until msg.is_a? Array
+              Storm::Protocol.pending_commands.push(msg)
+              msg = read_message
+            end
+            msg
+          end
+    end
+
+    def read_command
+      Storm::Protocol.pending_commands.shift ||
+          begin
+            msg = read_message
+            while msg.is_a? Array
+              Storm::Protocol.pending_taskids.push(msg)
+              msg = read_message
+            end
+            msg
+          end
+    end
+
+    def send_msg_to_parent(msg)
+      puts msg.to_json
+      puts "end"
+      STDOUT.flush
+    end
+
+    def sync
+      send_msg_to_parent({'command' => 'sync'})
+    end
+
+    def send_pid(heartbeat_dir)
+      pid = Process.pid
+      send_msg_to_parent({'pid' => pid})
+      File.open("#{heartbeat_dir}/#{pid}", "w").close
+    end
+
+    def emit_bolt(tup, args = {})
+      stream = args[:stream]
+      anchors = args[:anchors] || args[:anchor] || []
+      anchors = [anchors] unless anchors.is_a? Enumerable
+      direct = args[:direct_task]
+      m = {:command => :emit, :anchors => anchors.map(&:id), :tuple => tup}
+      m[:stream] = stream if stream
+      m[:task] = direct if direct
+      send_msg_to_parent m
+      read_task_ids unless direct
+    end
+
+    def emit_spout(tup, args = {})
+      stream = args[:stream]
+      id = args[:id]
+      direct = args[:direct_task]
+      m = {:command => :emit, :tuple => tup}
+      m[:id] = id if id
+      m[:stream] = stream if stream
+      m[:task] = direct if direct
+      send_msg_to_parent m
+      read_task_ids unless direct
+    end
+
+    def emit(*args)
+      case Storm::Protocol.mode
+        when 'spout'
+          emit_spout(*args)
+        when 'bolt'
+          emit_bolt(*args)
+      end
+    end
+
+    def ack(tup)
+      send_msg_to_parent :command => :ack, :id => tup.id
+    end
+
+    def fail(tup)
+      send_msg_to_parent :command => :fail, :id => tup.id
+    end
+
+    def reportError(msg)
+      send_msg_to_parent :command => :error, :msg => msg.to_s
+    end
+
+    def log(msg, level=2)
+      send_msg_to_parent :command => :log, :msg => msg.to_s, :level => level
+    end
+
+    def logTrace(msg)
+      log(msg, 0)
+    end
+
+    def logDebug(msg)
+      log(msg, 1)
+    end
+
+    def logInfo(msg)
+      log(msg, 2)
+    end
+
+    def logWarn(msg)
+      log(msg, 3)
+    end
+
+    def logError(msg)
+      log(msg, 4)
+    end
+
+    def handshake
+      setup_info = read_message
+      send_pid setup_info['pidDir']
+      [setup_info['conf'], setup_info['context']]
+    end
+  end
+
+  class Tuple
+    attr_accessor :id, :component, :stream, :task, :values
+
+    def initialize(id, component, stream, task, values)
+      @id = id
+      @component = component
+      @stream = stream
+      @task = task
+      @values = values
+    end
+
+    def self.from_hash(hash)
+      Tuple.new(*hash.values_at("id", "comp", "stream", "task", "tuple"))
+    end
+
+    def is_heartbeat
+      task == -1 and stream == '__heartbeat'
+    end
+  end
+
+  class Bolt # base bolt: subclasses override prepare/process, then call run
+    include Storm::Protocol
+
+    def prepare(conf, context); end # hook called once with the conf and context returned by handshake
+
+    def process(tuple); end # hook called for every non-heartbeat tuple
+
+    def run # loops reading tuples until an exception is raised, which is reported to the parent
+      Storm::Protocol.mode = 'bolt' # makes emit dispatch to emit_bolt
+      prepare(*handshake)
+      begin
+        while true
+          tuple = Tuple.from_hash(read_command)
+          if tuple.is_heartbeat
+            sync # heartbeats are answered with a sync, not processed
+          else
+            process tuple
+          end
+        end
+      rescue Exception => e
+        reportError 'Exception in bolt: ' + e.message + ' - ' + e.backtrace.join("\n") # double quotes: a real newline between frames
+      end
+    end
+  end
+
+  class Spout # base spout: subclasses override open/nextTuple/ack/fail, then call run
+    include Storm::Protocol
+
+    def open(conf, context); end # hook called once with the conf and context returned by handshake
+
+    def nextTuple; end # hook called on every 'next' command
+
+    def ack(id); end # hook called with msg['id'] of an 'ack' command (overrides Protocol#ack)
+
+    def fail(id); end # hook called with msg['id'] of a 'fail' command (overrides Protocol#fail)
+
+    def run # loops reading commands until an exception is raised, which is reported to the parent
+      Storm::Protocol.mode = 'spout' # makes emit dispatch to emit_spout
+      open(*handshake)
+
+      begin
+        while true
+          msg = read_command
+          case msg['command']
+            when 'next'
+              nextTuple
+            when 'ack'
+              ack(msg['id'])
+            when 'fail'
+              fail(msg['id'])
+          end
+          sync # sent after every command, whichever branch ran
+        end
+      rescue Exception => e
+        reportError 'Exception in spout: ' + e.message + ' - ' + e.backtrace.join("\n") # double quotes: a real newline between frames
+      end
+    end
+  end
+end