Posted to commits@storm.apache.org by pt...@apache.org on 2015/05/15 23:27:21 UTC

[01/27] storm git commit: STORM-563. Kafka Spout doesn't pick up from the beginning of the queue unless forceFromStart specified.

Repository: storm
Updated Branches:
  refs/heads/0.10.x-branch 208ddbeff -> 1498ed062


STORM-563. Kafka Spout doesn't pick up from the beginning of the queue unless forceFromStart specified.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/28d40a40
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/28d40a40
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/28d40a40

Branch: refs/heads/0.10.x-branch
Commit: 28d40a40678426b352b6c3cd191630b5186f9f32
Parents: 208ddbe
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Mon Mar 30 16:11:22 2015 -0700
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 14:57:24 2015 -0400

----------------------------------------------------------------------
 external/storm-kafka/README.md                   | 19 ++++++++++++++++++-
 .../src/jvm/storm/kafka/KafkaConfig.java         |  2 +-
 .../src/jvm/storm/kafka/KafkaUtils.java          |  5 +----
 .../src/jvm/storm/kafka/PartitionManager.java    |  4 ++--
 .../storm/kafka/trident/TridentKafkaEmitter.java |  4 ++--
 .../src/test/storm/kafka/KafkaUtilsTest.java     |  6 +++---
 6 files changed, 27 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/28d40a40/external/storm-kafka/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kafka/README.md b/external/storm-kafka/README.md
index c5ed4a5..8a98c0c 100644
--- a/external/storm-kafka/README.md
+++ b/external/storm-kafka/README.md
@@ -83,7 +83,7 @@ The KafkaConfig class also has bunch of public variables that controls your appl
     public int fetchMaxWait = 10000;
     public int bufferSizeBytes = 1024 * 1024;
     public MultiScheme scheme = new RawMultiScheme();
-    public boolean forceFromStart = false;
+    public boolean ignoreZkOffsets = false;
     public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
     public long maxOffsetBehind = Long.MAX_VALUE;
     public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
@@ -120,6 +120,23 @@ spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
 OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
 ```
 
+### How KafkaSpout stores offsets of a Kafka topic and recovers in case of failures
+
+As shown in the KafkaConfig properties above, users can control where in the topic the spout starts reading by setting **KafkaConfig.startOffsetTime**.
+
+There are two options: **kafka.api.OffsetRequest.EarliestTime()**, which makes the KafkaSpout read from the beginning of the topic, and
+**kafka.api.OffsetRequest.LatestTime()**, which starts at the end of the topic (reading only new messages that are written to the topic).
+
+When users first deploy a KafkaSpout-based topology, they can use either of these two options. As the topology runs,
+KafkaSpout keeps track of the offsets it is reading and writes this offset information under **SpoutConfig.zkRoot + "/" + SpoutConfig.id**.
+In case of failures, it recovers from the last offset written to ZooKeeper.
+
+Users who deploy a topology, later kill it, and then redeploy it should make sure that **SpoutConfig.id** and **SpoutConfig.zkRoot**
+remain the same; otherwise KafkaSpout won't be able to start from the offsets stored in ZooKeeper.
+
+Users can set **KafkaConfig.ignoreZkOffsets** to **true** to make KafkaSpout ignore any ZooKeeper-based offsets
+and start from the configured **KafkaConfig.startOffsetTime**.
+
 ## Using storm-kafka with different versions of Scala
 
 Storm-kafka's Kafka dependency is defined as `provided` scope in maven, meaning it will not be pulled in

http://git-wip-us.apache.org/repos/asf/storm/blob/28d40a40/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
index 5c85983..dd71b5a 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
@@ -33,7 +33,7 @@ public class KafkaConfig implements Serializable {
     public int fetchMaxWait = 10000;
     public int bufferSizeBytes = 1024 * 1024;
     public MultiScheme scheme = new RawMultiScheme();
-    public boolean forceFromStart = false;
+    public boolean ignoreZkOffsets = false;
     public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
     public long maxOffsetBehind = Long.MAX_VALUE;
     public boolean useStartOffsetTimeIfOffsetOutOfRange = true;

http://git-wip-us.apache.org/repos/asf/storm/blob/28d40a40/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 137dc99..b018032 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -60,10 +60,7 @@ public class KafkaUtils {
 
 
     public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
-        long startOffsetTime = kafka.api.OffsetRequest.LatestTime();
-        if ( config.forceFromStart ) {
-            startOffsetTime = config.startOffsetTime;
-        }
+        long startOffsetTime = config.startOffsetTime;
         return getOffset(consumer, topic, partition, startOffsetTime);
     }
 
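Note on the KafkaUtils.java hunk above: the behavioural change in `getOffset(consumer, topic, partition, config)` can be sketched in isolation as follows. This is only an illustration, not code from the commit. The class name `StartOffsetSketch` and its methods are hypothetical, and the two constants are stand-ins for `kafka.api.OffsetRequest.EarliestTime()` and `LatestTime()` (assumed here to be -2 and -1).

```java
// Hypothetical, self-contained sketch of the start-time selection before and
// after this commit. It does not talk to Kafka.
class StartOffsetSketch {
    // Stand-ins for kafka.api.OffsetRequest.EarliestTime() / LatestTime() (assumed values).
    static final long EARLIEST_TIME = -2L;
    static final long LATEST_TIME = -1L;

    // Old behaviour: the configured start time was honoured only when forceFromStart was set;
    // otherwise the lookup silently fell back to LatestTime.
    static long startOffsetTimeBefore(boolean forceFromStart, long configuredStartOffsetTime) {
        return forceFromStart ? configuredStartOffsetTime : LATEST_TIME;
    }

    // New behaviour: the configured start time is always used.
    static long startOffsetTimeAfter(long configuredStartOffsetTime) {
        return configuredStartOffsetTime;
    }

    public static void main(String[] args) {
        System.out.println(startOffsetTimeBefore(false, EARLIEST_TIME));
        System.out.println(startOffsetTimeAfter(EARLIEST_TIME));
    }
}
```

With `startOffsetTime` left at its default of `EarliestTime()`, the old path resolved to the latest offset unless `forceFromStart` was set, which appears to be the problem STORM-563 describes.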

http://git-wip-us.apache.org/repos/asf/storm/blob/28d40a40/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index 630c7f6..00ab981 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -91,9 +91,9 @@ public class PartitionManager {
         if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
             _committedTo = currentOffset;
             LOG.info("No partition information found, using configuration to determine offset");
-        } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
+        } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.ignoreZkOffsets) {
             _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig.startOffsetTime);
-            LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");
+            LOG.info("Topology change detected and ignore zookeeper offsets set to true, using configuration to determine offset");
         } else {
             _committedTo = jsonOffset;
             LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId );

http://git-wip-us.apache.org/repos/asf/storm/blob/28d40a40/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index 1a9be43..61c79a5 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -100,7 +100,7 @@ public class TridentKafkaEmitter {
             if (lastTopoMeta != null) {
                 lastInstanceId = (String) lastTopoMeta.get("id");
             }
-            if (_config.forceFromStart && !_topologyInstanceId.equals(lastInstanceId)) {
+            if (_config.ignoreZkOffsets && !_topologyInstanceId.equals(lastInstanceId)) {
                 offset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config.startOffsetTime);
             } else {
                 offset = (Long) lastMeta.get("nextOffset");
@@ -157,7 +157,7 @@ public class TridentKafkaEmitter {
     private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map meta) {
         LOG.info("re-emitting batch, attempt " + attempt);
         String instanceId = (String) meta.get("instanceId");
-        if (!_config.forceFromStart || instanceId.equals(_topologyInstanceId)) {
+        if (!_config.ignoreZkOffsets || instanceId.equals(_topologyInstanceId)) {
             SimpleConsumer consumer = _connections.register(partition);
             long offset = (Long) meta.get("offset");
             long nextOffset = (Long) meta.get("nextOffset");

http://git-wip-us.apache.org/repos/asf/storm/blob/28d40a40/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
index 1f1bbbc..965eaea 100644
--- a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
@@ -110,17 +110,17 @@ public class KafkaUtilsTest {
 
     @Test
     public void getOffsetFromConfigAndDontForceFromStart() {
-        config.forceFromStart = false;
+        config.ignoreZkOffsets = false;
         config.startOffsetTime = OffsetRequest.EarliestTime();
         createTopicAndSendMessage();
-        long latestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.LatestTime());
+        long latestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.EarliestTime());
         long offsetFromConfig = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, config);
         assertThat(latestOffset, is(equalTo(offsetFromConfig)));
     }
 
     @Test
     public void getOffsetFromConfigAndFroceFromStart() {
-        config.forceFromStart = true;
+        config.ignoreZkOffsets = true;
         config.startOffsetTime = OffsetRequest.EarliestTime();
         createTopicAndSendMessage();
         long earliestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.EarliestTime());

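Note on the PartitionManager.java hunk in this commit: the three-way choice of the initial committed offset can be sketched as a pure function. This is a rough illustration under stated assumptions, not the project's code. The class `CommittedOffsetSketch`, the method `chooseCommittedOffset`, and its parameters are hypothetical names, and `offsetFromConfig` is assumed to be the offset already resolved from `startOffsetTime` (the log messages in the hunk suggest this, but the diff does not show how `currentOffset` is computed).

```java
// Hypothetical sketch of the branch structure shown in the PartitionManager hunk.
final class CommittedOffsetSketch {
    static long chooseCommittedOffset(String zkTopologyId,
                                      Long zkOffset,
                                      String currentTopologyId,
                                      boolean ignoreZkOffsets,
                                      long offsetFromConfig) {
        // No usable state in ZooKeeper: fall back to the configured start position.
        if (zkTopologyId == null || zkOffset == null) {
            return offsetFromConfig;
        }
        // State belongs to a different topology instance and the user asked to ignore it.
        if (!currentTopologyId.equals(zkTopologyId) && ignoreZkOffsets) {
            return offsetFromConfig;
        }
        // Otherwise resume from the offset stored in ZooKeeper.
        return zkOffset;
    }

    public static void main(String[] args) {
        // Redeployed topology (new instance id) with ignoreZkOffsets = true.
        System.out.println(chooseCommittedOffset("topo-1", 42L, "topo-2", true, 0L));
        // Same situation with ignoreZkOffsets = false.
        System.out.println(chooseCommittedOffset("topo-1", 42L, "topo-2", false, 0L));
    }
}
```

As the hunk is written, `ignoreZkOffsets` only takes effect when the stored topology id differs from the running one, so a task restarted within the same topology instance still resumes from the stored offset.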

[13/27] storm git commit: New method in the Tuple interface "boolean isTuple()" for easier handling of TickTuples.

Posted by pt...@apache.org.
New method in the Tuple interface "boolean isTuple()" for easier handling of TickTuples.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/96f35cc4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/96f35cc4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/96f35cc4

Branch: refs/heads/0.10.x-branch
Commit: 96f35cc4b19f0b656982f2f4f7aa2dac85539034
Parents: cf4fdc7
Author: Niels Basjes <nb...@bol.com>
Authored: Wed Oct 1 11:50:48 2014 +0200
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:27:13 2015 -0400

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/tuple/Tuple.java     | 9 +++++++--
 storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java | 8 +++++++-
 2 files changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/96f35cc4/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
index 34dc61a..7ea93b9 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
@@ -42,7 +42,7 @@ public interface Tuple extends ITuple{
      * Gets the id of the component that created this tuple.
      */
     public String getSourceComponent();
-    
+
     /**
      * Gets the id of the task that created this tuple.
      */
@@ -52,7 +52,12 @@ public interface Tuple extends ITuple{
      * Gets the id of the stream that this tuple was emitted to.
      */
     public String getSourceStreamId();
-    
+
+    /**
+     * Returns if this tuple is a tick tuple or not.
+     */
+    public boolean isTick();
+
     /**
      * Gets the message id that associated with this tuple.
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/96f35cc4/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
index 818eff1..7ff2c8c 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
@@ -17,6 +17,7 @@
  */
 package backtype.storm.tuple;
 
+import backtype.storm.Constants;
 import backtype.storm.generated.GlobalStreamId;
 import backtype.storm.task.GeneralTopologyContext;
 import backtype.storm.utils.IndifferentAccessMap;
@@ -212,7 +213,12 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
     public String getSourceStreamId() {
         return streamId;
     }
-    
+
+    public boolean isTick() {
+        return this.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) &&
+               this.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
+    }
+
     public MessageId getMessageId() {
         return id;
     }

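Note on the TupleImpl.java hunk above: the tick check compares the tuple's source component and source stream against two system constants. A minimal standalone sketch follows. The class `TickCheckSketch` is hypothetical, and the string values "__system" and "__tick" are assumptions about `Constants.SYSTEM_COMPONENT_ID` and `Constants.SYSTEM_TICK_STREAM_ID`, which the diff does not show.

```java
// Hypothetical sketch of the tick-tuple test, without any Storm dependency.
final class TickCheckSketch {
    // Assumed values of backtype.storm.Constants.SYSTEM_COMPONENT_ID / SYSTEM_TICK_STREAM_ID.
    static final String SYSTEM_COMPONENT_ID = "__system";
    static final String SYSTEM_TICK_STREAM_ID = "__tick";

    // A tuple counts as a tick only if BOTH the component and the stream match.
    static boolean isTick(String sourceComponent, String sourceStreamId) {
        return SYSTEM_COMPONENT_ID.equals(sourceComponent)
                && SYSTEM_TICK_STREAM_ID.equals(sourceStreamId);
    }

    public static void main(String[] args) {
        System.out.println(isTick("__system", "__tick"));
        System.out.println(isTick("word-spout", "default"));
    }
}
```

The sketch puts the constant on the left of `equals` so that a null component or stream id yields false rather than a NullPointerException. The committed code calls `equals` on the getter results instead.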

[06/27] storm git commit: update storm-starter to use multilang components from storm distribution; switch to maven shade plugin for uber jar creation

Posted by pt...@apache.org.
update storm-starter to use multilang components from storm distribution; switch to maven shade plugin for uber jar creation


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3c701415
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3c701415
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3c701415

Branch: refs/heads/0.10.x-branch
Commit: 3c701415b3891cad6d93ff8147e635cad465300e
Parents: db49216
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Mon Apr 6 17:19:47 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:12:17 2015 -0400

----------------------------------------------------------------------
 examples/storm-starter/README.markdown          |   8 +-
 .../storm-starter/multilang/resources/storm.js  | 373 -------------------
 .../storm-starter/multilang/resources/storm.py  | 260 -------------
 .../storm-starter/multilang/resources/storm.rb  | 236 ------------
 examples/storm-starter/pom.xml                  |  15 +
 5 files changed, 19 insertions(+), 873 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3c701415/examples/storm-starter/README.markdown
----------------------------------------------------------------------
diff --git a/examples/storm-starter/README.markdown b/examples/storm-starter/README.markdown
index 8161a06..1cb6636 100644
--- a/examples/storm-starter/README.markdown
+++ b/examples/storm-starter/README.markdown
@@ -96,20 +96,20 @@ You can package a jar suitable for submitting to a Storm cluster with the comman
     $ mvn package
 
 This will package your code and all the non-Storm dependencies into a single "uberjar" (or "fat jar") at the path
-`target/storm-starter-{version}-jar-with-dependencies.jar`.
+`target/storm-starter-{version}.jar`.
 
 Example filename of the uberjar:
 
-    >>> target/storm-starter-0.9.3-incubating-SNAPSHOT-jar-with-dependencies.jar
+    >>> target/storm-starter-0.9.3-incubating-SNAPSHOT.jar
 
 You can submit (run) a topology contained in this uberjar to Storm via the `storm` CLI tool:
 
     # Example 1: Run the RollingTopWords in local mode (LocalCluster)
-    $ storm jar storm-starter-*-jar-with-dependencies.jar storm.starter.RollingTopWords
+    $ storm jar storm-starter-*.jar storm.starter.RollingTopWords
 
     # Example 2: Run the RollingTopWords in remote/cluster mode,
     #            under the name "production-topology"
-    $ storm jar storm-starter-*-jar-with-dependencies.jar storm.starter.RollingTopWords production-topology remote
+    $ storm jar storm-starter-*.jar storm.starter.RollingTopWords production-topology remote
 
 _Submitting a topology in local vs. remote mode:_
 It depends on the actual code of a topology how you can or even must tell Storm whether to run the topology locally (in

http://git-wip-us.apache.org/repos/asf/storm/blob/3c701415/examples/storm-starter/multilang/resources/storm.js
----------------------------------------------------------------------
diff --git a/examples/storm-starter/multilang/resources/storm.js b/examples/storm-starter/multilang/resources/storm.js
deleted file mode 100755
index 355c2d2..0000000
--- a/examples/storm-starter/multilang/resources/storm.js
+++ /dev/null
@@ -1,373 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Base classes in node-js for storm Bolt and Spout.
- * Implements the storm multilang protocol for nodejs.
- */
-
-
-var fs = require('fs');
-
-function Storm() {
-    this.messagePart = "";
-    this.taskIdsCallbacks = [];
-    this.isFirstMessage = true;
-    this.separator = '\nend\n';
-}
-
-Storm.prototype.sendMsgToParent = function(msg) {
-    var str = JSON.stringify(msg);
-    process.stdout.write(str + this.separator);
-}
-
-Storm.prototype.sync = function() {
-    this.sendMsgToParent({"command":"sync"});
-}
-
-Storm.prototype.sendPid = function(heartbeatdir) {
-    var pid = process.pid;
-    fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w"));
-    this.sendMsgToParent({"pid": pid})
-}
-
-Storm.prototype.log = function(msg) {
-    this.sendMsgToParent({"command": "log", "msg": msg});
-}
-
-Storm.prototype.initSetupInfo = function(setupInfo) {
-    var self = this;
-    var callback = function() {
-        self.sendPid(setupInfo['pidDir']);
-    }
-    this.initialize(setupInfo['conf'], setupInfo['context'], callback);
-}
-
-Storm.prototype.startReadingInput = function() {
-    var self = this;
-    process.stdin.on('readable', function() {
-        var chunk = process.stdin.read();
-        var messages = self.handleNewChunk(chunk);
-        messages.forEach(function(message) {
-            self.handleNewMessage(message);
-        })
-
-    });
-}
-
-/**
- * receives a new string chunk and returns a list of new messages with the separator removed
- * stores state in this.messagePart
- * @param chunk
- */
-Storm.prototype.handleNewChunk = function(chunk) {
-    //invariant: this.messagePart has no separator otherwise we would have parsed it already
-    var messages = [];
-    if (chunk && chunk.length !== 0) {
-        //"{}".split("\nend\n")           ==> ['{}']
-        //"\nend\n".split("\nend\n")      ==> [''  , '']
-        //"{}\nend\n".split("\nend\n")    ==> ['{}', '']
-        //"\nend\n{}".split("\nend\n")    ==> [''  , '{}']
-        // "{}\nend\n{}".split("\nend\n") ==> ['{}', '{}' ]
-        this.messagePart = this.messagePart + chunk;
-        var newMessageParts = this.messagePart.split(this.separator);
-        while (newMessageParts.length > 0) {
-            var potentialMessage = newMessageParts.shift();
-            var anotherMessageAhead = newMessageParts.length > 0;
-            if  (!anotherMessageAhead) {
-                this.messagePart = potentialMessage;
-            }
-            else if (potentialMessage.length > 0) {
-                messages.push(potentialMessage);
-            }
-        }
-    }
-    return messages;
-}
-
-Storm.prototype.isTaskIds = function(msg) {
-    return (msg instanceof Array);
-}
-
-Storm.prototype.handleNewMessage = function(msg) {
-    var parsedMsg = JSON.parse(msg);
-
-    if (this.isFirstMessage) {
-        this.initSetupInfo(parsedMsg);
-        this.isFirstMessage = false;
-    } else if (this.isTaskIds(parsedMsg)) {
-        this.handleNewTaskId(parsedMsg);
-    } else {
-        this.handleNewCommand(parsedMsg);
-    }
-}
-
-Storm.prototype.handleNewTaskId = function(taskIds) {
-    //When new list of task ids arrives, the callback that was passed with the corresponding emit should be called.
-    //Storm assures that the task ids will be sent in the same order as their corresponding emits so it we can simply
-    //take the first callback in the list and be sure it is the right one.
-
-    var callback = this.taskIdsCallbacks.shift();
-    if (callback) {
-        callback(taskIds);
-    } else {
-        throw new Error('Something went wrong, we off the split of task id callbacks');
-    }
-}
-
-
-
-/**
- *
- * @param messageDetails json with the emit details.
- *
- * For bolt, the json must contain the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
- * tuple and return ack when all components successfully finished to process it.
- * and may contain the optional fields:
- * - stream (if empty - emit to default stream)
- *
- * For spout, the json must contain the required fields:
- * - tuple - the value to emit
- *
- * and may contain the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- *
- * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received).
- */
-Storm.prototype.emit = function(messageDetails, onTaskIds) {
-    //Every emit triggers a response - list of task ids to which the tuple was emitted. The task ids are accessible
-    //through the callback (will be called when the response arrives). The callback is stored in a list until the
-    //corresponding task id list arrives.
-    if (messageDetails.task) {
-        throw new Error('Illegal input - task. To emit to specific task use emit direct!');
-    }
-
-    if (!onTaskIds) {
-        throw new Error('You must pass a onTaskIds callback when using emit!')
-    }
-
-    this.taskIdsCallbacks.push(onTaskIds);
-    this.__emit(messageDetails);;
-}
-
-
-/**
- * Emit message to specific task.
- * @param messageDetails json with the emit details.
- *
- * For bolt, the json must contain the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
- * tuple and return ack when all components successfully finished to process it.
- * - task - indicate the task to send the tuple to.
- * and may contain the optional fields:
- * - stream (if empty - emit to default stream)
- *
- * For spout, the json must contain the required fields:
- * - tuple - the value to emit
- * - task - indicate the task to send the tuple to.
- * and may contain the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- *
- * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received).
- */
-Storm.prototype.emitDirect = function(commandDetails) {
-    if (!commandDetails.task) {
-        throw new Error("Emit direct must receive task id!")
-    }
-    this.__emit(commandDetails);
-}
-
-/**
- * Initialize storm component according to the configuration received.
- * @param conf configuration object accrding to storm protocol.
- * @param context context object according to storm protocol.
- * @param done callback. Call this method when finished initializing.
- */
-Storm.prototype.initialize = function(conf, context, done) {
-    done();
-}
-
-Storm.prototype.run = function() {
-    process.stdout.setEncoding('utf8');
-    process.stdin.setEncoding('utf8');
-    this.startReadingInput();
-}
-
-function Tuple(id, component, stream, task, values) {
-    this.id = id;
-    this.component = component;
-    this.stream = stream;
-    this.task = task;
-    this.values = values;
-}
-
-/**
- * Base class for storm bolt.
- * To create a bolt implement 'process' method.
- * You may also implement initialize method to
- */
-function BasicBolt() {
-    Storm.call(this);
-    this.anchorTuple = null;
-};
-
-BasicBolt.prototype = Object.create(Storm.prototype);
-BasicBolt.prototype.constructor = BasicBolt;
-
-/**
- * Emit message.
- * @param commandDetails json with the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
- * tuple and return ack when all components successfully finished to process it.
- * and the optional fields:
- * - stream (if empty - emit to default stream)
- * - task (pass only to emit to specific task)
- */
-BasicBolt.prototype.__emit = function(commandDetails) {
-    var self = this;
-
-    var message = {
-        command: "emit",
-        tuple: commandDetails.tuple,
-        stream: commandDetails.stream,
-        task: commandDetails.task,
-        anchors: [commandDetails.anchorTupleId]
-    };
-
-    this.sendMsgToParent(message);
-}
-
-BasicBolt.prototype.handleNewCommand = function(command) {
-    var self = this;
-    var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]);
-
-    if (tup.task === -1 && tup.stream === "__heartbeat") {
-        self.sync();
-        return;
-    }
-
-    var callback = function(err) {
-        if (err) {
-            self.fail(tup, err);
-            return;
-        }
-        self.ack(tup);
-    }
-    this.process(tup, callback);
-}
-
-/**
- * Implement this method when creating a bolt. This is the main method that provides the logic of the bolt (what
- * should it do?).
- * @param tuple the input of the bolt - what to process.
- * @param done call this method when done processing.
- */
-BasicBolt.prototype.process = function(tuple, done) {};
-
-BasicBolt.prototype.ack = function(tup) {
-    this.sendMsgToParent({"command": "ack", "id": tup.id});
-}
-
-BasicBolt.prototype.fail = function(tup, err) {
-    this.sendMsgToParent({"command": "fail", "id": tup.id});
-}
-
-
-/**
- * Base class for storm spout.
- * To create a spout implement the following methods: nextTuple, ack and fail (nextTuple - mandatory, ack and fail
- * can stay empty).
- * You may also implement initialize method.
- *
- */
-function Spout() {
-    Storm.call(this);
-};
-
-Spout.prototype = Object.create(Storm.prototype);
-
-Spout.prototype.constructor = Spout;
-
-/**
- * This method will be called when an ack is received for preciously sent tuple. One may implement it.
- * @param id The id of the tuple.
- * @param done Call this method when finished and ready to receive more tuples.
- */
-Spout.prototype.ack = function(id, done) {};
-
-/**
- * This method will be called when an fail is received for preciously sent tuple. One may implement it (for example -
- * log the failure or send the tuple again).
- * @param id The id of the tuple.
- * @param done Call this method when finished and ready to receive more tuples.
- */
-Spout.prototype.fail = function(id, done) {};
-
-/**
- * Method the indicates its time to emit the next tuple.
- * @param done call this method when done sending the output.
- */
-Spout.prototype.nextTuple = function(done) {};
-
-Spout.prototype.handleNewCommand = function(command) {
-    var self = this;
-    var callback = function() {
-        self.sync();
-    }
-
-    if (command["command"] === "next") {
-        this.nextTuple(callback);
-    }
-
-    if (command["command"] === "ack") {
-        this.ack(command["id"], callback);
-    }
-
-    if (command["command"] === "fail") {
-        this.fail(command["id"], callback);
-    }
-}
-
-/**
- * @param commandDetails json with the required fields:
- * - tuple - the value to emit.
- * and the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- * - task - pass only to emit to specific task.
- */
-Spout.prototype.__emit = function(commandDetails) {
-    var message = {
-        command: "emit",
-        tuple: commandDetails.tuple,
-        id: commandDetails.id,
-        stream: commandDetails.stream,
-        task: commandDetails.task
-    };
-
-    this.sendMsgToParent(message);
-}
-
-module.exports.BasicBolt = BasicBolt;
-module.exports.Spout = Spout;

http://git-wip-us.apache.org/repos/asf/storm/blob/3c701415/examples/storm-starter/multilang/resources/storm.py
----------------------------------------------------------------------
diff --git a/examples/storm-starter/multilang/resources/storm.py b/examples/storm-starter/multilang/resources/storm.py
deleted file mode 100644
index 642c393..0000000
--- a/examples/storm-starter/multilang/resources/storm.py
+++ /dev/null
@@ -1,260 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import sys
-import os
-import traceback
-from collections import deque
-
-try:
-    import simplejson as json
-except ImportError:
-    import json
-
-json_encode = lambda x: json.dumps(x)
-json_decode = lambda x: json.loads(x)
-
-#reads lines and reconstructs newlines appropriately
-def readMsg():
-    msg = ""
-    while True:
-        line = sys.stdin.readline()
-        if not line:
-            raise Exception('Read EOF from stdin')
-        if line[0:-1] == "end":
-            break
-        msg = msg + line
-    return json_decode(msg[0:-1])
-
-MODE = None
-ANCHOR_TUPLE = None
-
-#queue up commands we read while trying to read taskids
-pending_commands = deque()
-
-def readTaskIds():
-    if pending_taskids:
-        return pending_taskids.popleft()
-    else:
-        msg = readMsg()
-        while type(msg) is not list:
-            pending_commands.append(msg)
-            msg = readMsg()
-        return msg
-
-#queue up taskids we read while trying to read commands/tuples
-pending_taskids = deque()
-
-def readCommand():
-    if pending_commands:
-        return pending_commands.popleft()
-    else:
-        msg = readMsg()
-        while type(msg) is list:
-            pending_taskids.append(msg)
-            msg = readMsg()
-        return msg
-
-def readTuple():
-    cmd = readCommand()
-    return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"])
-
-def sendMsgToParent(msg):
-    print json_encode(msg)
-    print "end"
-    sys.stdout.flush()
-
-def sync():
-    sendMsgToParent({'command':'sync'})
-
-def sendpid(heartbeatdir):
-    pid = os.getpid()
-    sendMsgToParent({'pid':pid})
-    open(heartbeatdir + "/" + str(pid), "w").close()
-
-def emit(*args, **kwargs):
-    __emit(*args, **kwargs)
-    return readTaskIds()
-
-def emitDirect(task, *args, **kwargs):
-    kwargs["directTask"] = task
-    __emit(*args, **kwargs)
-
-def __emit(*args, **kwargs):
-    global MODE
-    if MODE == Bolt:
-        emitBolt(*args, **kwargs)
-    elif MODE == Spout:
-        emitSpout(*args, **kwargs)
-
-def emitBolt(tup, stream=None, anchors = [], directTask=None):
-    global ANCHOR_TUPLE
-    if ANCHOR_TUPLE is not None:
-        anchors = [ANCHOR_TUPLE]
-    m = {"command": "emit"}
-    if stream is not None:
-        m["stream"] = stream
-    m["anchors"] = map(lambda a: a.id, anchors)
-    if directTask is not None:
-        m["task"] = directTask
-    m["tuple"] = tup
-    sendMsgToParent(m)
-
-def emitSpout(tup, stream=None, id=None, directTask=None):
-    m = {"command": "emit"}
-    if id is not None:
-        m["id"] = id
-    if stream is not None:
-        m["stream"] = stream
-    if directTask is not None:
-        m["task"] = directTask
-    m["tuple"] = tup
-    sendMsgToParent(m)
-
-def ack(tup):
-    sendMsgToParent({"command": "ack", "id": tup.id})
-
-def fail(tup):
-    sendMsgToParent({"command": "fail", "id": tup.id})
-
-def reportError(msg):
-    sendMsgToParent({"command": "error", "msg": msg})
-
-def log(msg, level=2):
-    sendMsgToParent({"command": "log", "msg": msg, "level":level})
-
-def logTrace(msg):
-    log(msg, 0)
-
-def logDebug(msg):
-    log(msg, 1)
-
-def logInfo(msg):
-    log(msg, 2)
-
-def logWarn(msg):
-    log(msg, 3)
-
-def logError(msg):
-    log(msg, 4)
-
-def rpcMetrics(name, params):
-    sendMsgToParent({"command": "metrics", "name": name, "params": params})
-
-def initComponent():
-    setupInfo = readMsg()
-    sendpid(setupInfo['pidDir'])
-    return [setupInfo['conf'], setupInfo['context']]
-
-class Tuple(object):
-    def __init__(self, id, component, stream, task, values):
-        self.id = id
-        self.component = component
-        self.stream = stream
-        self.task = task
-        self.values = values
-
-    def __repr__(self):
-        return '<%s%s>' % (
-            self.__class__.__name__,
-            ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys())))
-
-    def is_heartbeat_tuple(self):
-        return self.task == -1 and self.stream == "__heartbeat"
-
-class Bolt(object):
-    def initialize(self, stormconf, context):
-        pass
-
-    def process(self, tuple):
-        pass
-
-    def run(self):
-        global MODE
-        MODE = Bolt
-        conf, context = initComponent()
-        try:
-            self.initialize(conf, context)
-            while True:
-                tup = readTuple()
-                if tup.is_heartbeat_tuple():
-                    sync()
-                else:
-                    self.process(tup)
-        except Exception, e:
-            reportError(traceback.format_exc(e))
-
-class BasicBolt(object):
-    def initialize(self, stormconf, context):
-        pass
-
-    def process(self, tuple):
-        pass
-
-    def run(self):
-        global MODE
-        MODE = Bolt
-        global ANCHOR_TUPLE
-        conf, context = initComponent()
-        try:
-            self.initialize(conf, context)
-            while True:
-                tup = readTuple()
-                if tup.is_heartbeat_tuple():
-                    sync()
-                else:
-                    ANCHOR_TUPLE = tup
-                    try:
-                        self.process(tup)
-                        ack(tup)
-                    except Exception, e:
-                        reportError(traceback.format_exc(e))
-                        fail(tup)
-        except Exception, e:
-            reportError(traceback.format_exc(e))
-
-class Spout(object):
-    def initialize(self, conf, context):
-        pass
-
-    def ack(self, id):
-        pass
-
-    def fail(self, id):
-        pass
-
-    def nextTuple(self):
-        pass
-
-    def run(self):
-        global MODE
-        MODE = Spout
-        conf, context = initComponent()
-        try:
-            self.initialize(conf, context)
-            while True:
-                msg = readCommand()
-                if msg["command"] == "next":
-                    self.nextTuple()
-                if msg["command"] == "ack":
-                    self.ack(msg["id"])
-                if msg["command"] == "fail":
-                    self.fail(msg["id"])
-                sync()
-        except Exception, e:
-            reportError(traceback.format_exc(e))
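
For readers unfamiliar with the multilang protocol that the removed storm.py implemented: each message is a JSON document followed by a line containing only `end`, which is what `readMsg()` and `sendMsgToParent()` above handle. The following is a minimal Python 3 sketch of that framing (the deleted file itself is Python 2). The names `write_message` and `read_message` and the use of an in-memory buffer in place of stdin/stdout are assumptions made for illustration, not part of Storm.

```python
import io
import json


def write_message(stream, msg):
    """Write one message: the JSON document, then a line containing only 'end'."""
    stream.write(json.dumps(msg) + "\n")
    stream.write("end\n")
    stream.flush()


def read_message(stream):
    """Read lines up to the 'end' sentinel and decode them as one JSON document."""
    lines = []
    while True:
        line = stream.readline()
        if not line:
            raise EOFError("stream closed before the 'end' sentinel")
        if line.rstrip("\n") == "end":
            break
        lines.append(line)
    return json.loads("".join(lines))


# Round trip through an in-memory buffer instead of stdin/stdout.
buf = io.StringIO()
write_message(buf, {"command": "sync"})
write_message(buf, [3, 7])  # a list stands for task ids, as in readTaskIds()
buf.seek(0)
first = read_message(buf)
second = read_message(buf)
```

Because a command is a JSON object and a task-id reply is a JSON list, the reader can tell them apart by type, which is why the removed code keeps separate `pending_commands` and `pending_taskids` queues.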

http://git-wip-us.apache.org/repos/asf/storm/blob/3c701415/examples/storm-starter/multilang/resources/storm.rb
----------------------------------------------------------------------
diff --git a/examples/storm-starter/multilang/resources/storm.rb b/examples/storm-starter/multilang/resources/storm.rb
deleted file mode 100644
index 816694e..0000000
--- a/examples/storm-starter/multilang/resources/storm.rb
+++ /dev/null
@@ -1,236 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-require "rubygems"
-require "json"
-
-module Storm
-  module Protocol
-    class << self
-      attr_accessor :mode, :pending_taskids, :pending_commands
-    end
-
-    self.pending_taskids = []
-    self.pending_commands = []
-
-    def read_message
-      msg = ""
-      loop do
-        line = STDIN.readline.chomp
-        break if line == "end"
-        msg << line
-        msg << "\n"
-      end
-      JSON.parse msg.chomp
-    end
-
-    def read_task_ids
-      Storm::Protocol.pending_taskids.shift ||
-          begin
-            msg = read_message
-            until msg.is_a? Array
-              Storm::Protocol.pending_commands.push(msg)
-              msg = read_message
-            end
-            msg
-          end
-    end
-
-    def read_command
-      Storm::Protocol.pending_commands.shift ||
-          begin
-            msg = read_message
-            while msg.is_a? Array
-              Storm::Protocol.pending_taskids.push(msg)
-              msg = read_message
-            end
-            msg
-          end
-    end
-
-    def send_msg_to_parent(msg)
-      puts msg.to_json
-      puts "end"
-      STDOUT.flush
-    end
-
-    def sync
-      send_msg_to_parent({'command' => 'sync'})
-    end
-
-    def send_pid(heartbeat_dir)
-      pid = Process.pid
-      send_msg_to_parent({'pid' => pid})
-      File.open("#{heartbeat_dir}/#{pid}", "w").close
-    end
-
-    def emit_bolt(tup, args = {})
-      stream = args[:stream]
-      anchors = args[:anchors] || args[:anchor] || []
-      anchors = [anchors] unless anchors.is_a? Enumerable
-      direct = args[:direct_task]
-      m = {:command => :emit, :anchors => anchors.map(&:id), :tuple => tup}
-      m[:stream] = stream if stream
-      m[:task] = direct if direct
-      send_msg_to_parent m
-      read_task_ids unless direct
-    end
-
-    def emit_spout(tup, args = {})
-      stream = args[:stream]
-      id = args[:id]
-      direct = args[:direct_task]
-      m = {:command => :emit, :tuple => tup}
-      m[:id] = id if id
-      m[:stream] = stream if stream
-      m[:task] = direct if direct
-      send_msg_to_parent m
-      read_task_ids unless direct
-    end
-
-    def emit(*args)
-      case Storm::Protocol.mode
-        when 'spout'
-          emit_spout(*args)
-        when 'bolt'
-          emit_bolt(*args)
-      end
-    end
-
-    def ack(tup)
-      send_msg_to_parent :command => :ack, :id => tup.id
-    end
-
-    def fail(tup)
-      send_msg_to_parent :command => :fail, :id => tup.id
-    end
-
-    def reportError(msg)
-      send_msg_to_parent :command => :error, :msg => msg.to_s
-    end
-
-    def log(msg, level=2)
-      send_msg_to_parent :command => :log, :msg => msg.to_s, :level => level
-    end
-
-    def logTrace(msg)
-      log(msg, 0)
-    end
-
-    def logDebug(msg)
-      log(msg, 1)
-    end
-
-    def logInfo(msg)
-      log(msg, 2)
-    end
-
-    def logWarn(msg)
-      log(msg, 3)
-    end
-
-    def logError(msg)
-      log(msg, 4)
-    end
-
-    def handshake
-      setup_info = read_message
-      send_pid setup_info['pidDir']
-      [setup_info['conf'], setup_info['context']]
-    end
-  end
-
-  class Tuple
-    attr_accessor :id, :component, :stream, :task, :values
-
-    def initialize(id, component, stream, task, values)
-      @id = id
-      @component = component
-      @stream = stream
-      @task = task
-      @values = values
-    end
-
-    def self.from_hash(hash)
-      Tuple.new(*hash.values_at("id", "comp", "stream", "task", "tuple"))
-    end
-
-    def is_heartbeat
-      task == -1 and stream == '__heartbeat'
-    end
-  end
-
-  class Bolt
-    include Storm::Protocol
-
-    def prepare(conf, context); end
-
-    def process(tuple); end
-
-    def run
-      Storm::Protocol.mode = 'bolt'
-      prepare(*handshake)
-      begin
-        while true
-          tuple = Tuple.from_hash(read_command)
-          if tuple.is_heartbeat
-            sync
-          else
-            process tuple
-          end
-        end
-      rescue Exception => e
-        reportError 'Exception in bolt: ' + e.message + ' - ' + e.backtrace.join('\n')
-      end
-    end
-  end
-
-  class Spout
-    include Storm::Protocol
-
-    def open(conf, context); end
-
-    def nextTuple; end
-
-    def ack(id); end
-
-    def fail(id); end
-
-    def run
-      Storm::Protocol.mode = 'spout'
-      open(*handshake)
-
-      begin
-        while true
-          msg = read_command
-          case msg['command']
-            when 'next'
-              nextTuple
-            when 'ack'
-              ack(msg['id'])
-            when 'fail'
-              fail(msg['id'])
-          end
-          sync
-        end
-      rescue Exception => e
-        reportError 'Exception in spout: ' + e.message + ' - ' + e.backtrace.join('\n')
-      end
-    end
-  end
-end

http://git-wip-us.apache.org/repos/asf/storm/blob/3c701415/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index 5afe26e..8682a4b 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -72,6 +72,21 @@
       <!-- keep storm out of the jar-with-dependencies -->
       <scope>provided</scope>
     </dependency>
+      <dependency>
+          <groupId>org.apache.storm</groupId>
+          <artifactId>multilang-javascript</artifactId>
+          <version>${project.version}</version>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.storm</groupId>
+          <artifactId>multilang-ruby</artifactId>
+          <version>${project.version}</version>
+      </dependency>
+      <dependency>
+          <groupId>org.apache.storm</groupId>
+          <artifactId>multilang-python</artifactId>
+          <version>${project.version}</version>
+      </dependency>
     <dependency>
       <groupId>commons-collections</groupId>
       <artifactId>commons-collections</artifactId>


[12/27] storm git commit: add STORM-788 to changelog

Posted by pt...@apache.org.
add STORM-788 to changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cf4fdc77
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cf4fdc77
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cf4fdc77

Branch: refs/heads/0.10.x-branch
Commit: cf4fdc77b81ec2d144d841d85eef62e0d947e43e
Parents: 73b56d6
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri May 15 15:19:43 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:19:43 2015 -0400

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/cf4fdc77/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ec096e5..da1395c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.10.0
+ * STORM-788: Fix key for process latencies
  * STORM-748: Package Multi-Lang scripts so they don't have to be duplicated
  * STORM-563. Kafka Spout doesn't pick up from the beginning of the queue unless forceFromStart specified.
  * STORM-615: Add REST API to upload topology.


[14/27] storm git commit: KafkaBolt no longer tries to map/process/send Tick Tuples to Kafka.

Posted by pt...@apache.org.
KafkaBolt no longer tries to map/process/send Tick Tuples to Kafka.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/62385d7b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/62385d7b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/62385d7b

Branch: refs/heads/0.10.x-branch
Commit: 62385d7b290ac3599546d73fbdd5880b1f53fc0d
Parents: 96f35cc
Author: Niels Basjes <nb...@bol.com>
Authored: Wed Oct 1 11:51:42 2014 +0200
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:27:13 2015 -0400

----------------------------------------------------------------------
 external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java | 4 ++++
 1 file changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/62385d7b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index cf169dc..913843c 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -89,6 +89,10 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
 
     @Override
     public void execute(Tuple input) {
+        if (input.isTick()) {
+          return; // Do not try to send ticks to Kafka
+        }
+
         K key = null;
         V message = null;
         String topic = null;
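
The guard added above can be illustrated outside of Storm. The sketch below is written in Python rather than Java and models a tuple as a plain dict with the multilang keys `comp`, `stream` and `tuple`. The string values used for the two constants are what `Constants.SYSTEM_COMPONENT_ID` and `Constants.SYSTEM_TICK_STREAM_ID` are believed to hold; treat them, and the helper names `is_tick` and `execute`, as assumptions rather than as Storm's API.

```python
SYSTEM_COMPONENT_ID = "__system"  # assumed value of Constants.SYSTEM_COMPONENT_ID
SYSTEM_TICK_STREAM_ID = "__tick"  # assumed value of Constants.SYSTEM_TICK_STREAM_ID


def is_tick(source_component, source_stream_id):
    """A tuple is a tick when it comes from the system component's tick stream."""
    return (source_component == SYSTEM_COMPONENT_ID
            and source_stream_id == SYSTEM_TICK_STREAM_ID)


def execute(tup, sent):
    """Append the tuple's payload to `sent` unless the tuple is a tick."""
    if is_tick(tup["comp"], tup["stream"]):
        return  # ticks carry no payload meant for the downstream system
    sent.append(tup["tuple"])


sent = []
execute({"comp": "__system", "stream": "__tick", "tuple": [10]}, sent)
execute({"comp": "words", "stream": "default", "tuple": ["hello"]}, sent)
```

After both calls only the ordinary tuple's payload has been collected; the tick is dropped before any key, message or topic mapping is attempted.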


[20/27] storm git commit: Fixed the tests in storm-starter that do not use the actual TupleImpl but mock everything themselves

Posted by pt...@apache.org.
Fixed the tests in storm-starter that do not use the actual TupleImpl but mock everything themselves


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/027db673
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/027db673
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/027db673

Branch: refs/heads/0.10.x-branch
Commit: 027db6738068ba61ba385acae0fb77a10b803e8d
Parents: d3dcd1b
Author: Niels Basjes <ni...@basjes.nl>
Authored: Tue Dec 9 13:53:05 2014 +0100
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:28:27 2015 -0400

----------------------------------------------------------------------
 .../test/jvm/storm/starter/tools/MockTupleHelpers.java  | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/027db673/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
index b253350..3180fd3 100644
--- a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
+++ b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
@@ -19,6 +19,7 @@ package storm.starter.tools;
 
 import backtype.storm.Constants;
 import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
 
 import static org.mockito.Mockito.*;
 
@@ -28,13 +29,22 @@ public final class MockTupleHelpers {
   }
 
   public static Tuple mockTickTuple() {
-    return mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID);
+    Tuple tuple = mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID);
+    when(tuple.isTick()).thenReturn(true);
+    return tuple;
   }
 
   public static Tuple mockTuple(String componentId, String streamId) {
     Tuple tuple = mock(Tuple.class);
     when(tuple.getSourceComponent()).thenReturn(componentId);
     when(tuple.getSourceStreamId()).thenReturn(streamId);
+    when(tuple.isTick()).thenReturn(isTick(componentId, streamId));
     return tuple;
   }
+  
+  private static boolean isTick(String componentId, String streamId) {
+    return componentId.equals(Constants.SYSTEM_COMPONENT_ID) && 
+           streamId.equals(Constants.SYSTEM_TICK_STREAM_ID);
+  }
+
 }


[26/27] storm git commit: Revert "STORM-651: Rename "ui" service to "storm ui" and temp.txt should be in TEMP folder.Add jdk and jres bin and libs dirs to lib path."

Posted by pt...@apache.org.
Revert "STORM-651: Rename "ui" service to "storm ui" and temp.txt should be in TEMP folder.Add jdk and jres bin and libs dirs to lib path."

This reverts commit db1182d753a6cc911a220d2891c8efd2f093d77c.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/790fcab8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/790fcab8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/790fcab8

Branch: refs/heads/0.10.x-branch
Commit: 790fcab8d0932e5f59a4e74d129e924015180f17
Parents: e106ccb
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri May 15 16:42:50 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 16:42:50 2015 -0400

----------------------------------------------------------------------
 bin/storm-config.cmd | 14 +++++++-------
 bin/storm.cmd        | 33 +++++++++++----------------------
 2 files changed, 18 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/790fcab8/bin/storm-config.cmd
----------------------------------------------------------------------
diff --git a/bin/storm-config.cmd b/bin/storm-config.cmd
index e185495..5203241 100644
--- a/bin/storm-config.cmd
+++ b/bin/storm-config.cmd
@@ -86,13 +86,13 @@ if not defined STORM_LOG_DIR (
 @rem retrieve storm.logback.conf.dir from conf file
 @rem
 
-"%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" backtype.storm.command.config_value storm.logback.conf.dir > %CMD_TEMP_FILE%
+"%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" backtype.storm.command.config_value storm.logback.conf.dir > temp.txt
   
-FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do (
+FOR /F "delims=" %%i in (temp.txt) do (
 	FOR /F "tokens=1,* delims= " %%a in ("%%i") do (
 		if %%a == VALUE: (
 			set STORM_LOGBACK_CONFIGURATION_DIR=%%b
-			del /F %CMD_TEMP_FILE%)
+			del /F temp.txt)
 		)
 	)
 )		
@@ -113,9 +113,9 @@ if not defined STORM_LOGBACK_CONFIGURATION_FILE (
   set STORM_LOGBACK_CONFIGURATION_FILE=%STORM_HOME%\logback\cluster.xml
 )
 
-"%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" backtype.storm.command.config_value java.library.path > %CMD_TEMP_FILE%
+"%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" backtype.storm.command.config_value java.library.path > temp.txt
 
-FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do (
+FOR /F "delims=" %%i in (temp.txt) do (
     FOR /F "tokens=1,* delims= " %%a in ("%%i") do (
 	 if %%a == VALUE: (
 	   set JAVA_LIBRARY_PATH=%%b
@@ -125,10 +125,10 @@ FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do (
 
 
 :storm_opts
- set STORM_OPTS=-Dstorm.options= -Dstorm.home=%STORM_HOME% -Djava.library.path=%JAVA_LIBRARY_PATH%;%JAVA_HOME%\bin;%JAVA_HOME%\lib;%JAVA_HOME%\jre\bin;%JAVA_HOME%\jre\lib
+ set STORM_OPTS=-Dstorm.options= -Dstorm.home=%STORM_HOME% -Djava.library.path=%JAVA_LIBRARY_PATH%
  set STORM_OPTS=%STORM_OPTS% -Dlogback.configurationFile=%STORM_LOGBACK_CONFIGURATION_FILE%
  set STORM_OPTS=%STORM_OPTS% -Dstorm.log.dir=%STORM_LOG_DIR%
- del /F %CMD_TEMP_FILE%
+ del /F temp.txt
 
 
 if not defined STORM_SERVER_OPTS (

http://git-wip-us.apache.org/repos/asf/storm/blob/790fcab8/bin/storm.cmd
----------------------------------------------------------------------
diff --git a/bin/storm.cmd b/bin/storm.cmd
index ad1a81f..2f188cf 100644
--- a/bin/storm.cmd
+++ b/bin/storm.cmd
@@ -114,8 +114,8 @@
 
 :drpc
   set CLASS=backtype.storm.daemon.drpc
-  "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" backtype.storm.command.config_value drpc.childopts > %CMD_TEMP_FILE%
-  FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do (
+  "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" backtype.storm.command.config_value drpc.childopts > temp.txt
+  FOR /F "delims=" %%i in (temp.txt) do (
      FOR /F "tokens=1,* delims= " %%a in ("%%i") do (
 	  if %%a == VALUE: (
 	   set CHILDOPTS=%%b
@@ -140,8 +140,8 @@
 
 :logviewer
   set CLASS=backtype.storm.daemon.logviewer
-   "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" backtype.storm.command.config_value logviewer.childopts > %CMD_TEMP_FILE%
-  FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do (
+   "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" backtype.storm.command.config_value logviewer.childopts > temp.txt
+  FOR /F "delims=" %%i in (temp.txt) do (
      FOR /F "tokens=1,* delims= " %%a in ("%%i") do (
 	  if %%a == VALUE: (
 	   set CHILDOPTS=%%b
@@ -152,8 +152,8 @@
 
 :nimbus
   set CLASS=backtype.storm.daemon.nimbus
-  "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" backtype.storm.command.config_value nimbus.childopts > %CMD_TEMP_FILE%
-  FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do (
+  "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" backtype.storm.command.config_value nimbus.childopts > temp.txt
+  FOR /F "delims=" %%i in (temp.txt) do (
      FOR /F "tokens=1,* delims= " %%a in ("%%i") do (
 	  if %%a == VALUE: (
 	   set CHILDOPTS=%%b
@@ -184,8 +184,8 @@
   
 :supervisor
   set CLASS=backtype.storm.daemon.supervisor
-  "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" backtype.storm.command.config_value supervisor.childopts > %CMD_TEMP_FILE%
-  FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do (
+  "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" backtype.storm.command.config_value supervisor.childopts > temp.txt
+  FOR /F "delims=" %%i in (temp.txt) do (
      FOR /F "tokens=1,* delims= " %%a in ("%%i") do (
 	  if %%a == VALUE: (
 	   set CHILDOPTS=%%b
@@ -197,8 +197,8 @@
 :ui
   set CLASS=backtype.storm.ui.core
   set CLASSPATH=%CLASSPATH%;%STORM_HOME%
-  "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" backtype.storm.command.config_value ui.childopts > %CMD_TEMP_FILE%
-  FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do (
+  "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" backtype.storm.command.config_value ui.childopts > temp.txt
+  FOR /F "delims=" %%i in (temp.txt) do (
      FOR /F "tokens=1,* delims= " %%a in ("%%i") do (
 	  if %%a == VALUE: (
 	   set CHILDOPTS=%%b
@@ -212,17 +212,6 @@
   set STORM_OPTS=%STORM_CLIENT_OPTS% %STORM_OPTS%
   goto :eof
 
-:makeServiceXml
-  set arguments=%*
-  @echo ^<service^>
-  @echo   ^<id^>storm_%storm-command%^</id^>
-  @echo   ^<name^>storm_%storm-command%^</name^>
-  @echo   ^<description^>This service runs Storm %storm-command%^</description^>
-  @echo   ^<executable^>%JAVA%^</executable^>
-  @echo   ^<arguments^>%arguments%^</arguments^>
-  @echo ^</service^>
-  goto :eof
-
 :make_command_arguments
   if "%2" == "" goto :eof
   set _count=0
@@ -242,7 +231,7 @@
   
 :set_childopts
   set STORM_OPTS=%STORM_SERVER_OPTS% %STORM_OPTS% %CHILDOPTS%
-  del /F %CMD_TEMP_FILE%
+  del /F temp.txt
   goto :eof
 
 :print_usage


[02/27] storm git commit: STORM-563. Kafka Spout doesn't pick up from the beginning of the queue unless forceFromStart specified.

Posted by pt...@apache.org.
STORM-563. Kafka Spout doesn't pick up from the beginning of the queue unless forceFromStart specified.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/847fc1cf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/847fc1cf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/847fc1cf

Branch: refs/heads/0.10.x-branch
Commit: 847fc1cf6dcdc139f6f91a191a40a4dae1e97238
Parents: 28d40a4
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Tue Mar 31 20:21:29 2015 -0700
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 14:57:34 2015 -0400

----------------------------------------------------------------------
 external/storm-kafka/README.md | 8 +++++---
 1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/847fc1cf/external/storm-kafka/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kafka/README.md b/external/storm-kafka/README.md
index 8a98c0c..a841880 100644
--- a/external/storm-kafka/README.md
+++ b/external/storm-kafka/README.md
@@ -124,10 +124,12 @@ OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
 
 As shown in the above KafkaConfig properties , user can control where in the topic they can start reading by setting **KafkaConfig.startOffsetTime.**
 
-There are two options **kafka.api.OffsetRequest.EarliestTime()** which makes the KafkaSpout to read from the begining of the topic and 
-**kafka.api.OffsetRequest.LatestTime()** which starts at the end of the topic (or any new messsages that are being written to the topic).
+These are the options
+1. **kafka.api.OffsetRequest.EarliestTime()  or -2 (value returned by EarliestTime())** which makes the KafkaSpout to read from the begining of the topic 
+2. **kafka.api.OffsetRequest.LatestTime() or -1 (value returned by LatestTime())** which starts at the end of the topic ,any new messsages that are being written to the topic
+3. **System.time.currentTimeMillis()**
 
-When user first deploys a KakfaSpout based topology they can use one of the above two options. As the topology runs 
+When user first deploys a KakfaSpout based topology they can use one of the above options. As the topology runs 
 KafkaSpout keeps track of the offsets its reading and writes these offset information under **SpoutConfig.zkRoot+ "/" + SpoutConfig.id**
 Incase of failures it recovers from the last written offset from zookeeper. 
 


[17/27] storm git commit: Replaced TupleHelpers in the examples with the new tuple.isTick()

Posted by pt...@apache.org.
Replaced TupleHelpers in the examples with the new tuple.isTick()


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6b000407
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6b000407
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6b000407

Branch: refs/heads/0.10.x-branch
Commit: 6b0004070c80ae8f01e4bc38b759d57f482feeb2
Parents: 6537b36
Author: Niels Basjes <nb...@bol.com>
Authored: Wed Oct 29 15:50:18 2014 +0100
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:27:14 2015 -0400

----------------------------------------------------------------------
 .../storm/starter/bolt/AbstractRankerBolt.java  |  3 +-
 .../storm/starter/bolt/RollingCountBolt.java    |  3 +-
 .../jvm/storm/starter/util/TupleHelpers.java    | 33 --------------------
 3 files changed, 2 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6b000407/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
index cc5c0e7..83c2cfc 100644
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
@@ -26,7 +26,6 @@ import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 import org.apache.log4j.Logger;
 import storm.starter.tools.Rankings;
-import storm.starter.util.TupleHelpers;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -78,7 +77,7 @@ public abstract class AbstractRankerBolt extends BaseBasicBolt {
    */
   @Override
   public final void execute(Tuple tuple, BasicOutputCollector collector) {
-    if (TupleHelpers.isTickTuple(tuple)) {
+    if (tuple.isTick()) {
       getLogger().debug("Received tick tuple, triggering emit of current rankings");
       emitRankings(collector);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/6b000407/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
index f83906c..f023c0b 100644
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
@@ -28,7 +28,6 @@ import backtype.storm.tuple.Values;
 import org.apache.log4j.Logger;
 import storm.starter.tools.NthLastModifiedTimeTracker;
 import storm.starter.tools.SlidingWindowCounter;
-import storm.starter.util.TupleHelpers;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -95,7 +94,7 @@ public class RollingCountBolt extends BaseRichBolt {
 
   @Override
   public void execute(Tuple tuple) {
-    if (TupleHelpers.isTickTuple(tuple)) {
+    if (tuple.isTick()) {
       LOG.debug("Received tick tuple, triggering emit of current window counts");
       emitCurrentWindowCounts();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/6b000407/examples/storm-starter/src/jvm/storm/starter/util/TupleHelpers.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/util/TupleHelpers.java b/examples/storm-starter/src/jvm/storm/starter/util/TupleHelpers.java
deleted file mode 100644
index 4ea669e..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/util/TupleHelpers.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.util;
-
-import backtype.storm.Constants;
-import backtype.storm.tuple.Tuple;
-
-public final class TupleHelpers {
-
-  private TupleHelpers() {
-  }
-
-  public static boolean isTickTuple(Tuple tuple) {
-    return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(
-        Constants.SYSTEM_TICK_STREAM_ID);
-  }
-
-}


[24/27] storm git commit: STORM-786: KafkaBolt should ack tick tuples

Posted by pt...@apache.org.
STORM-786: KafkaBolt should ack tick tuples


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/69c5499e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/69c5499e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/69c5499e

Branch: refs/heads/0.10.x-branch
Commit: 69c5499eddb3630fcfd04d06b1ddfb51f077295b
Parents: 9116770
Author: Michael G. Noll <mi...@michael-noll.com>
Authored: Thu Apr 16 11:44:40 2015 +0200
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:51:26 2015 -0400

----------------------------------------------------------------------
 .../src/jvm/storm/kafka/bolt/KafkaBolt.java     |  1 +
 .../test/storm/kafka/bolt/KafkaBoltTest.java    | 27 ++++++++++++++++++++
 2 files changed, 28 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/69c5499e/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index a8c4321..714ecd3 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -91,6 +91,7 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
     @Override
     public void execute(Tuple input) {
         if (TupleUtils.isTick(input)) {
+          collector.ack(input);
           return; // Do not try to send ticks to Kafka
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/69c5499e/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
index c30cba1..576cc12 100644
--- a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
@@ -18,6 +18,7 @@
 package storm.kafka.bolt;
 
 import backtype.storm.Config;
+import backtype.storm.Constants;
 import backtype.storm.task.GeneralTopologyContext;
 import backtype.storm.task.IOutputCollector;
 import backtype.storm.task.OutputCollector;
@@ -26,6 +27,7 @@ import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.TupleImpl;
 import backtype.storm.tuple.Values;
+import backtype.storm.utils.TupleUtils;
 import backtype.storm.utils.Utils;
 import kafka.api.OffsetRequest;
 import kafka.javaapi.consumer.SimpleConsumer;
@@ -45,7 +47,10 @@ import java.util.HashMap;
 import java.util.Properties;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 public class KafkaBoltTest {
 
@@ -84,6 +89,18 @@ public class KafkaBoltTest {
     }
 
     @Test
+    public void shouldAcknowledgeTickTuples() throws Exception {
+        // Given
+        Tuple tickTuple = mockTickTuple();
+
+        // When
+        bolt.execute(tickTuple);
+
+        // Then
+        verify(collector).ack(tickTuple);
+    }
+
+    @Test
     public void executeWithKey() throws Exception {
         String message = "value-123";
         String key = "key-123";
@@ -185,4 +202,14 @@ public class KafkaBoltTest {
         };
         return new TupleImpl(topologyContext, new Values(message), 1, "");
     }
+
+    private Tuple mockTickTuple() {
+        Tuple tuple = mock(Tuple.class);
+        when(tuple.getSourceComponent()).thenReturn(Constants.SYSTEM_COMPONENT_ID);
+        when(tuple.getSourceStreamId()).thenReturn(Constants.SYSTEM_TICK_STREAM_ID);
+        // Sanity check
+        assertTrue(TupleUtils.isTick(tuple));
+        return tuple;
+    }
+
 }


[21/27] storm git commit: Code cleanup in mocking storm-starter tests

Posted by pt...@apache.org.
Code cleanup in mocking storm-starter tests


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/63f331fd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/63f331fd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/63f331fd

Branch: refs/heads/0.10.x-branch
Commit: 63f331fd54ba2d6684767e87e586222ce84f4c4f
Parents: 22a2923
Author: Niels Basjes <ni...@basjes.nl>
Authored: Tue Dec 9 14:00:33 2014 +0100
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:28:27 2015 -0400

----------------------------------------------------------------------
 .../test/jvm/storm/starter/tools/MockTupleHelpers.java          | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/63f331fd/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
index 374288e..9e8629c 100644
--- a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
+++ b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
@@ -19,7 +19,6 @@ package storm.starter.tools;
 
 import backtype.storm.Constants;
 import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.TupleImpl;
 
 import static org.mockito.Mockito.*;
 
@@ -39,9 +38,9 @@ public final class MockTupleHelpers {
     when(tuple.isTick()).thenReturn(isTick(componentId, streamId));
     return tuple;
   }
-  
+
   private static boolean isTick(String componentId, String streamId) {
-    return componentId.equals(Constants.SYSTEM_COMPONENT_ID) && 
+    return componentId.equals(Constants.SYSTEM_COMPONENT_ID) &&
            streamId.equals(Constants.SYSTEM_TICK_STREAM_ID);
   }
 


[07/27] storm git commit: remove redundant multilang scripts

Posted by pt...@apache.org.
remove redundant multilang scripts


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/db492167
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/db492167
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/db492167

Branch: refs/heads/0.10.x-branch
Commit: db492167a735f6bb08f1844d56e55808e0eca035
Parents: 9e2c816
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Mon Apr 6 17:10:27 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:12:17 2015 -0400

----------------------------------------------------------------------
 examples/storm-starter/pom.xml       |  52 ++---
 storm-core/src/multilang/js/storm.js | 366 ------------------------------
 storm-core/src/multilang/py/storm.py | 260 ---------------------
 storm-core/src/multilang/rb/storm.rb | 236 -------------------
 4 files changed, 25 insertions(+), 889 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/db492167/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index f59e692..5afe26e 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -93,33 +93,31 @@
     </resources>
 
     <plugins>
-      <!--
-        Bind the maven-assembly-plugin to the package phase
-        this will create a jar file without the storm dependencies
-        suitable for deployment to a cluster.
-       -->
-      <plugin>
-        <artifactId>maven-assembly-plugin</artifactId>
-        <configuration>
-          <descriptorRefs>
-            <descriptorRef>jar-with-dependencies</descriptorRef>
-          </descriptorRefs>
-          <archive>
-            <manifest>
-              <mainClass />
-            </manifest>
-          </archive>
-        </configuration>
-        <executions>
-          <execution>
-            <id>make-assembly</id>
-            <phase>package</phase>
-            <goals>
-              <goal>single</goal>
-            </goals>
-          </execution>
-        </executions>
-      </plugin>
+        <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-shade-plugin</artifactId>
+            <version>1.4</version>
+            <configuration>
+                <createDependencyReducedPom>true</createDependencyReducedPom>
+            </configuration>
+            <executions>
+                <execution>
+                    <phase>package</phase>
+                    <goals>
+                        <goal>shade</goal>
+                    </goals>
+                    <configuration>
+                        <transformers>
+                            <transformer
+                                    implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+                            <transformer
+                                    implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                            </transformer>
+                        </transformers>
+                    </configuration>
+                </execution>
+            </executions>
+        </plugin>
 
       <plugin>
         <groupId>com.theoryinpractise</groupId>

http://git-wip-us.apache.org/repos/asf/storm/blob/db492167/storm-core/src/multilang/js/storm.js
----------------------------------------------------------------------
diff --git a/storm-core/src/multilang/js/storm.js b/storm-core/src/multilang/js/storm.js
deleted file mode 100755
index f5dcad2..0000000
--- a/storm-core/src/multilang/js/storm.js
+++ /dev/null
@@ -1,366 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- /**
- * Base classes in node-js for storm Bolt and Spout.
- * Implements the storm multilang protocol for nodejs.
- */
-
-
-var fs = require('fs');
-
-function Storm() {
-    this.messagePart = "";
-    this.taskIdsCallbacks = [];
-    this.isFirstMessage = true;
-    this.separator = '\nend\n';
-}
-
-Storm.prototype.sendMsgToParent = function(msg) {
-    var str = JSON.stringify(msg);
-    process.stdout.write(str + this.separator);
-}
-
-Storm.prototype.sync = function() {
-    this.sendMsgToParent({"command":"sync"});
-}
-
-Storm.prototype.sendPid = function(heartbeatdir) {
-    var pid = process.pid;
-    fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w"));
-    this.sendMsgToParent({"pid": pid})
-}
-
-Storm.prototype.log = function(msg) {
-    this.sendMsgToParent({"command": "log", "msg": msg});
-}
-
-Storm.prototype.initSetupInfo = function(setupInfo) {
-    var self = this;
-    var callback = function() {
-        self.sendPid(setupInfo['pidDir']);
-    }
-    this.initialize(setupInfo['conf'], setupInfo['context'], callback);
-}
-
-Storm.prototype.startReadingInput = function() {
-    var self = this;
-    process.stdin.on('readable', function() {
-        var chunk = process.stdin.read();
-        var messages = self.handleNewChunk(chunk);
-        messages.forEach(function(message) {
-            self.handleNewMessage(message);
-        })
-
-    });
-}
-
-/**
- * receives a new string chunk and returns a list of new messages with the separator removed
- * stores state in this.messagePart
- * @param chunk
- */
-Storm.prototype.handleNewChunk = function(chunk) {
-    //invariant: this.messagePart has no separator otherwise we would have parsed it already
-    var messages = [];
-    if (chunk && chunk.length !== 0) {
-        //"{}".split("\nend\n")           ==> ['{}']
-        //"\nend\n".split("\nend\n")      ==> [''  , '']
-        //"{}\nend\n".split("\nend\n")    ==> ['{}', '']
-        //"\nend\n{}".split("\nend\n")    ==> [''  , '{}']
-        // "{}\nend\n{}".split("\nend\n") ==> ['{}', '{}' ]
-        this.messagePart = this.messagePart + chunk;
-        var newMessageParts = this.messagePart.split(this.separator);
-        while (newMessageParts.length > 0) {
-            var potentialMessage = newMessageParts.shift();
-            var anotherMessageAhead = newMessageParts.length > 0;
-            if  (!anotherMessageAhead) {
-                this.messagePart = potentialMessage;
-            }
-            else if (potentialMessage.length > 0) {
-                messages.push(potentialMessage);
-            }
-        }
-    }
-    return messages;
- }
-
-Storm.prototype.isTaskIds = function(msg) {
-    return (msg instanceof Array);
-}
-
-Storm.prototype.handleNewMessage = function(msg) {
-    var parsedMsg = JSON.parse(msg);
-
-    if (this.isFirstMessage) {
-        this.initSetupInfo(parsedMsg);
-        this.isFirstMessage = false;
-    } else if (this.isTaskIds(parsedMsg)) {
-        this.handleNewTaskId(parsedMsg);
-    } else {
-        this.handleNewCommand(parsedMsg);
-    }
-}
-
-Storm.prototype.handleNewTaskId = function(taskIds) {
-    //When new list of task ids arrives, the callback that was passed with the corresponding emit should be called.
-    //Storm assures that the task ids will be sent in the same order as their corresponding emits so it we can simply
-    //take the first callback in the list and be sure it is the right one.
-
-    var callback = this.taskIdsCallbacks.shift();
-    if (callback) {
-        callback(taskIds);
-    } else {
-        throw new Error('Something went wrong, we off the split of task id callbacks');
-    }
-}
-
-
-
-/**
- *
- * @param messageDetails json with the emit details.
- *
- * For bolt, the json must contain the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
- * tuple and return ack when all components successfully finished to process it.
- * and may contain the optional fields:
- * - stream (if empty - emit to default stream)
- *
- * For spout, the json must contain the required fields:
- * - tuple - the value to emit
- *
- * and may contain the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- *
- * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received).
- */
-Storm.prototype.emit = function(messageDetails, onTaskIds) {
-    //Every emit triggers a response - list of task ids to which the tuple was emitted. The task ids are accessible
-    //through the callback (will be called when the response arrives). The callback is stored in a list until the
-    //corresponding task id list arrives.
-    if (messageDetails.task) {
-        throw new Error('Illegal input - task. To emit to specific task use emit direct!');
-    }
-
-    if (!onTaskIds) {
-        throw new Error('You must pass a onTaskIds callback when using emit!')
-    }
-
-    this.taskIdsCallbacks.push(onTaskIds);
-    this.__emit(messageDetails);;
-}
-
-
-/**
- * Emit message to specific task.
- * @param messageDetails json with the emit details.
- *
- * For bolt, the json must contain the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
- * tuple and return ack when all components successfully finished to process it.
- * - task - indicate the task to send the tuple to.
- * and may contain the optional fields:
- * - stream (if empty - emit to default stream)
- *
- * For spout, the json must contain the required fields:
- * - tuple - the value to emit
- * - task - indicate the task to send the tuple to.
- * and may contain the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- *
- * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received).
- */
-Storm.prototype.emitDirect = function(commandDetails) {
-    if (!commandDetails.task) {
-        throw new Error("Emit direct must receive task id!")
-    }
-    this.__emit(commandDetails);
-}
-
-/**
- * Initialize storm component according to the configuration received.
- * @param conf configuration object accrding to storm protocol.
- * @param context context object according to storm protocol.
- * @param done callback. Call this method when finished initializing.
- */
-Storm.prototype.initialize = function(conf, context, done) {
-    done();
-}
-
-Storm.prototype.run = function() {
-    process.stdout.setEncoding('utf8');
-    process.stdin.setEncoding('utf8');
-    this.startReadingInput();
-}
-
-function Tuple(id, component, stream, task, values) {
-    this.id = id;
-    this.component = component;
-    this.stream = stream;
-    this.task = task;
-    this.values = values;
-}
-
-/**
- * Base class for storm bolt.
- * To create a bolt implement 'process' method.
- * You may also implement initialize method to
- */
-function BasicBolt() {
-    Storm.call(this);
-    this.anchorTuple = null;
-};
-
-BasicBolt.prototype = Object.create(Storm.prototype);
-BasicBolt.prototype.constructor = BasicBolt;
-
-/**
- * Emit message.
- * @param commandDetails json with the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
- * tuple and return ack when all components successfully finished to process it.
- * and the optional fields:
- * - stream (if empty - emit to default stream)
- * - task (pass only to emit to specific task)
- */
-BasicBolt.prototype.__emit = function(commandDetails) {
-    var self = this;
-
-    var message = {
-        command: "emit",
-        tuple: commandDetails.tuple,
-        stream: commandDetails.stream,
-        task: commandDetails.task,
-        anchors: [commandDetails.anchorTupleId]
-    };
-
-    this.sendMsgToParent(message);
-}
-
-BasicBolt.prototype.handleNewCommand = function(command) {
-    var self = this;
-    var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]);
-    var callback = function(err) {
-          if (err) {
-              self.fail(tup, err);
-              return;
-          }
-          self.ack(tup);
-      }
-    this.process(tup, callback);
-}
-
-/**
- * Implement this method when creating a bolt. This is the main method that provides the logic of the bolt (what
- * should it do?).
- * @param tuple the input of the bolt - what to process.
- * @param done call this method when done processing.
- */
-BasicBolt.prototype.process = function(tuple, done) {};
-
-BasicBolt.prototype.ack = function(tup) {
-    this.sendMsgToParent({"command": "ack", "id": tup.id});
-}
-
-BasicBolt.prototype.fail = function(tup, err) {
-    this.sendMsgToParent({"command": "fail", "id": tup.id});
-}
-
-
-/**
- * Base class for storm spout.
- * To create a spout implement the following methods: nextTuple, ack and fail (nextTuple - mandatory, ack and fail
- * can stay empty).
- * You may also implement initialize method.
- *
- */
-function Spout() {
-    Storm.call(this);
-};
-
-Spout.prototype = Object.create(Storm.prototype);
-
-Spout.prototype.constructor = Spout;
-
-/**
- * This method will be called when an ack is received for preciously sent tuple. One may implement it.
- * @param id The id of the tuple.
- * @param done Call this method when finished and ready to receive more tuples.
- */
-Spout.prototype.ack = function(id, done) {};
-
-/**
- * This method will be called when an fail is received for preciously sent tuple. One may implement it (for example -
- * log the failure or send the tuple again).
- * @param id The id of the tuple.
- * @param done Call this method when finished and ready to receive more tuples.
- */
-Spout.prototype.fail = function(id, done) {};
-
-/**
- * Method the indicates its time to emit the next tuple.
- * @param done call this method when done sending the output.
- */
-Spout.prototype.nextTuple = function(done) {};
-
-Spout.prototype.handleNewCommand = function(command) {
-    var self = this;
-    var callback = function() {
-        self.sync();
-    }
-
-    if (command["command"] === "next") {
-        this.nextTuple(callback);
-    }
-
-    if (command["command"] === "ack") {
-        this.ack(command["id"], callback);
-    }
-
-    if (command["command"] === "fail") {
-        this.fail(command["id"], callback);
-    }
-}
-
-/**
- * @param commandDetails json with the required fields:
- * - tuple - the value to emit.
- * and the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- * - task - pass only to emit to specific task.
- */
-Spout.prototype.__emit = function(commandDetails) {
-    var message = {
-        command: "emit",
-        tuple: commandDetails.tuple,
-        id: commandDetails.id,
-        stream: commandDetails.stream,
-        task: commandDetails.task
-    };
-
-    this.sendMsgToParent(message);
-}
-
-module.exports.BasicBolt = BasicBolt;
-module.exports.Spout = Spout;
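
Note on the framing logic removed above: `handleNewChunk` buffers partial stdin chunks and splits them on the `"\nend\n"` separator, keeping any trailing fragment for the next chunk. The same idea can be sketched in Python 3 as below. This is only an illustration of the technique, not code from the Storm repository; the class name `MessageBuffer` and the method name `feed` are hypothetical.

```python
class MessageBuffer:
    """Accumulates input chunks and returns complete multilang messages.

    Hypothetical helper mirroring the removed handleNewChunk logic.
    """

    SEPARATOR = "\nend\n"

    def __init__(self):
        # Invariant: self._partial never contains a complete separator,
        # otherwise it would already have been split off.
        self._partial = ""

    def feed(self, chunk):
        """Add a chunk and return the list of complete, non-empty messages."""
        if not chunk:
            return []
        parts = (self._partial + chunk).split(self.SEPARATOR)
        # The last element is whatever follows the final separator
        # (possibly empty); keep it until more input arrives.
        self._partial = parts.pop()
        return [part for part in parts if part]


# A message and a separator may both be split across chunk boundaries.
buf = MessageBuffer()
first = buf.feed('{"command": "ne')
second = buf.feed('xt"}\nend\n{"command": "ack", "id": "1"}\nen')
third = buf.feed('d\n')
```

Because the unfinished tail is carried over between calls, a separator that straddles two chunks is still recognised once the second chunk arrives.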

http://git-wip-us.apache.org/repos/asf/storm/blob/db492167/storm-core/src/multilang/py/storm.py
----------------------------------------------------------------------
diff --git a/storm-core/src/multilang/py/storm.py b/storm-core/src/multilang/py/storm.py
deleted file mode 100755
index 642c393..0000000
--- a/storm-core/src/multilang/py/storm.py
+++ /dev/null
@@ -1,260 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import sys
-import os
-import traceback
-from collections import deque
-
-try:
-    import simplejson as json
-except ImportError:
-    import json
-
-json_encode = lambda x: json.dumps(x)
-json_decode = lambda x: json.loads(x)
-
-#reads lines and reconstructs newlines appropriately
-def readMsg():
-    msg = ""
-    while True:
-        line = sys.stdin.readline()
-        if not line:
-            raise Exception('Read EOF from stdin')
-        if line[0:-1] == "end":
-            break
-        msg = msg + line
-    return json_decode(msg[0:-1])
-
-MODE = None
-ANCHOR_TUPLE = None
-
-#queue up commands we read while trying to read taskids
-pending_commands = deque()
-
-def readTaskIds():
-    if pending_taskids:
-        return pending_taskids.popleft()
-    else:
-        msg = readMsg()
-        while type(msg) is not list:
-            pending_commands.append(msg)
-            msg = readMsg()
-        return msg
-
-#queue up taskids we read while trying to read commands/tuples
-pending_taskids = deque()
-
-def readCommand():
-    if pending_commands:
-        return pending_commands.popleft()
-    else:
-        msg = readMsg()
-        while type(msg) is list:
-            pending_taskids.append(msg)
-            msg = readMsg()
-        return msg
-
-def readTuple():
-    cmd = readCommand()
-    return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"])
-
-def sendMsgToParent(msg):
-    print json_encode(msg)
-    print "end"
-    sys.stdout.flush()
-
-def sync():
-    sendMsgToParent({'command':'sync'})
-
-def sendpid(heartbeatdir):
-    pid = os.getpid()
-    sendMsgToParent({'pid':pid})
-    open(heartbeatdir + "/" + str(pid), "w").close()
-
-def emit(*args, **kwargs):
-    __emit(*args, **kwargs)
-    return readTaskIds()
-
-def emitDirect(task, *args, **kwargs):
-    kwargs["directTask"] = task
-    __emit(*args, **kwargs)
-
-def __emit(*args, **kwargs):
-    global MODE
-    if MODE == Bolt:
-        emitBolt(*args, **kwargs)
-    elif MODE == Spout:
-        emitSpout(*args, **kwargs)
-
-def emitBolt(tup, stream=None, anchors = [], directTask=None):
-    global ANCHOR_TUPLE
-    if ANCHOR_TUPLE is not None:
-        anchors = [ANCHOR_TUPLE]
-    m = {"command": "emit"}
-    if stream is not None:
-        m["stream"] = stream
-    m["anchors"] = map(lambda a: a.id, anchors)
-    if directTask is not None:
-        m["task"] = directTask
-    m["tuple"] = tup
-    sendMsgToParent(m)
-
-def emitSpout(tup, stream=None, id=None, directTask=None):
-    m = {"command": "emit"}
-    if id is not None:
-        m["id"] = id
-    if stream is not None:
-        m["stream"] = stream
-    if directTask is not None:
-        m["task"] = directTask
-    m["tuple"] = tup
-    sendMsgToParent(m)
-
-def ack(tup):
-    sendMsgToParent({"command": "ack", "id": tup.id})
-
-def fail(tup):
-    sendMsgToParent({"command": "fail", "id": tup.id})
-
-def reportError(msg):
-    sendMsgToParent({"command": "error", "msg": msg})
-
-def log(msg, level=2):
-    sendMsgToParent({"command": "log", "msg": msg, "level":level})
-
-def logTrace(msg):
-    log(msg, 0)
-
-def logDebug(msg):
-    log(msg, 1)
-
-def logInfo(msg):
-    log(msg, 2)
-
-def logWarn(msg):
-    log(msg, 3)
-
-def logError(msg):
-    log(msg, 4)
-
-def rpcMetrics(name, params):
-    sendMsgToParent({"command": "metrics", "name": name, "params": params})
-
-def initComponent():
-    setupInfo = readMsg()
-    sendpid(setupInfo['pidDir'])
-    return [setupInfo['conf'], setupInfo['context']]
-
-class Tuple(object):
-    def __init__(self, id, component, stream, task, values):
-        self.id = id
-        self.component = component
-        self.stream = stream
-        self.task = task
-        self.values = values
-
-    def __repr__(self):
-        return '<%s%s>' % (
-            self.__class__.__name__,
-            ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys())))
-
-    def is_heartbeat_tuple(self):
-        return self.task == -1 and self.stream == "__heartbeat"
-
-class Bolt(object):
-    def initialize(self, stormconf, context):
-        pass
-
-    def process(self, tuple):
-        pass
-
-    def run(self):
-        global MODE
-        MODE = Bolt
-        conf, context = initComponent()
-        try:
-            self.initialize(conf, context)
-            while True:
-                tup = readTuple()
-                if tup.is_heartbeat_tuple():
-                    sync()
-                else:
-                    self.process(tup)
-        except Exception, e:
-            reportError(traceback.format_exc(e))
-
-class BasicBolt(object):
-    def initialize(self, stormconf, context):
-        pass
-
-    def process(self, tuple):
-        pass
-
-    def run(self):
-        global MODE
-        MODE = Bolt
-        global ANCHOR_TUPLE
-        conf, context = initComponent()
-        try:
-            self.initialize(conf, context)
-            while True:
-                tup = readTuple()
-                if tup.is_heartbeat_tuple():
-                    sync()
-                else:
-                    ANCHOR_TUPLE = tup
-                    try:
-                        self.process(tup)
-                        ack(tup)
-                    except Exception, e:
-                        reportError(traceback.format_exc(e))
-                        fail(tup)
-        except Exception, e:
-            reportError(traceback.format_exc(e))
-
-class Spout(object):
-    def initialize(self, conf, context):
-        pass
-
-    def ack(self, id):
-        pass
-
-    def fail(self, id):
-        pass
-
-    def nextTuple(self):
-        pass
-
-    def run(self):
-        global MODE
-        MODE = Spout
-        conf, context = initComponent()
-        try:
-            self.initialize(conf, context)
-            while True:
-                msg = readCommand()
-                if msg["command"] == "next":
-                    self.nextTuple()
-                if msg["command"] == "ack":
-                    self.ack(msg["id"])
-                if msg["command"] == "fail":
-                    self.fail(msg["id"])
-                sync()
-        except Exception, e:
-            reportError(traceback.format_exc(e))

http://git-wip-us.apache.org/repos/asf/storm/blob/db492167/storm-core/src/multilang/rb/storm.rb
----------------------------------------------------------------------
diff --git a/storm-core/src/multilang/rb/storm.rb b/storm-core/src/multilang/rb/storm.rb
deleted file mode 100644
index 816694e..0000000
--- a/storm-core/src/multilang/rb/storm.rb
+++ /dev/null
@@ -1,236 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-require "rubygems"
-require "json"
-
-module Storm
-  module Protocol
-    class << self
-      attr_accessor :mode, :pending_taskids, :pending_commands
-    end
-
-    self.pending_taskids = []
-    self.pending_commands = []
-
-    def read_message
-      msg = ""
-      loop do
-        line = STDIN.readline.chomp
-        break if line == "end"
-        msg << line
-        msg << "\n"
-      end
-      JSON.parse msg.chomp
-    end
-
-    def read_task_ids
-      Storm::Protocol.pending_taskids.shift ||
-          begin
-            msg = read_message
-            until msg.is_a? Array
-              Storm::Protocol.pending_commands.push(msg)
-              msg = read_message
-            end
-            msg
-          end
-    end
-
-    def read_command
-      Storm::Protocol.pending_commands.shift ||
-          begin
-            msg = read_message
-            while msg.is_a? Array
-              Storm::Protocol.pending_taskids.push(msg)
-              msg = read_message
-            end
-            msg
-          end
-    end
-
-    def send_msg_to_parent(msg)
-      puts msg.to_json
-      puts "end"
-      STDOUT.flush
-    end
-
-    def sync
-      send_msg_to_parent({'command' => 'sync'})
-    end
-
-    def send_pid(heartbeat_dir)
-      pid = Process.pid
-      send_msg_to_parent({'pid' => pid})
-      File.open("#{heartbeat_dir}/#{pid}", "w").close
-    end
-
-    def emit_bolt(tup, args = {})
-      stream = args[:stream]
-      anchors = args[:anchors] || args[:anchor] || []
-      anchors = [anchors] unless anchors.is_a? Enumerable
-      direct = args[:direct_task]
-      m = {:command => :emit, :anchors => anchors.map(&:id), :tuple => tup}
-      m[:stream] = stream if stream
-      m[:task] = direct if direct
-      send_msg_to_parent m
-      read_task_ids unless direct
-    end
-
-    def emit_spout(tup, args = {})
-      stream = args[:stream]
-      id = args[:id]
-      direct = args[:direct_task]
-      m = {:command => :emit, :tuple => tup}
-      m[:id] = id if id
-      m[:stream] = stream if stream
-      m[:task] = direct if direct
-      send_msg_to_parent m
-      read_task_ids unless direct
-    end
-
-    def emit(*args)
-      case Storm::Protocol.mode
-        when 'spout'
-          emit_spout(*args)
-        when 'bolt'
-          emit_bolt(*args)
-      end
-    end
-
-    def ack(tup)
-      send_msg_to_parent :command => :ack, :id => tup.id
-    end
-
-    def fail(tup)
-      send_msg_to_parent :command => :fail, :id => tup.id
-    end
-
-    def reportError(msg)
-      send_msg_to_parent :command => :error, :msg => msg.to_s
-    end
-
-    def log(msg, level=2)
-      send_msg_to_parent :command => :log, :msg => msg.to_s, :level => level
-    end
-
-    def logTrace(msg)
-      log(msg, 0)
-    end
-
-    def logDebug(msg)
-      log(msg, 1)
-    end
-
-    def logInfo(msg)
-      log(msg, 2)
-    end
-
-    def logWarn(msg)
-      log(msg, 3)
-    end
-
-    def logError(msg)
-      log(msg, 4)
-    end
-
-    def handshake
-      setup_info = read_message
-      send_pid setup_info['pidDir']
-      [setup_info['conf'], setup_info['context']]
-    end
-  end
-
-  class Tuple
-    attr_accessor :id, :component, :stream, :task, :values
-
-    def initialize(id, component, stream, task, values)
-      @id = id
-      @component = component
-      @stream = stream
-      @task = task
-      @values = values
-    end
-
-    def self.from_hash(hash)
-      Tuple.new(*hash.values_at("id", "comp", "stream", "task", "tuple"))
-    end
-
-    def is_heartbeat
-      task == -1 and stream == '__heartbeat'
-    end
-  end
-
-  class Bolt
-    include Storm::Protocol
-
-    def prepare(conf, context); end
-
-    def process(tuple); end
-
-    def run
-      Storm::Protocol.mode = 'bolt'
-      prepare(*handshake)
-      begin
-        while true
-          tuple = Tuple.from_hash(read_command)
-          if tuple.is_heartbeat
-            sync
-          else
-            process tuple
-          end
-        end
-      rescue Exception => e
-        reportError 'Exception in bolt: ' + e.message + ' - ' + e.backtrace.join('\n')
-      end
-    end
-  end
-
-  class Spout
-    include Storm::Protocol
-
-    def open(conf, context); end
-
-    def nextTuple; end
-
-    def ack(id); end
-
-    def fail(id); end
-
-    def run
-      Storm::Protocol.mode = 'spout'
-      open(*handshake)
-
-      begin
-        while true
-          msg = read_command
-          case msg['command']
-            when 'next'
-              nextTuple
-            when 'ack'
-              ack(msg['id'])
-            when 'fail'
-              fail(msg['id'])
-          end
-          sync
-        end
-      rescue Exception => e
-        reportError 'Exception in spout: ' + e.message + ' - ' + e.backtrace.join('\n')
-      end
-    end
-  end
-end
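
Note on read_message, read_task_ids and read_command in the removed storm.rb above: each message is JSON text terminated by a line containing only "end", and because task-id lists (JSON arrays) and commands (JSON objects) share one input stream, whichever kind is not wanted at the moment is buffered for later. The following Python 3 sketch mirrors that logic under stated assumptions: the Reader class and the in-memory stream are illustrative only, and the EOF check is an addition that the Ruby code does not have.

```python
import io
import json
from collections import deque


def read_message(stream):
    """Collect lines up to a lone "end" line and parse them as JSON."""
    lines = []
    while True:
        line = stream.readline()
        if line == "":  # assumption: treat EOF before "end" as an error
            raise EOFError("stream closed before 'end' terminator")
        line = line.rstrip("\n")
        if line == "end":
            break
        lines.append(line)
    return json.loads("\n".join(lines))


class Reader:
    """Hypothetical wrapper holding the two pending queues."""

    def __init__(self, stream):
        self.stream = stream
        self.pending_taskids = deque()
        self.pending_commands = deque()

    def read_task_ids(self):
        if self.pending_taskids:
            return self.pending_taskids.popleft()
        msg = read_message(self.stream)
        while not isinstance(msg, list):
            self.pending_commands.append(msg)  # a command arrived first
            msg = read_message(self.stream)
        return msg

    def read_command(self):
        if self.pending_commands:
            return self.pending_commands.popleft()
        msg = read_message(self.stream)
        while isinstance(msg, list):
            self.pending_taskids.append(msg)  # task ids arrived first
            msg = read_message(self.stream)
        return msg


demo = io.StringIO('[4]\nend\n{"command": "next"}\nend\n')
reader = Reader(demo)
first_command = reader.read_command()    # the list [4] is buffered, not dropped
first_task_ids = reader.read_task_ids()  # served from the buffer
```

The two queues are what allow an emit to wait for its task ids without losing a command that happened to arrive in between.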


[16/27] storm git commit: Added missing ack for the tick

Posted by pt...@apache.org.
Added missing ack for the tick


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d3dcd1bd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d3dcd1bd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d3dcd1bd

Branch: refs/heads/0.10.x-branch
Commit: d3dcd1bdfc56fa7aca6752afdd5588586fc1ec91
Parents: 6b00040
Author: Niels Basjes <nb...@bol.com>
Authored: Thu Oct 30 10:27:05 2014 +0100
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:27:14 2015 -0400

----------------------------------------------------------------------
 external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d3dcd1bd/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index 913843c..0a1e5fe 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -90,6 +90,7 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
     @Override
     public void execute(Tuple input) {
         if (input.isTick()) {
+          collector.ack(input);
           return; // Do not try to send ticks to Kafka
         }
 

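Note on the one-line change above: before it, the tick branch returned without acknowledging the tick tuple; after it, every tuple that reaches execute is acked on some path. The Python 3 sketch below illustrates that pattern only. FakeTuple, RecordingCollector and ForwardingBolt are hypothetical stand-ins, not Storm classes, and the "__system" / "__tick" identifiers are assumed to match Storm's system component and tick stream ids.

```python
from dataclasses import dataclass, field

SYSTEM_COMPONENT_ID = "__system"   # assumed id of the system component
SYSTEM_TICK_STREAM_ID = "__tick"   # assumed id of the tick stream


@dataclass
class FakeTuple:
    component: str
    stream: str
    values: list = field(default_factory=list)

    def is_tick(self):
        return (self.component == SYSTEM_COMPONENT_ID
                and self.stream == SYSTEM_TICK_STREAM_ID)


class RecordingCollector:
    """Hypothetical collector that records acked tuples."""

    def __init__(self):
        self.acked = []

    def ack(self, tup):
        self.acked.append(tup)


class ForwardingBolt:
    """Hypothetical bolt that forwards data tuples to an in-memory sink."""

    def __init__(self, collector, sink):
        self.collector = collector
        self.sink = sink

    def execute(self, tup):
        if tup.is_tick():
            self.collector.ack(tup)  # ack before the early return
            return                   # ticks are never forwarded
        self.sink.append(tup.values)
        self.collector.ack(tup)


collector = RecordingCollector()
sink = []
bolt = ForwardingBolt(collector, sink)
tick = FakeTuple(SYSTEM_COMPONENT_ID, SYSTEM_TICK_STREAM_ID)
data = FakeTuple("words", "default", ["hello"])
bolt.execute(tick)
bolt.execute(data)
```

After both calls the sink holds only the data tuple's values, while the collector has seen both tuples.
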

[10/27] storm git commit: add STORM-748 to changelog

Posted by pt...@apache.org.
add STORM-748 to changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ce5447e4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ce5447e4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ce5447e4

Branch: refs/heads/0.10.x-branch
Commit: ce5447e4dd3119fffac2335cab8e4368f178a236
Parents: 927132c
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri May 15 15:13:12 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:13:12 2015 -0400

----------------------------------------------------------------------
 CHANGELOG.md | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ce5447e4/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 753fc3a..ec096e5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,11 +1,12 @@
 ## 0.10.0
+ * STORM-748: Package Multi-Lang scripts so they don't have to be duplicated
  * STORM-563. Kafka Spout doesn't pick up from the beginning of the queue unless forceFromStart specified.
  * STORM-615: Add REST API to upload topology.
  * STORM-807: quote args correctly in /bin/storm
  * STORM-686: Add worker-launcher to storm-dist.
  * STORM-789: Send more topology context to Multi-Lang components via initial handshake
  * STORM-764: Have option to compress thrift heartbeat
- * JIRA STORM-766 (Include version info in the service page)
+ * STORM-766 (Include version info in the service page)
  * STORM-765: Thrift serialization for local state. 
  * STORM-762: uptime for worker heartbeats is lost when converted to thrift
  * STORM-750: Set Config serialVersionUID


[09/27] storm git commit: sync storm.js with master

Posted by pt...@apache.org.
sync storm.js with master


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/17c1368c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/17c1368c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/17c1368c

Branch: refs/heads/0.10.x-branch
Commit: 17c1368c8a61f037874916c709562dcbf2b94267
Parents: cb5afe3
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Mon Apr 6 17:02:39 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:12:17 2015 -0400

----------------------------------------------------------------------
 .../src/main/resources/resources/storm.js       | 25 +++++++++++++-------
 1 file changed, 16 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/17c1368c/storm-multilang/multilang-javascript/src/main/resources/resources/storm.js
----------------------------------------------------------------------
diff --git a/storm-multilang/multilang-javascript/src/main/resources/resources/storm.js b/storm-multilang/multilang-javascript/src/main/resources/resources/storm.js
index f5dcad2..dc6efc1 100755
--- a/storm-multilang/multilang-javascript/src/main/resources/resources/storm.js
+++ b/storm-multilang/multilang-javascript/src/main/resources/resources/storm.js
@@ -15,7 +15,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- /**
+
+/**
  * Base classes in node-js for storm Bolt and Spout.
  * Implements the storm multilang protocol for nodejs.
  */
@@ -97,7 +98,7 @@ Storm.prototype.handleNewChunk = function(chunk) {
         }
     }
     return messages;
- }
+}
 
 Storm.prototype.isTaskIds = function(msg) {
     return (msg instanceof Array);
@@ -260,13 +261,19 @@ BasicBolt.prototype.__emit = function(commandDetails) {
 BasicBolt.prototype.handleNewCommand = function(command) {
     var self = this;
     var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]);
+
+    if (tup.task === -1 && tup.stream === "__heartbeat") {
+        self.sync();
+        return;
+    }
+
     var callback = function(err) {
-          if (err) {
-              self.fail(tup, err);
-              return;
-          }
-          self.ack(tup);
-      }
+        if (err) {
+            self.fail(tup, err);
+            return;
+        }
+        self.ack(tup);
+    }
     this.process(tup, callback);
 }
 
@@ -363,4 +370,4 @@ Spout.prototype.__emit = function(commandDetails) {
 }
 
 module.exports.BasicBolt = BasicBolt;
-module.exports.Spout = Spout;
+module.exports.Spout = Spout;
\ No newline at end of file
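
Note on the handleNewCommand change above: a tuple whose task is -1 and whose stream is "__heartbeat" is now answered with a sync and never reaches process; any other tuple is processed and then acked or failed. A Python 3 sketch of the same control flow follows. The function names and the send callback are hypothetical, and unlike storm.js, which reports failure through a callback argument, this sketch treats an exception raised by process as failure.

```python
def is_heartbeat(command):
    """Same test as the added JavaScript: task -1 on the __heartbeat stream."""
    return command.get("task") == -1 and command.get("stream") == "__heartbeat"


def handle_bolt_command(command, process, send):
    """Answer heartbeats with sync; otherwise process and ack or fail."""
    if is_heartbeat(command):
        send({"command": "sync"})
        return
    try:
        process(command)
    except Exception:
        send({"command": "fail", "id": command["id"]})
    else:
        send({"command": "ack", "id": command["id"]})


def failing_process(command):
    raise ValueError("cannot process " + command["id"])


sent = []
handle_bolt_command({"id": "hb", "task": -1, "stream": "__heartbeat", "tuple": []},
                    lambda c: None, sent.append)
handle_bolt_command({"id": "a1", "task": 3, "stream": "default", "tuple": ["x"]},
                    lambda c: None, sent.append)
handle_bolt_command({"id": "a2", "task": 3, "stream": "default", "tuple": ["y"]},
                    failing_process, sent.append)
```

Checking for the heartbeat first matters because user code in process should never see it.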


[22/27] storm git commit: Reducing the differences

Posted by pt...@apache.org.
Reducing the differences


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/91167704
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/91167704
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/91167704

Branch: refs/heads/0.10.x-branch
Commit: 9116770478bcd5ee794375d8a525741298a8ff01
Parents: 0e49d91
Author: Niels Basjes <ni...@basjes.nl>
Authored: Mon Dec 15 10:58:49 2014 +0100
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:28:27 2015 -0400

----------------------------------------------------------------------
 .../test/jvm/storm/starter/tools/MockTupleHelpers.java           | 1 -
 storm-core/src/jvm/backtype/storm/tuple/Tuple.java               | 4 ++--
 storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java           | 3 +--
 .../src/jvm/storm/trident/topology/TridentBoltExecutor.java      | 2 +-
 4 files changed, 4 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/91167704/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
index eeaeeae..b253350 100644
--- a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
+++ b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
@@ -37,5 +37,4 @@ public final class MockTupleHelpers {
     when(tuple.getSourceStreamId()).thenReturn(streamId);
     return tuple;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/91167704/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
index a31b52b..34dc61a 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
@@ -42,7 +42,7 @@ public interface Tuple extends ITuple{
      * Gets the id of the component that created this tuple.
      */
     public String getSourceComponent();
-
+    
     /**
      * Gets the id of the task that created this tuple.
      */
@@ -52,7 +52,7 @@ public interface Tuple extends ITuple{
      * Gets the id of the stream that this tuple was emitted to.
      */
     public String getSourceStreamId();
-
+    
     /**
      * Gets the message id that associated with this tuple.
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/91167704/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
index 7829327..818eff1 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
@@ -17,7 +17,6 @@
  */
 package backtype.storm.tuple;
 
-import backtype.storm.Constants;
 import backtype.storm.generated.GlobalStreamId;
 import backtype.storm.task.GeneralTopologyContext;
 import backtype.storm.utils.IndifferentAccessMap;
@@ -213,7 +212,7 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
     public String getSourceStreamId() {
         return streamId;
     }
-
+    
     public MessageId getMessageId() {
         return id;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/91167704/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
index 41741a1..a23e555 100644
--- a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
+++ b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
@@ -300,7 +300,7 @@ public class TridentBoltExecutor implements IRichBolt {
     
     @Override
     public void execute(Tuple tuple) {
-        if (TupleUtils.isTick(tuple)) {
+        if(TupleUtils.isTick(tuple)) {
             long now = System.currentTimeMillis();
             if(now - _lastRotate > _messageTimeoutMs) {
                 _batches.rotate();


[05/27] storm git commit: repackage multi-lang resources

Posted by pt...@apache.org.
repackage multi-lang resources


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cb5afe32
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cb5afe32
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cb5afe32

Branch: refs/heads/0.10.x-branch
Commit: cb5afe3262b3cc793cc4d3a9dd2ea7b5b89e37cd
Parents: 1247f6d
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Mon Apr 6 15:54:24 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:12:17 2015 -0400

----------------------------------------------------------------------
 pom.xml                                         |   3 +
 storm-core/pom.xml                              |  35 ++
 storm-core/src/dev/resources/storm.js           | 373 -------------------
 storm-core/src/dev/resources/storm.py           | 260 -------------
 storm-core/src/dev/resources/storm.rb           | 236 ------------
 storm-multilang/multilang-javascript/pom.xml    |  32 ++
 .../src/main/resources/resources/storm.js       | 366 ++++++++++++++++++
 storm-multilang/multilang-python/pom.xml        |  32 ++
 .../src/main/resources/resources/storm.py       | 260 +++++++++++++
 storm-multilang/multilang-ruby/pom.xml          |  32 ++
 .../src/main/resources/resources/storm.rb       | 236 ++++++++++++
 11 files changed, 996 insertions(+), 869 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 236e20f..4d7fbd2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -157,6 +157,9 @@
     <modules>
         <module>storm-buildtools/maven-shade-clojure-transformer</module>
         <module>storm-buildtools/storm-maven-plugins</module>
+        <module>storm-multilang/multilang-javascript</module>
+        <module>storm-multilang/multilang-python</module>
+        <module>storm-multilang/multilang-ruby</module>
         <module>storm-core</module>
         <module>examples/storm-starter</module>
         <module>external/storm-kafka</module>

http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 17e1a15..db54481 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -35,6 +35,12 @@
     </properties>
 
     <dependencies>
+        <!-- multi-lang resources -->
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>multilang-ruby</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <!--clojure-->
         <dependency>
             <groupId>org.clojure</groupId>
@@ -510,6 +516,35 @@
                             <includeScope>runtime</includeScope>
                         </configuration>
                     </execution>
+                    <!-- multi-lang resources -->
+                               <execution>
+                                 <id>unpack</id>
+                                 <phase>process-test-resources</phase>
+                                 <goals>
+                                   <goal>unpack</goal>
+                                 </goals>
+                                 <configuration>
+                                   <artifactItems>
+                                     <artifactItem>
+                                       <groupId>org.apache.storm</groupId>
+                                       <artifactId>multilang-ruby</artifactId>
+                                       <version>${project.version}</version>
+                                     </artifactItem>
+                                     <artifactItem>
+                                        <groupId>org.apache.storm</groupId>
+                                        <artifactId>multilang-python</artifactId>
+                                        <version>${project.version}</version>
+                                      </artifactItem>
+                                      <artifactItem>
+                                         <groupId>org.apache.storm</groupId>
+                                         <artifactId>multilang-javascript</artifactId>
+                                         <version>${project.version}</version>
+                                       </artifactItem>
+                                   </artifactItems>
+                                   <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
+                                 </configuration>
+                               </execution>
+
                 </executions>
             </plugin>
             <plugin>

http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/storm-core/src/dev/resources/storm.js
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/resources/storm.js b/storm-core/src/dev/resources/storm.js
deleted file mode 100755
index 8827cd3..0000000
--- a/storm-core/src/dev/resources/storm.js
+++ /dev/null
@@ -1,373 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Base classes in node-js for storm Bolt and Spout.
- * Implements the storm multilang protocol for nodejs.
- */
-
-
-var fs = require('fs');
-
-function Storm() {
-    this.messagePart = "";
-    this.taskIdsCallbacks = [];
-    this.isFirstMessage = true;
-    this.separator = '\nend\n';
-}
-
-Storm.prototype.sendMsgToParent = function(msg) {
-    var str = JSON.stringify(msg);
-    process.stdout.write(str + this.separator);
-}
-
-Storm.prototype.sync = function() {
-    this.sendMsgToParent({"command":"sync"});
-}
-
-Storm.prototype.sendPid = function(heartbeatdir) {
-    var pid = process.pid;
-    fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w"));
-    this.sendMsgToParent({"pid": pid})
-}
-
-Storm.prototype.log = function(msg) {
-    this.sendMsgToParent({"command": "log", "msg": msg});
-}
-
-Storm.prototype.initSetupInfo = function(setupInfo) {
-    var self = this;
-    var callback = function() {
-        self.sendPid(setupInfo['pidDir']);
-    }
-    this.initialize(setupInfo['conf'], setupInfo['context'], callback);
-}
-
-Storm.prototype.startReadingInput = function() {
-    var self = this;
-    process.stdin.on('readable', function() {
-        var chunk = process.stdin.read();
-        var messages = self.handleNewChunk(chunk);
-        messages.forEach(function(message) {
-            self.handleNewMessage(message);
-        })
-
-    });
-}
-
-/**
- * receives a new string chunk and returns a list of new messages with the separator removed
- * stores state in this.messagePart
- * @param chunk
- */
-Storm.prototype.handleNewChunk = function(chunk) {
-    //invariant: this.messagePart has no separator otherwise we would have parsed it already
-    var messages = [];
-    if (chunk && chunk.length !== 0) {
-        //"{}".split("\nend\n")           ==> ['{}']
-        //"\nend\n".split("\nend\n")      ==> [''  , '']
-        //"{}\nend\n".split("\nend\n")    ==> ['{}', '']
-        //"\nend\n{}".split("\nend\n")    ==> [''  , '{}']
-        // "{}\nend\n{}".split("\nend\n") ==> ['{}', '{}' ]
-        this.messagePart = this.messagePart + chunk;
-        var newMessageParts = this.messagePart.split(this.separator);
-        while (newMessageParts.length > 0) {
-            var potentialMessage = newMessageParts.shift();
-            var anotherMessageAhead = newMessageParts.length > 0;
-            if  (!anotherMessageAhead) {
-                this.messagePart = potentialMessage;
-            }
-            else if (potentialMessage.length > 0) {
-                messages.push(potentialMessage);
-            }
-        }
-    }
-    return messages;
- }
-
-Storm.prototype.isTaskIds = function(msg) {
-    return (msg instanceof Array);
-}
-
-Storm.prototype.handleNewMessage = function(msg) {
-    var parsedMsg = JSON.parse(msg);
-
-    if (this.isFirstMessage) {
-        this.initSetupInfo(parsedMsg);
-        this.isFirstMessage = false;
-    } else if (this.isTaskIds(parsedMsg)) {
-        this.handleNewTaskId(parsedMsg);
-    } else {
-        this.handleNewCommand(parsedMsg);
-    }
-}
-
-Storm.prototype.handleNewTaskId = function(taskIds) {
-    //When new list of task ids arrives, the callback that was passed with the corresponding emit should be called.
-    //Storm assures that the task ids will be sent in the same order as their corresponding emits so it we can simply
-    //take the first callback in the list and be sure it is the right one.
-
-    var callback = this.taskIdsCallbacks.shift();
-    if (callback) {
-        callback(taskIds);
-    } else {
-        throw new Error('Something went wrong, we off the split of task id callbacks');
-    }
-}
-
-
-
-/**
- *
- * @param messageDetails json with the emit details.
- *
- * For bolt, the json must contain the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
- * tuple and return ack when all components successfully finished to process it.
- * and may contain the optional fields:
- * - stream (if empty - emit to default stream)
- *
- * For spout, the json must contain the required fields:
- * - tuple - the value to emit
- *
- * and may contain the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- *
- * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received).
- */
-Storm.prototype.emit = function(messageDetails, onTaskIds) {
-    //Every emit triggers a response - list of task ids to which the tuple was emitted. The task ids are accessible
-    //through the callback (will be called when the response arrives). The callback is stored in a list until the
-    //corresponding task id list arrives.
-    if (messageDetails.task) {
-        throw new Error('Illegal input - task. To emit to specific task use emit direct!');
-    }
-
-    if (!onTaskIds) {
-        throw new Error('You must pass a onTaskIds callback when using emit!')
-    }
-
-    this.taskIdsCallbacks.push(onTaskIds);
-    this.__emit(messageDetails);;
-}
-
-
-/**
- * Emit message to specific task.
- * @param messageDetails json with the emit details.
- *
- * For bolt, the json must contain the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
- * tuple and return ack when all components successfully finished to process it.
- * - task - indicate the task to send the tuple to.
- * and may contain the optional fields:
- * - stream (if empty - emit to default stream)
- *
- * For spout, the json must contain the required fields:
- * - tuple - the value to emit
- * - task - indicate the task to send the tuple to.
- * and may contain the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- *
- * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received).
- */
-Storm.prototype.emitDirect = function(commandDetails) {
-    if (!commandDetails.task) {
-        throw new Error("Emit direct must receive task id!")
-    }
-    this.__emit(commandDetails);
-}
-
-/**
- * Initialize storm component according to the configuration received.
- * @param conf configuration object accrding to storm protocol.
- * @param context context object according to storm protocol.
- * @param done callback. Call this method when finished initializing.
- */
-Storm.prototype.initialize = function(conf, context, done) {
-    done();
-}
-
-Storm.prototype.run = function() {
-    process.stdout.setEncoding('utf8');
-    process.stdin.setEncoding('utf8');
-    this.startReadingInput();
-}
-
-function Tuple(id, component, stream, task, values) {
-    this.id = id;
-    this.component = component;
-    this.stream = stream;
-    this.task = task;
-    this.values = values;
-}
-
-/**
- * Base class for storm bolt.
- * To create a bolt implement 'process' method.
- * You may also implement initialize method to
- */
-function BasicBolt() {
-    Storm.call(this);
-    this.anchorTuple = null;
-};
-
-BasicBolt.prototype = Object.create(Storm.prototype);
-BasicBolt.prototype.constructor = BasicBolt;
-
-/**
- * Emit message.
- * @param commandDetails json with the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
- * tuple and return ack when all components successfully finished to process it.
- * and the optional fields:
- * - stream (if empty - emit to default stream)
- * - task (pass only to emit to specific task)
- */
-BasicBolt.prototype.__emit = function(commandDetails) {
-    var self = this;
-
-    var message = {
-        command: "emit",
-        tuple: commandDetails.tuple,
-        stream: commandDetails.stream,
-        task: commandDetails.task,
-        anchors: [commandDetails.anchorTupleId]
-    };
-
-    this.sendMsgToParent(message);
-}
-
-BasicBolt.prototype.handleNewCommand = function(command) {
-    var self = this;
-    var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]);
-
-    if (tup.task === -1 && tup.stream === "__heartbeat") {
-        self.sync();
-        return;
-    }
-
-    var callback = function(err) {
-          if (err) {
-              self.fail(tup, err);
-              return;
-          }
-          self.ack(tup);
-      }
-    this.process(tup, callback);
-}
-
-/**
- * Implement this method when creating a bolt. This is the main method that provides the logic of the bolt (what
- * should it do?).
- * @param tuple the input of the bolt - what to process.
- * @param done call this method when done processing.
- */
-BasicBolt.prototype.process = function(tuple, done) {};
-
-BasicBolt.prototype.ack = function(tup) {
-    this.sendMsgToParent({"command": "ack", "id": tup.id});
-}
-
-BasicBolt.prototype.fail = function(tup, err) {
-    this.sendMsgToParent({"command": "fail", "id": tup.id});
-}
-
-
-/**
- * Base class for storm spout.
- * To create a spout implement the following methods: nextTuple, ack and fail (nextTuple - mandatory, ack and fail
- * can stay empty).
- * You may also implement initialize method.
- *
- */
-function Spout() {
-    Storm.call(this);
-};
-
-Spout.prototype = Object.create(Storm.prototype);
-
-Spout.prototype.constructor = Spout;
-
-/**
- * This method will be called when an ack is received for a previously sent tuple. One may implement it.
- * @param id The id of the tuple.
- * @param done Call this method when finished and ready to receive more tuples.
- */
-Spout.prototype.ack = function(id, done) {};
-
-/**
- * This method will be called when a fail is received for a previously sent tuple. One may implement it (for example -
- * log the failure or send the tuple again).
- * @param id The id of the tuple.
- * @param done Call this method when finished and ready to receive more tuples.
- */
-Spout.prototype.fail = function(id, done) {};
-
-/**
- * Method that indicates it is time to emit the next tuple.
- * @param done call this method when done sending the output.
- */
-Spout.prototype.nextTuple = function(done) {};
-
-Spout.prototype.handleNewCommand = function(command) {
-    var self = this;
-    var callback = function() {
-        self.sync();
-    }
-
-    if (command["command"] === "next") {
-        this.nextTuple(callback);
-    }
-
-    if (command["command"] === "ack") {
-        this.ack(command["id"], callback);
-    }
-
-    if (command["command"] === "fail") {
-        this.fail(command["id"], callback);
-    }
-}
-
-/**
- * @param commandDetails json with the required fields:
- * - tuple - the value to emit.
- * and the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- * - task - pass only to emit to specific task.
- */
-Spout.prototype.__emit = function(commandDetails) {
-    var message = {
-        command: "emit",
-        tuple: commandDetails.tuple,
-        id: commandDetails.id,
-        stream: commandDetails.stream,
-        task: commandDetails.task
-    };
-
-    this.sendMsgToParent(message);
-}
-
-module.exports.BasicBolt = BasicBolt;
-module.exports.Spout = Spout;

http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/storm-core/src/dev/resources/storm.py
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/resources/storm.py b/storm-core/src/dev/resources/storm.py
deleted file mode 100755
index 642c393..0000000
--- a/storm-core/src/dev/resources/storm.py
+++ /dev/null
@@ -1,260 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import sys
-import os
-import traceback
-from collections import deque
-
-try:
-    import simplejson as json
-except ImportError:
-    import json
-
-json_encode = lambda x: json.dumps(x)
-json_decode = lambda x: json.loads(x)
-
-#reads lines and reconstructs newlines appropriately
-def readMsg():
-    msg = ""
-    while True:
-        line = sys.stdin.readline()
-        if not line:
-            raise Exception('Read EOF from stdin')
-        if line[0:-1] == "end":
-            break
-        msg = msg + line
-    return json_decode(msg[0:-1])
-
-MODE = None
-ANCHOR_TUPLE = None
-
-#queue up commands we read while trying to read taskids
-pending_commands = deque()
-
-def readTaskIds():
-    if pending_taskids:
-        return pending_taskids.popleft()
-    else:
-        msg = readMsg()
-        while type(msg) is not list:
-            pending_commands.append(msg)
-            msg = readMsg()
-        return msg
-
-#queue up taskids we read while trying to read commands/tuples
-pending_taskids = deque()
-
-def readCommand():
-    if pending_commands:
-        return pending_commands.popleft()
-    else:
-        msg = readMsg()
-        while type(msg) is list:
-            pending_taskids.append(msg)
-            msg = readMsg()
-        return msg
-
-def readTuple():
-    cmd = readCommand()
-    return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"])
-
-def sendMsgToParent(msg):
-    print json_encode(msg)
-    print "end"
-    sys.stdout.flush()
-
-def sync():
-    sendMsgToParent({'command':'sync'})
-
-def sendpid(heartbeatdir):
-    pid = os.getpid()
-    sendMsgToParent({'pid':pid})
-    open(heartbeatdir + "/" + str(pid), "w").close()
-
-def emit(*args, **kwargs):
-    __emit(*args, **kwargs)
-    return readTaskIds()
-
-def emitDirect(task, *args, **kwargs):
-    kwargs["directTask"] = task
-    __emit(*args, **kwargs)
-
-def __emit(*args, **kwargs):
-    global MODE
-    if MODE == Bolt:
-        emitBolt(*args, **kwargs)
-    elif MODE == Spout:
-        emitSpout(*args, **kwargs)
-
-def emitBolt(tup, stream=None, anchors = [], directTask=None):
-    global ANCHOR_TUPLE
-    if ANCHOR_TUPLE is not None:
-        anchors = [ANCHOR_TUPLE]
-    m = {"command": "emit"}
-    if stream is not None:
-        m["stream"] = stream
-    m["anchors"] = map(lambda a: a.id, anchors)
-    if directTask is not None:
-        m["task"] = directTask
-    m["tuple"] = tup
-    sendMsgToParent(m)
-
-def emitSpout(tup, stream=None, id=None, directTask=None):
-    m = {"command": "emit"}
-    if id is not None:
-        m["id"] = id
-    if stream is not None:
-        m["stream"] = stream
-    if directTask is not None:
-        m["task"] = directTask
-    m["tuple"] = tup
-    sendMsgToParent(m)
-
-def ack(tup):
-    sendMsgToParent({"command": "ack", "id": tup.id})
-
-def fail(tup):
-    sendMsgToParent({"command": "fail", "id": tup.id})
-
-def reportError(msg):
-    sendMsgToParent({"command": "error", "msg": msg})
-
-def log(msg, level=2):
-    sendMsgToParent({"command": "log", "msg": msg, "level":level})
-
-def logTrace(msg):
-    log(msg, 0)
-
-def logDebug(msg):
-    log(msg, 1)
-
-def logInfo(msg):
-    log(msg, 2)
-
-def logWarn(msg):
-    log(msg, 3)
-
-def logError(msg):
-    log(msg, 4)
-
-def rpcMetrics(name, params):
-    sendMsgToParent({"command": "metrics", "name": name, "params": params})
-
-def initComponent():
-    setupInfo = readMsg()
-    sendpid(setupInfo['pidDir'])
-    return [setupInfo['conf'], setupInfo['context']]
-
-class Tuple(object):
-    def __init__(self, id, component, stream, task, values):
-        self.id = id
-        self.component = component
-        self.stream = stream
-        self.task = task
-        self.values = values
-
-    def __repr__(self):
-        return '<%s%s>' % (
-            self.__class__.__name__,
-            ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys())))
-
-    def is_heartbeat_tuple(self):
-        return self.task == -1 and self.stream == "__heartbeat"
-
-class Bolt(object):
-    def initialize(self, stormconf, context):
-        pass
-
-    def process(self, tuple):
-        pass
-
-    def run(self):
-        global MODE
-        MODE = Bolt
-        conf, context = initComponent()
-        try:
-            self.initialize(conf, context)
-            while True:
-                tup = readTuple()
-                if tup.is_heartbeat_tuple():
-                    sync()
-                else:
-                    self.process(tup)
-        except Exception, e:
-            reportError(traceback.format_exc(e))
-
-class BasicBolt(object):
-    def initialize(self, stormconf, context):
-        pass
-
-    def process(self, tuple):
-        pass
-
-    def run(self):
-        global MODE
-        MODE = Bolt
-        global ANCHOR_TUPLE
-        conf, context = initComponent()
-        try:
-            self.initialize(conf, context)
-            while True:
-                tup = readTuple()
-                if tup.is_heartbeat_tuple():
-                    sync()
-                else:
-                    ANCHOR_TUPLE = tup
-                    try:
-                        self.process(tup)
-                        ack(tup)
-                    except Exception, e:
-                        reportError(traceback.format_exc(e))
-                        fail(tup)
-        except Exception, e:
-            reportError(traceback.format_exc(e))
-
-class Spout(object):
-    def initialize(self, conf, context):
-        pass
-
-    def ack(self, id):
-        pass
-
-    def fail(self, id):
-        pass
-
-    def nextTuple(self):
-        pass
-
-    def run(self):
-        global MODE
-        MODE = Spout
-        conf, context = initComponent()
-        try:
-            self.initialize(conf, context)
-            while True:
-                msg = readCommand()
-                if msg["command"] == "next":
-                    self.nextTuple()
-                if msg["command"] == "ack":
-                    self.ack(msg["id"])
-                if msg["command"] == "fail":
-                    self.fail(msg["id"])
-                sync()
-        except Exception, e:
-            reportError(traceback.format_exc(e))
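
Note on the message framing used by readMsg and sendMsgToParent in the storm.py shown above: each JSON message is followed by a line containing only "end". The sketch below is a Python 3 illustration of that framing and is not part of this commit. The names `send_msg` and `read_msg` and the explicit stream arguments are assumptions made for the example; the original functions use sys.stdin and sys.stdout directly.

```python
import io
import json


def send_msg(msg, out):
    """Write one JSON message followed by the 'end' terminator line."""
    out.write(json.dumps(msg) + "\n")
    out.write("end\n")
    out.flush()


def read_msg(inp):
    """Collect lines until a line containing only 'end', then decode the JSON."""
    lines = []
    while True:
        line = inp.readline()
        if not line:
            # Mirrors the original behaviour of failing on EOF.
            raise EOFError("Read EOF before the 'end' terminator")
        if line.rstrip("\n") == "end":
            break
        lines.append(line)
    return json.loads("".join(lines))


# Round-trip a command and a task-id list through an in-memory stream.
buf = io.StringIO()
send_msg({"command": "sync"}, buf)
send_msg([1, 2, 3], buf)
buf.seek(0)
first = read_msg(buf)
second = read_msg(buf)
```

Passing the streams in explicitly is only a convenience for testing the framing without a Storm parent process.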

http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/storm-core/src/dev/resources/storm.rb
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/resources/storm.rb b/storm-core/src/dev/resources/storm.rb
deleted file mode 100644
index 816694e..0000000
--- a/storm-core/src/dev/resources/storm.rb
+++ /dev/null
@@ -1,236 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-require "rubygems"
-require "json"
-
-module Storm
-  module Protocol
-    class << self
-      attr_accessor :mode, :pending_taskids, :pending_commands
-    end
-
-    self.pending_taskids = []
-    self.pending_commands = []
-
-    def read_message
-      msg = ""
-      loop do
-        line = STDIN.readline.chomp
-        break if line == "end"
-        msg << line
-        msg << "\n"
-      end
-      JSON.parse msg.chomp
-    end
-
-    def read_task_ids
-      Storm::Protocol.pending_taskids.shift ||
-          begin
-            msg = read_message
-            until msg.is_a? Array
-              Storm::Protocol.pending_commands.push(msg)
-              msg = read_message
-            end
-            msg
-          end
-    end
-
-    def read_command
-      Storm::Protocol.pending_commands.shift ||
-          begin
-            msg = read_message
-            while msg.is_a? Array
-              Storm::Protocol.pending_taskids.push(msg)
-              msg = read_message
-            end
-            msg
-          end
-    end
-
-    def send_msg_to_parent(msg)
-      puts msg.to_json
-      puts "end"
-      STDOUT.flush
-    end
-
-    def sync
-      send_msg_to_parent({'command' => 'sync'})
-    end
-
-    def send_pid(heartbeat_dir)
-      pid = Process.pid
-      send_msg_to_parent({'pid' => pid})
-      File.open("#{heartbeat_dir}/#{pid}", "w").close
-    end
-
-    def emit_bolt(tup, args = {})
-      stream = args[:stream]
-      anchors = args[:anchors] || args[:anchor] || []
-      anchors = [anchors] unless anchors.is_a? Enumerable
-      direct = args[:direct_task]
-      m = {:command => :emit, :anchors => anchors.map(&:id), :tuple => tup}
-      m[:stream] = stream if stream
-      m[:task] = direct if direct
-      send_msg_to_parent m
-      read_task_ids unless direct
-    end
-
-    def emit_spout(tup, args = {})
-      stream = args[:stream]
-      id = args[:id]
-      direct = args[:direct_task]
-      m = {:command => :emit, :tuple => tup}
-      m[:id] = id if id
-      m[:stream] = stream if stream
-      m[:task] = direct if direct
-      send_msg_to_parent m
-      read_task_ids unless direct
-    end
-
-    def emit(*args)
-      case Storm::Protocol.mode
-        when 'spout'
-          emit_spout(*args)
-        when 'bolt'
-          emit_bolt(*args)
-      end
-    end
-
-    def ack(tup)
-      send_msg_to_parent :command => :ack, :id => tup.id
-    end
-
-    def fail(tup)
-      send_msg_to_parent :command => :fail, :id => tup.id
-    end
-
-    def reportError(msg)
-      send_msg_to_parent :command => :error, :msg => msg.to_s
-    end
-
-    def log(msg, level=2)
-      send_msg_to_parent :command => :log, :msg => msg.to_s, :level => level
-    end
-
-    def logTrace(msg)
-      log(msg, 0)
-    end
-
-    def logDebug(msg)
-      log(msg, 1)
-    end
-
-    def logInfo(msg)
-      log(msg, 2)
-    end
-
-    def logWarn(msg)
-      log(msg, 3)
-    end
-
-    def logError(msg)
-      log(msg, 4)
-    end
-
-    def handshake
-      setup_info = read_message
-      send_pid setup_info['pidDir']
-      [setup_info['conf'], setup_info['context']]
-    end
-  end
-
-  class Tuple
-    attr_accessor :id, :component, :stream, :task, :values
-
-    def initialize(id, component, stream, task, values)
-      @id = id
-      @component = component
-      @stream = stream
-      @task = task
-      @values = values
-    end
-
-    def self.from_hash(hash)
-      Tuple.new(*hash.values_at("id", "comp", "stream", "task", "tuple"))
-    end
-
-    def is_heartbeat
-      task == -1 and stream == '__heartbeat'
-    end
-  end
-
-  class Bolt
-    include Storm::Protocol
-
-    def prepare(conf, context); end
-
-    def process(tuple); end
-
-    def run
-      Storm::Protocol.mode = 'bolt'
-      prepare(*handshake)
-      begin
-        while true
-          tuple = Tuple.from_hash(read_command)
-          if tuple.is_heartbeat
-            sync
-          else
-            process tuple
-          end
-        end
-      rescue Exception => e
-        reportError 'Exception in bolt: ' + e.message + ' - ' + e.backtrace.join('\n')
-      end
-    end
-  end
-
-  class Spout
-    include Storm::Protocol
-
-    def open(conf, context); end
-
-    def nextTuple; end
-
-    def ack(id); end
-
-    def fail(id); end
-
-    def run
-      Storm::Protocol.mode = 'spout'
-      open(*handshake)
-
-      begin
-        while true
-          msg = read_command
-          case msg['command']
-            when 'next'
-              nextTuple
-            when 'ack'
-              ack(msg['id'])
-            when 'fail'
-              fail(msg['id'])
-          end
-          sync
-        end
-      rescue Exception => e
-        reportError 'Exception in spout: ' + e.message + ' - ' + e.backtrace.join('\n')
-      end
-    end
-  end
-end
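
Note on the two pending queues that appear in both storm.py and storm.rb above: task-id lists (JSON arrays) and commands (JSON objects) arrive interleaved on the same input, so whichever kind is not currently wanted is parked in a queue for later. The following Python 3 sketch illustrates that idea only. The class name `MessageReader` and the use of an iterable of already-decoded messages are assumptions for the example, not code from the commit.

```python
from collections import deque


class MessageReader:
    """Separates task-id lists from commands read off one shared stream."""

    def __init__(self, messages):
        self._source = iter(messages)      # stands in for repeated readMsg() calls
        self.pending_taskids = deque()
        self.pending_commands = deque()

    def read_task_ids(self):
        # Serve a parked task-id list first, otherwise read until one arrives,
        # parking any commands seen along the way.
        if self.pending_taskids:
            return self.pending_taskids.popleft()
        msg = next(self._source)
        while not isinstance(msg, list):
            self.pending_commands.append(msg)
            msg = next(self._source)
        return msg

    def read_command(self):
        # Serve a parked command first, otherwise read until one arrives,
        # parking any task-id lists seen along the way.
        if self.pending_commands:
            return self.pending_commands.popleft()
        msg = next(self._source)
        while isinstance(msg, list):
            self.pending_taskids.append(msg)
            msg = next(self._source)
        return msg


reader = MessageReader([{"command": "next"}, {"command": "ack", "id": "7"}, [4, 5]])
first_cmd = reader.read_command()
task_ids = reader.read_task_ids()    # the ack command is parked while waiting
queued_cmd = reader.read_command()   # the parked ack command is returned here
```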

http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/storm-multilang/multilang-javascript/pom.xml
----------------------------------------------------------------------
diff --git a/storm-multilang/multilang-javascript/pom.xml b/storm-multilang/multilang-javascript/pom.xml
new file mode 100644
index 0000000..e1cb993
--- /dev/null
+++ b/storm-multilang/multilang-javascript/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>0.11.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <groupId>org.apache.storm</groupId>
+    <artifactId>multilang-javascript</artifactId>
+    <packaging>jar</packaging>
+    <name>multilang-javascript</name>
+
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/storm-multilang/multilang-javascript/src/main/resources/resources/storm.js
----------------------------------------------------------------------
diff --git a/storm-multilang/multilang-javascript/src/main/resources/resources/storm.js b/storm-multilang/multilang-javascript/src/main/resources/resources/storm.js
new file mode 100755
index 0000000..f5dcad2
--- /dev/null
+++ b/storm-multilang/multilang-javascript/src/main/resources/resources/storm.js
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ /**
+ * Base classes in node-js for storm Bolt and Spout.
+ * Implements the storm multilang protocol for nodejs.
+ */
+
+
+var fs = require('fs');
+
+function Storm() {
+    this.messagePart = "";
+    this.taskIdsCallbacks = [];
+    this.isFirstMessage = true;
+    this.separator = '\nend\n';
+}
+
+Storm.prototype.sendMsgToParent = function(msg) {
+    var str = JSON.stringify(msg);
+    process.stdout.write(str + this.separator);
+}
+
+Storm.prototype.sync = function() {
+    this.sendMsgToParent({"command":"sync"});
+}
+
+Storm.prototype.sendPid = function(heartbeatdir) {
+    var pid = process.pid;
+    fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w"));
+    this.sendMsgToParent({"pid": pid})
+}
+
+Storm.prototype.log = function(msg) {
+    this.sendMsgToParent({"command": "log", "msg": msg});
+}
+
+Storm.prototype.initSetupInfo = function(setupInfo) {
+    var self = this;
+    var callback = function() {
+        self.sendPid(setupInfo['pidDir']);
+    }
+    this.initialize(setupInfo['conf'], setupInfo['context'], callback);
+}
+
+Storm.prototype.startReadingInput = function() {
+    var self = this;
+    process.stdin.on('readable', function() {
+        var chunk = process.stdin.read();
+        var messages = self.handleNewChunk(chunk);
+        messages.forEach(function(message) {
+            self.handleNewMessage(message);
+        })
+
+    });
+}
+
+/**
+ * receives a new string chunk and returns a list of new messages with the separator removed
+ * stores state in this.messagePart
+ * @param chunk
+ */
+Storm.prototype.handleNewChunk = function(chunk) {
+    //invariant: this.messagePart has no separator otherwise we would have parsed it already
+    var messages = [];
+    if (chunk && chunk.length !== 0) {
+        //"{}".split("\nend\n")           ==> ['{}']
+        //"\nend\n".split("\nend\n")      ==> [''  , '']
+        //"{}\nend\n".split("\nend\n")    ==> ['{}', '']
+        //"\nend\n{}".split("\nend\n")    ==> [''  , '{}']
+        // "{}\nend\n{}".split("\nend\n") ==> ['{}', '{}' ]
+        this.messagePart = this.messagePart + chunk;
+        var newMessageParts = this.messagePart.split(this.separator);
+        while (newMessageParts.length > 0) {
+            var potentialMessage = newMessageParts.shift();
+            var anotherMessageAhead = newMessageParts.length > 0;
+            if  (!anotherMessageAhead) {
+                this.messagePart = potentialMessage;
+            }
+            else if (potentialMessage.length > 0) {
+                messages.push(potentialMessage);
+            }
+        }
+    }
+    return messages;
+ }
+
+Storm.prototype.isTaskIds = function(msg) {
+    return (msg instanceof Array);
+}
+
+Storm.prototype.handleNewMessage = function(msg) {
+    var parsedMsg = JSON.parse(msg);
+
+    if (this.isFirstMessage) {
+        this.initSetupInfo(parsedMsg);
+        this.isFirstMessage = false;
+    } else if (this.isTaskIds(parsedMsg)) {
+        this.handleNewTaskId(parsedMsg);
+    } else {
+        this.handleNewCommand(parsedMsg);
+    }
+}
+
+Storm.prototype.handleNewTaskId = function(taskIds) {
+    //When a new list of task ids arrives, the callback that was passed with the corresponding emit should be called.
+    //Storm ensures that the task ids will be sent in the same order as their corresponding emits, so we can simply
+    //take the first callback in the list and be sure it is the right one.
+
+    var callback = this.taskIdsCallbacks.shift();
+    if (callback) {
+        callback(taskIds);
+    } else {
+        throw new Error('Something went wrong, we off the split of task id callbacks');
+    }
+}
+
+
+
+/**
+ *
+ * @param messageDetails json with the emit details.
+ *
+ * For bolt, the json must contain the required fields:
+ * - tuple - the value to emit
+ * - anchorTupleId - the value of the anchor tuple (the input tuple that led to this emit). Used to track the source
+ * tuple and return an ack when all components have successfully finished processing it.
+ * and may contain the optional fields:
+ * - stream (if empty - emit to default stream)
+ *
+ * For spout, the json must contain the required fields:
+ * - tuple - the value to emit
+ *
+ * and may contain the optional fields:
+ * - id - pass id for reliable emit (and receive ack/fail later).
+ * - stream - if empty - emit to default stream.
+ *
+ * @param onTaskIds function that will be called with the list of task ids the message was emitted to (when received).
+ */
+Storm.prototype.emit = function(messageDetails, onTaskIds) {
+    //Every emit triggers a response - list of task ids to which the tuple was emitted. The task ids are accessible
+    //through the callback (will be called when the response arrives). The callback is stored in a list until the
+    //corresponding task id list arrives.
+    if (messageDetails.task) {
+        throw new Error('Illegal input - task. To emit to specific task use emit direct!');
+    }
+
+    if (!onTaskIds) {
+        throw new Error('You must pass a onTaskIds callback when using emit!')
+    }
+
+    this.taskIdsCallbacks.push(onTaskIds);
+    this.__emit(messageDetails);;
+}
+
+
+/**
+ * Emit message to specific task.
+ * @param messageDetails json with the emit details.
+ *
+ * For bolt, the json must contain the required fields:
+ * - tuple - the value to emit
+ * - anchorTupleId - the value of the anchor tuple (the input tuple that led to this emit). Used to track the source
+ * tuple and return an ack when all components have successfully finished processing it.
+ * - task - indicate the task to send the tuple to.
+ * and may contain the optional fields:
+ * - stream (if empty - emit to default stream)
+ *
+ * For spout, the json must contain the required fields:
+ * - tuple - the value to emit
+ * - task - indicate the task to send the tuple to.
+ * and may contain the optional fields:
+ * - id - pass id for reliable emit (and receive ack/fail later).
+ * - stream - if empty - emit to default stream.
+ *
+ * @param onTaskIds function that will be called with the list of task ids the message was emitted to (when received).
+ */
+Storm.prototype.emitDirect = function(commandDetails) {
+    if (!commandDetails.task) {
+        throw new Error("Emit direct must receive task id!")
+    }
+    this.__emit(commandDetails);
+}
+
+/**
+ * Initialize storm component according to the configuration received.
+ * @param conf configuration object according to storm protocol.
+ * @param context context object according to storm protocol.
+ * @param done callback. Call this method when finished initializing.
+ */
+Storm.prototype.initialize = function(conf, context, done) {
+    done();
+}
+
+Storm.prototype.run = function() {
+    process.stdout.setEncoding('utf8');
+    process.stdin.setEncoding('utf8');
+    this.startReadingInput();
+}
+
+function Tuple(id, component, stream, task, values) {
+    this.id = id;
+    this.component = component;
+    this.stream = stream;
+    this.task = task;
+    this.values = values;
+}
+
+/**
+ * Base class for storm bolt.
+ * To create a bolt implement 'process' method.
+ * You may also implement the initialize method.
+ */
+function BasicBolt() {
+    Storm.call(this);
+    this.anchorTuple = null;
+};
+
+BasicBolt.prototype = Object.create(Storm.prototype);
+BasicBolt.prototype.constructor = BasicBolt;
+
+/**
+ * Emit message.
+ * @param commandDetails json with the required fields:
+ * - tuple - the value to emit
+ * - anchorTupleId - the value of the anchor tuple (the input tuple that led to this emit). Used to track the source
+ * tuple and return an ack when all components have successfully finished processing it.
+ * and the optional fields:
+ * - stream (if empty - emit to default stream)
+ * - task (pass only to emit to specific task)
+ */
+BasicBolt.prototype.__emit = function(commandDetails) {
+    var self = this;
+
+    var message = {
+        command: "emit",
+        tuple: commandDetails.tuple,
+        stream: commandDetails.stream,
+        task: commandDetails.task,
+        anchors: [commandDetails.anchorTupleId]
+    };
+
+    this.sendMsgToParent(message);
+}
+
+BasicBolt.prototype.handleNewCommand = function(command) {
+    var self = this;
+    var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]);
+    var callback = function(err) {
+          if (err) {
+              self.fail(tup, err);
+              return;
+          }
+          self.ack(tup);
+      }
+    this.process(tup, callback);
+}
+
+/**
+ * Implement this method when creating a bolt. This is the main method that provides the logic of the bolt (what
+ * should it do?).
+ * @param tuple the input of the bolt - what to process.
+ * @param done call this method when done processing.
+ */
+BasicBolt.prototype.process = function(tuple, done) {};
+
+BasicBolt.prototype.ack = function(tup) {
+    this.sendMsgToParent({"command": "ack", "id": tup.id});
+}
+
+BasicBolt.prototype.fail = function(tup, err) {
+    this.sendMsgToParent({"command": "fail", "id": tup.id});
+}
+
+
+/**
+ * Base class for storm spout.
+ * To create a spout implement the following methods: nextTuple, ack and fail (nextTuple - mandatory, ack and fail
+ * can stay empty).
+ * You may also implement initialize method.
+ *
+ */
+function Spout() {
+    Storm.call(this);
+};
+
+Spout.prototype = Object.create(Storm.prototype);
+
+Spout.prototype.constructor = Spout;
+
+/**
+ * This method will be called when an ack is received for a previously sent tuple. One may implement it.
+ * @param id The id of the tuple.
+ * @param done Call this method when finished and ready to receive more tuples.
+ */
+Spout.prototype.ack = function(id, done) {};
+
+/**
+ * This method will be called when a fail is received for a previously sent tuple. One may implement it (for example -
+ * log the failure or send the tuple again).
+ * @param id The id of the tuple.
+ * @param done Call this method when finished and ready to receive more tuples.
+ */
+Spout.prototype.fail = function(id, done) {};
+
+/**
+ * Method that indicates it is time to emit the next tuple.
+ * @param done call this method when done sending the output.
+ */
+Spout.prototype.nextTuple = function(done) {};
+
+Spout.prototype.handleNewCommand = function(command) {
+    var self = this;
+    var callback = function() {
+        self.sync();
+    }
+
+    if (command["command"] === "next") {
+        this.nextTuple(callback);
+    }
+
+    if (command["command"] === "ack") {
+        this.ack(command["id"], callback);
+    }
+
+    if (command["command"] === "fail") {
+        this.fail(command["id"], callback);
+    }
+}
+
+/**
+ * @param commandDetails json with the required fields:
+ * - tuple - the value to emit.
+ * and the optional fields:
+ * - id - pass id for reliable emit (and receive ack/fail later).
+ * - stream - if empty - emit to default stream.
+ * - task - pass only to emit to specific task.
+ */
+Spout.prototype.__emit = function(commandDetails) {
+    var message = {
+        command: "emit",
+        tuple: commandDetails.tuple,
+        id: commandDetails.id,
+        stream: commandDetails.stream,
+        task: commandDetails.task
+    };
+
+    this.sendMsgToParent(message);
+}
+
+module.exports.BasicBolt = BasicBolt;
+module.exports.Spout = Spout;
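
Note on handleNewChunk in the storm.js shown above: stdin chunks can end in the middle of a message, so the unfinished tail is kept in messagePart and only text followed by the "\nend\n" separator is handed on as a complete message. The Python 3 sketch below restates that buffering logic for illustration. It is not part of this commit, and the names `ChunkSplitter` and `handle_new_chunk` are assumptions chosen for the example.

```python
SEPARATOR = "\nend\n"


class ChunkSplitter:
    """Accumulates input chunks and yields only fully terminated messages."""

    def __init__(self):
        # Invariant: message_part never contains a complete separator.
        self.message_part = ""

    def handle_new_chunk(self, chunk):
        messages = []
        if chunk:
            self.message_part += chunk
            parts = self.message_part.split(SEPARATOR)
            # The last element is the unterminated remainder (possibly empty);
            # it stays buffered until a later chunk completes it.
            self.message_part = parts.pop()
            # Everything before it was terminated; skip empty fragments.
            messages = [part for part in parts if part]
        return messages


splitter = ChunkSplitter()
out1 = splitter.handle_new_chunk('{"command": "ne')
out2 = splitter.handle_new_chunk('xt"}\nend\n[1, 2]\nen')
out3 = splitter.handle_new_chunk('d\n')
```

Here the separator itself is split across the second and third chunks, which is the case the buffering is meant to handle.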

http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/storm-multilang/multilang-python/pom.xml
----------------------------------------------------------------------
diff --git a/storm-multilang/multilang-python/pom.xml b/storm-multilang/multilang-python/pom.xml
new file mode 100644
index 0000000..379c0bc
--- /dev/null
+++ b/storm-multilang/multilang-python/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>0.11.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <groupId>org.apache.storm</groupId>
+    <artifactId>multilang-python</artifactId>
+    <packaging>jar</packaging>
+    <name>multilang-python</name>
+
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/storm-multilang/multilang-python/src/main/resources/resources/storm.py
----------------------------------------------------------------------
diff --git a/storm-multilang/multilang-python/src/main/resources/resources/storm.py b/storm-multilang/multilang-python/src/main/resources/resources/storm.py
new file mode 100755
index 0000000..642c393
--- /dev/null
+++ b/storm-multilang/multilang-python/src/main/resources/resources/storm.py
@@ -0,0 +1,260 @@
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+import os
+import traceback
+from collections import deque
+
+try:
+    import simplejson as json
+except ImportError:
+    import json
+
+json_encode = lambda x: json.dumps(x)
+json_decode = lambda x: json.loads(x)
+
+#reads lines and reconstructs newlines appropriately
+def readMsg():
+    msg = ""
+    while True:
+        line = sys.stdin.readline()
+        if not line:
+            raise Exception('Read EOF from stdin')
+        if line[0:-1] == "end":
+            break
+        msg = msg + line
+    return json_decode(msg[0:-1])
+
+MODE = None
+ANCHOR_TUPLE = None
+
+#queue up commands we read while trying to read taskids
+pending_commands = deque()
+
+def readTaskIds():
+    if pending_taskids:
+        return pending_taskids.popleft()
+    else:
+        msg = readMsg()
+        while type(msg) is not list:
+            pending_commands.append(msg)
+            msg = readMsg()
+        return msg
+
+#queue up taskids we read while trying to read commands/tuples
+pending_taskids = deque()
+
+def readCommand():
+    if pending_commands:
+        return pending_commands.popleft()
+    else:
+        msg = readMsg()
+        while type(msg) is list:
+            pending_taskids.append(msg)
+            msg = readMsg()
+        return msg
+
+def readTuple():
+    cmd = readCommand()
+    return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"])
+
+def sendMsgToParent(msg):
+    print json_encode(msg)
+    print "end"
+    sys.stdout.flush()
+
+def sync():
+    sendMsgToParent({'command':'sync'})
+
+def sendpid(heartbeatdir):
+    pid = os.getpid()
+    sendMsgToParent({'pid':pid})
+    open(heartbeatdir + "/" + str(pid), "w").close()
+
+def emit(*args, **kwargs):
+    __emit(*args, **kwargs)
+    return readTaskIds()
+
+def emitDirect(task, *args, **kwargs):
+    kwargs["directTask"] = task
+    __emit(*args, **kwargs)
+
+def __emit(*args, **kwargs):
+    global MODE
+    if MODE == Bolt:
+        emitBolt(*args, **kwargs)
+    elif MODE == Spout:
+        emitSpout(*args, **kwargs)
+
+def emitBolt(tup, stream=None, anchors = [], directTask=None):
+    global ANCHOR_TUPLE
+    if ANCHOR_TUPLE is not None:
+        anchors = [ANCHOR_TUPLE]
+    m = {"command": "emit"}
+    if stream is not None:
+        m["stream"] = stream
+    m["anchors"] = map(lambda a: a.id, anchors)
+    if directTask is not None:
+        m["task"] = directTask
+    m["tuple"] = tup
+    sendMsgToParent(m)
+
+def emitSpout(tup, stream=None, id=None, directTask=None):
+    m = {"command": "emit"}
+    if id is not None:
+        m["id"] = id
+    if stream is not None:
+        m["stream"] = stream
+    if directTask is not None:
+        m["task"] = directTask
+    m["tuple"] = tup
+    sendMsgToParent(m)
+
+def ack(tup):
+    sendMsgToParent({"command": "ack", "id": tup.id})
+
+def fail(tup):
+    sendMsgToParent({"command": "fail", "id": tup.id})
+
+def reportError(msg):
+    sendMsgToParent({"command": "error", "msg": msg})
+
+def log(msg, level=2):
+    sendMsgToParent({"command": "log", "msg": msg, "level":level})
+
+def logTrace(msg):
+    log(msg, 0)
+
+def logDebug(msg):
+    log(msg, 1)
+
+def logInfo(msg):
+    log(msg, 2)
+
+def logWarn(msg):
+    log(msg, 3)
+
+def logError(msg):
+    log(msg, 4)
+
+def rpcMetrics(name, params):
+    sendMsgToParent({"command": "metrics", "name": name, "params": params})
+
+def initComponent():
+    setupInfo = readMsg()
+    sendpid(setupInfo['pidDir'])
+    return [setupInfo['conf'], setupInfo['context']]
+
+class Tuple(object):
+    def __init__(self, id, component, stream, task, values):
+        self.id = id
+        self.component = component
+        self.stream = stream
+        self.task = task
+        self.values = values
+
+    def __repr__(self):
+        return '<%s%s>' % (
+            self.__class__.__name__,
+            ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys())))
+
+    def is_heartbeat_tuple(self):
+        return self.task == -1 and self.stream == "__heartbeat"
+
+class Bolt(object):
+    def initialize(self, stormconf, context):
+        pass
+
+    def process(self, tuple):
+        pass
+
+    def run(self):
+        global MODE
+        MODE = Bolt
+        conf, context = initComponent()
+        try:
+            self.initialize(conf, context)
+            while True:
+                tup = readTuple()
+                if tup.is_heartbeat_tuple():
+                    sync()
+                else:
+                    self.process(tup)
+        except Exception, e:
+            reportError(traceback.format_exc(e))
+
+class BasicBolt(object):
+    def initialize(self, stormconf, context):
+        pass
+
+    def process(self, tuple):
+        pass
+
+    def run(self):
+        global MODE
+        MODE = Bolt
+        global ANCHOR_TUPLE
+        conf, context = initComponent()
+        try:
+            self.initialize(conf, context)
+            while True:
+                tup = readTuple()
+                if tup.is_heartbeat_tuple():
+                    sync()
+                else:
+                    ANCHOR_TUPLE = tup
+                    try:
+                        self.process(tup)
+                        ack(tup)
+                    except Exception, e:
+                        reportError(traceback.format_exc(e))
+                        fail(tup)
+        except Exception, e:
+            reportError(traceback.format_exc(e))
+
+class Spout(object):
+    def initialize(self, conf, context):
+        pass
+
+    def ack(self, id):
+        pass
+
+    def fail(self, id):
+        pass
+
+    def nextTuple(self):
+        pass
+
+    def run(self):
+        global MODE
+        MODE = Spout
+        conf, context = initComponent()
+        try:
+            self.initialize(conf, context)
+            while True:
+                msg = readCommand()
+                if msg["command"] == "next":
+                    self.nextTuple()
+                if msg["command"] == "ack":
+                    self.ack(msg["id"])
+                if msg["command"] == "fail":
+                    self.fail(msg["id"])
+                sync()
+        except Exception, e:
+            reportError(traceback.format_exc(e))
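
The message framing used by readMsg and sendMsgToParent in storm.py above (one JSON document, then a line containing only "end") can be sketched in isolation. This is a minimal Python 3 illustration, not the committed code: the names send_msg and read_msg and the in-memory buffer are assumptions made for the example, and the buffer stands in for the stdin/stdout pipes shared with the parent process.

```python
import io
import json


def send_msg(out, msg):
    """Write one message: a JSON document followed by a line containing only 'end'."""
    out.write(json.dumps(msg) + "\n")
    out.write("end\n")
    out.flush()


def read_msg(inp):
    """Read lines up to the 'end' sentinel and decode them as one JSON document."""
    lines = []
    while True:
        line = inp.readline()
        if not line:
            # Mirrors the EOF check in readMsg: the stream closed mid-message.
            raise EOFError("Read EOF before the 'end' sentinel")
        if line.rstrip("\n") == "end":
            break
        lines.append(line)
    return json.loads("".join(lines))


# Round trip through an in-memory buffer instead of a real parent process.
buf = io.StringIO()
send_msg(buf, {"command": "emit", "tuple": ["hello", 1]})
send_msg(buf, {"command": "sync"})
buf.seek(0)
first = read_msg(buf)
second = read_msg(buf)
```

Because the sentinel is matched against a whole line, a JSON payload spread over several lines would still be reassembled before decoding.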

http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/storm-multilang/multilang-ruby/pom.xml
----------------------------------------------------------------------
diff --git a/storm-multilang/multilang-ruby/pom.xml b/storm-multilang/multilang-ruby/pom.xml
new file mode 100644
index 0000000..6b5dd0c
--- /dev/null
+++ b/storm-multilang/multilang-ruby/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>0.11.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <groupId>org.apache.storm</groupId>
+    <artifactId>multilang-ruby</artifactId>
+    <packaging>jar</packaging>
+    <name>multilang-ruby</name>
+    
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/storm-multilang/multilang-ruby/src/main/resources/resources/storm.rb
----------------------------------------------------------------------
diff --git a/storm-multilang/multilang-ruby/src/main/resources/resources/storm.rb b/storm-multilang/multilang-ruby/src/main/resources/resources/storm.rb
new file mode 100644
index 0000000..816694e
--- /dev/null
+++ b/storm-multilang/multilang-ruby/src/main/resources/resources/storm.rb
@@ -0,0 +1,236 @@
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require "rubygems"
+require "json"
+
+module Storm
+  module Protocol
+    class << self
+      attr_accessor :mode, :pending_taskids, :pending_commands
+    end
+
+    self.pending_taskids = []
+    self.pending_commands = []
+
+    def read_message
+      msg = ""
+      loop do
+        line = STDIN.readline.chomp
+        break if line == "end"
+        msg << line
+        msg << "\n"
+      end
+      JSON.parse msg.chomp
+    end
+
+    def read_task_ids
+      Storm::Protocol.pending_taskids.shift ||
+          begin
+            msg = read_message
+            until msg.is_a? Array
+              Storm::Protocol.pending_commands.push(msg)
+              msg = read_message
+            end
+            msg
+          end
+    end
+
+    def read_command
+      Storm::Protocol.pending_commands.shift ||
+          begin
+            msg = read_message
+            while msg.is_a? Array
+              Storm::Protocol.pending_taskids.push(msg)
+              msg = read_message
+            end
+            msg
+          end
+    end
+
+    def send_msg_to_parent(msg)
+      puts msg.to_json
+      puts "end"
+      STDOUT.flush
+    end
+
+    def sync
+      send_msg_to_parent({'command' => 'sync'})
+    end
+
+    def send_pid(heartbeat_dir)
+      pid = Process.pid
+      send_msg_to_parent({'pid' => pid})
+      File.open("#{heartbeat_dir}/#{pid}", "w").close
+    end
+
+    def emit_bolt(tup, args = {})
+      stream = args[:stream]
+      anchors = args[:anchors] || args[:anchor] || []
+      anchors = [anchors] unless anchors.is_a? Enumerable
+      direct = args[:direct_task]
+      m = {:command => :emit, :anchors => anchors.map(&:id), :tuple => tup}
+      m[:stream] = stream if stream
+      m[:task] = direct if direct
+      send_msg_to_parent m
+      read_task_ids unless direct
+    end
+
+    def emit_spout(tup, args = {})
+      stream = args[:stream]
+      id = args[:id]
+      direct = args[:direct_task]
+      m = {:command => :emit, :tuple => tup}
+      m[:id] = id if id
+      m[:stream] = stream if stream
+      m[:task] = direct if direct
+      send_msg_to_parent m
+      read_task_ids unless direct
+    end
+
+    def emit(*args)
+      case Storm::Protocol.mode
+        when 'spout'
+          emit_spout(*args)
+        when 'bolt'
+          emit_bolt(*args)
+      end
+    end
+
+    def ack(tup)
+      send_msg_to_parent :command => :ack, :id => tup.id
+    end
+
+    def fail(tup)
+      send_msg_to_parent :command => :fail, :id => tup.id
+    end
+
+    def reportError(msg)
+      send_msg_to_parent :command => :error, :msg => msg.to_s
+    end
+
+    def log(msg, level=2)
+      send_msg_to_parent :command => :log, :msg => msg.to_s, :level => level
+    end
+
+    def logTrace(msg)
+      log(msg, 0)
+    end
+
+    def logDebug(msg)
+      log(msg, 1)
+    end
+
+    def logInfo(msg)
+      log(msg, 2)
+    end
+
+    def logWarn(msg)
+      log(msg, 3)
+    end
+
+    def logError(msg)
+      log(msg, 4)
+    end
+
+    def handshake
+      setup_info = read_message
+      send_pid setup_info['pidDir']
+      [setup_info['conf'], setup_info['context']]
+    end
+  end
+
+  class Tuple
+    attr_accessor :id, :component, :stream, :task, :values
+
+    def initialize(id, component, stream, task, values)
+      @id = id
+      @component = component
+      @stream = stream
+      @task = task
+      @values = values
+    end
+
+    def self.from_hash(hash)
+      Tuple.new(*hash.values_at("id", "comp", "stream", "task", "tuple"))
+    end
+
+    def is_heartbeat
+      task == -1 and stream == '__heartbeat'
+    end
+  end
+
+  class Bolt
+    include Storm::Protocol
+
+    def prepare(conf, context); end
+
+    def process(tuple); end
+
+    def run
+      Storm::Protocol.mode = 'bolt'
+      prepare(*handshake)
+      begin
+        while true
+          tuple = Tuple.from_hash(read_command)
+          if tuple.is_heartbeat
+            sync
+          else
+            process tuple
+          end
+        end
+      rescue Exception => e
+        reportError 'Exception in bolt: ' + e.message + ' - ' + e.backtrace.join('\n')
+      end
+    end
+  end
+
+  class Spout
+    include Storm::Protocol
+
+    def open(conf, context); end
+
+    def nextTuple; end
+
+    def ack(id); end
+
+    def fail(id); end
+
+    def run
+      Storm::Protocol.mode = 'spout'
+      open(*handshake)
+
+      begin
+        while true
+          msg = read_command
+          case msg['command']
+            when 'next'
+              nextTuple
+            when 'ack'
+              ack(msg['id'])
+            when 'fail'
+              fail(msg['id'])
+          end
+          sync
+        end
+      rescue Exception => e
+        reportError 'Exception in spout: ' + e.message + ' - ' + e.backtrace.join('\n')
+      end
+    end
+  end
+end
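
Both storm.py and storm.rb above share one input stream for two kinds of messages: task-id lists (JSON arrays, sent in reply to an emit) and commands or tuples (JSON objects). Each reader queues the kind it is not waiting for so the other reader can pick it up later. The following is a minimal Python 3 sketch of that idea under stated assumptions: the Demux class and the pre-decoded message list are hypothetical and replace the repeated reads from stdin.

```python
from collections import deque


class Demux:
    """Hypothetical helper: splits one message stream into task-id lists and commands."""

    def __init__(self, messages):
        self._incoming = iter(messages)   # stands in for repeated readMsg() calls
        self.pending_taskids = deque()    # lists seen while waiting for a command
        self.pending_commands = deque()   # commands seen while waiting for a list

    def read_task_ids(self):
        if self.pending_taskids:
            return self.pending_taskids.popleft()
        msg = next(self._incoming)
        while not isinstance(msg, list):
            self.pending_commands.append(msg)  # not ours: keep it for read_command
            msg = next(self._incoming)
        return msg

    def read_command(self):
        if self.pending_commands:
            return self.pending_commands.popleft()
        msg = next(self._incoming)
        while isinstance(msg, list):
            self.pending_taskids.append(msg)   # not ours: keep it for read_task_ids
            msg = next(self._incoming)
        return msg


# An ack arrives before the task-id reply to an earlier emit.
demux = Demux([{"command": "next"}, {"command": "ack", "id": "7"}, [3, 4]])
cmd = demux.read_command()        # first object in the stream
task_ids = demux.read_task_ids()  # skips over the ack and queues it
queued = demux.read_command()     # served from the queue, no new read
```

The queues preserve arrival order within each kind, so an out-of-turn message is delayed rather than dropped.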


[18/27] storm git commit: Resolve NPE that can occur if there is no SourceComponent in a Tuple

Posted by pt...@apache.org.
Resolve NPE that can occur if there is no SourceComponent in a Tuple


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5faa782a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5faa782a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5faa782a

Branch: refs/heads/0.10.x-branch
Commit: 5faa782ab2c8732579f1693d74c8d1e1722c3942
Parents: 63f331f
Author: Niels Basjes <ni...@basjes.nl>
Authored: Thu Dec 11 12:35:01 2014 +0100
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:28:27 2015 -0400

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/5faa782a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
index 7ff2c8c..40ad11c 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
@@ -215,8 +215,8 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
     }
 
     public boolean isTick() {
-        return this.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) &&
-               this.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
+        return Constants.SYSTEM_COMPONENT_ID.equals(this.getSourceComponent()) &&
+               Constants.SYSTEM_TICK_STREAM_ID.equals(this.getSourceStreamId());
     }
 
     public MessageId getMessageId() {


[23/27] storm git commit: Code cleanup in mocking storm-starter tests

Posted by pt...@apache.org.
Code cleanup in mocking storm-starter tests


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/22a29239
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/22a29239
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/22a29239

Branch: refs/heads/0.10.x-branch
Commit: 22a2923979a70b6384d72ab510c5d1bcd8de274f
Parents: 027db67
Author: Niels Basjes <ni...@basjes.nl>
Authored: Tue Dec 9 13:56:11 2014 +0100
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:28:27 2015 -0400

----------------------------------------------------------------------
 .../test/jvm/storm/starter/tools/MockTupleHelpers.java           | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/22a29239/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
index 3180fd3..374288e 100644
--- a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
+++ b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
@@ -29,9 +29,7 @@ public final class MockTupleHelpers {
   }
 
   public static Tuple mockTickTuple() {
-    Tuple tuple = mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID);
-    when(tuple.isTick()).thenReturn(true);
-    return tuple;
+    return mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID);
   }
 
   public static Tuple mockTuple(String componentId, String streamId) {


[19/27] storm git commit: Refactor to move the isTick to a utility class

Posted by pt...@apache.org.
Refactor to move the isTick to a utility class


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0e49d91b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0e49d91b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0e49d91b

Branch: refs/heads/0.10.x-branch
Commit: 0e49d91be5b765eccf3e05fe9c44f53762f17ddf
Parents: 5faa782
Author: Niels Basjes <ni...@basjes.nl>
Authored: Mon Dec 15 10:56:05 2014 +0100
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:28:27 2015 -0400

----------------------------------------------------------------------
 .../storm/starter/bolt/AbstractRankerBolt.java  |  3 +-
 .../storm/starter/bolt/RollingCountBolt.java    |  3 +-
 .../storm/starter/tools/MockTupleHelpers.java   |  6 ----
 .../src/jvm/storm/kafka/bolt/KafkaBolt.java     |  4 +--
 .../src/jvm/backtype/storm/tuple/Tuple.java     |  5 ---
 .../src/jvm/backtype/storm/tuple/TupleImpl.java |  5 ---
 .../jvm/backtype/storm/utils/TupleUtils.java    | 35 ++++++++++++++++++++
 .../trident/topology/TridentBoltExecutor.java   |  3 +-
 8 files changed, 43 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0e49d91b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
index 83c2cfc..64ceb29 100644
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
@@ -24,6 +24,7 @@ import backtype.storm.topology.base.BaseBasicBolt;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
+import backtype.storm.utils.TupleUtils;
 import org.apache.log4j.Logger;
 import storm.starter.tools.Rankings;
 
@@ -77,7 +78,7 @@ public abstract class AbstractRankerBolt extends BaseBasicBolt {
    */
   @Override
   public final void execute(Tuple tuple, BasicOutputCollector collector) {
-    if (tuple.isTick()) {
+    if (TupleUtils.isTick(tuple)) {
       getLogger().debug("Received tick tuple, triggering emit of current rankings");
       emitRankings(collector);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/0e49d91b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
index f023c0b..31f7ee2 100644
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
@@ -25,6 +25,7 @@ import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
+import backtype.storm.utils.TupleUtils;
 import org.apache.log4j.Logger;
 import storm.starter.tools.NthLastModifiedTimeTracker;
 import storm.starter.tools.SlidingWindowCounter;
@@ -94,7 +95,7 @@ public class RollingCountBolt extends BaseRichBolt {
 
   @Override
   public void execute(Tuple tuple) {
-    if (tuple.isTick()) {
+    if (TupleUtils.isTick(tuple)) {
       LOG.debug("Received tick tuple, triggering emit of current window counts");
       emitCurrentWindowCounts();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/0e49d91b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
index 9e8629c..eeaeeae 100644
--- a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
+++ b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
@@ -35,13 +35,7 @@ public final class MockTupleHelpers {
     Tuple tuple = mock(Tuple.class);
     when(tuple.getSourceComponent()).thenReturn(componentId);
     when(tuple.getSourceStreamId()).thenReturn(streamId);
-    when(tuple.isTick()).thenReturn(isTick(componentId, streamId));
     return tuple;
   }
 
-  private static boolean isTick(String componentId, String streamId) {
-    return componentId.equals(Constants.SYSTEM_COMPONENT_ID) &&
-           streamId.equals(Constants.SYSTEM_TICK_STREAM_ID);
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/0e49d91b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index 0a1e5fe..a8c4321 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -22,6 +22,7 @@ import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.TupleUtils;
 import kafka.javaapi.producer.Producer;
 import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
@@ -89,8 +90,7 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
 
     @Override
     public void execute(Tuple input) {
-        if (input.isTick()) {
-          collector.ack(input);
+        if (TupleUtils.isTick(input)) {
           return; // Do not try to send ticks to Kafka
         }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/0e49d91b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
index 7ea93b9..a31b52b 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
@@ -54,11 +54,6 @@ public interface Tuple extends ITuple{
     public String getSourceStreamId();
 
     /**
-     * Returns if this tuple is a tick tuple or not.
-     */
-    public boolean isTick();
-
-    /**
      * Gets the message id that associated with this tuple.
      */
     public MessageId getMessageId();

http://git-wip-us.apache.org/repos/asf/storm/blob/0e49d91b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
index 40ad11c..7829327 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
@@ -214,11 +214,6 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
         return streamId;
     }
 
-    public boolean isTick() {
-        return Constants.SYSTEM_COMPONENT_ID.equals(this.getSourceComponent()) &&
-               Constants.SYSTEM_TICK_STREAM_ID.equals(this.getSourceStreamId());
-    }
-
     public MessageId getMessageId() {
         return id;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/0e49d91b/storm-core/src/jvm/backtype/storm/utils/TupleUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/TupleUtils.java b/storm-core/src/jvm/backtype/storm/utils/TupleUtils.java
new file mode 100644
index 0000000..f9fb2c0
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/utils/TupleUtils.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.utils;
+
+import backtype.storm.Constants;
+import backtype.storm.tuple.Tuple;
+
+public final class TupleUtils {
+
+  private TupleUtils() {
+    // No instantiation
+  }
+
+  public static boolean isTick(Tuple tuple) {
+    return tuple != null
+           && Constants.SYSTEM_COMPONENT_ID  .equals(tuple.getSourceComponent())
+           && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/0e49d91b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
index da4c1a5..41741a1 100644
--- a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
+++ b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
@@ -34,6 +34,7 @@ import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 import backtype.storm.utils.RotatingMap;
+import backtype.storm.utils.TupleUtils;
 import backtype.storm.utils.Utils;
 import java.io.Serializable;
 import java.util.Arrays;
@@ -299,7 +300,7 @@ public class TridentBoltExecutor implements IRichBolt {
     
     @Override
     public void execute(Tuple tuple) {
-        if(tuple.isTick()) {
+        if (TupleUtils.isTick(tuple)) {
             long now = System.currentTimeMillis();
             if(now - _lastRotate > _messageTimeoutMs) {
                 _batches.rotate();


[08/27] storm git commit: add additional information to error message when ShellBolt dies

Posted by pt...@apache.org.
add additional information to error message when ShellBolt dies


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9e2c8166
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9e2c8166
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9e2c8166

Branch: refs/heads/0.10.x-branch
Commit: 9e2c8166fa8f56c60b366044d23e62a3e206f009
Parents: 17c1368
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Mon Apr 6 17:03:15 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:12:17 2015 -0400

----------------------------------------------------------------------
 storm-core/src/jvm/backtype/storm/task/ShellBolt.java | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9e2c8166/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index 308ec67..eac8a90 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -279,7 +279,10 @@ public class ShellBolt implements IBolt {
     private void die(Throwable exception) {
         String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
         _exception = new RuntimeException(processInfo, exception);
-        LOG.error("Halting process: ShellBolt died.", exception);
+        String message = String.format("Halting process: ShellBolt died. Command: %s, ProcessInfo %s",
+                Arrays.toString(_command),
+                processInfo);
+        LOG.error(message, exception);
         _collector.reportError(exception);
         if (_running || (exception instanceof Error)) { //don't exit if not running, unless it is an Error
             System.exit(11);


[04/27] storm git commit: rename multilang component modules

Posted by pt...@apache.org.
rename multilang component modules


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/927132ce
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/927132ce
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/927132ce

Branch: refs/heads/0.10.x-branch
Commit: 927132ceff2b8354d75909845c3f09be4c41f6d0
Parents: 3c70141
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Mon Apr 6 17:25:10 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:12:17 2015 -0400

----------------------------------------------------------------------
 pom.xml                                         |   6 +-
 storm-multilang/javascript/pom.xml              |  32 ++
 .../src/main/resources/resources/storm.js       | 373 +++++++++++++++++++
 storm-multilang/multilang-javascript/pom.xml    |  32 --
 .../src/main/resources/resources/storm.js       | 373 -------------------
 storm-multilang/multilang-python/pom.xml        |  32 --
 .../src/main/resources/resources/storm.py       | 260 -------------
 storm-multilang/multilang-ruby/pom.xml          |  32 --
 .../src/main/resources/resources/storm.rb       | 236 ------------
 storm-multilang/python/pom.xml                  |  32 ++
 .../src/main/resources/resources/storm.py       | 260 +++++++++++++
 storm-multilang/ruby/pom.xml                    |  32 ++
 .../ruby/src/main/resources/resources/storm.rb  | 236 ++++++++++++
 13 files changed, 968 insertions(+), 968 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/927132ce/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4d7fbd2..c5a0b24 100644
--- a/pom.xml
+++ b/pom.xml
@@ -157,9 +157,9 @@
     <modules>
         <module>storm-buildtools/maven-shade-clojure-transformer</module>
         <module>storm-buildtools/storm-maven-plugins</module>
-        <module>storm-multilang/multilang-javascript</module>
-        <module>storm-multilang/multilang-python</module>
-        <module>storm-multilang/multilang-ruby</module>
+        <module>storm-multilang/javascript</module>
+        <module>storm-multilang/python</module>
+        <module>storm-multilang/ruby</module>
         <module>storm-core</module>
         <module>examples/storm-starter</module>
         <module>external/storm-kafka</module>

http://git-wip-us.apache.org/repos/asf/storm/blob/927132ce/storm-multilang/javascript/pom.xml
----------------------------------------------------------------------
diff --git a/storm-multilang/javascript/pom.xml b/storm-multilang/javascript/pom.xml
new file mode 100644
index 0000000..e1cb993
--- /dev/null
+++ b/storm-multilang/javascript/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>0.11.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <groupId>org.apache.storm</groupId>
+    <artifactId>multilang-javascript</artifactId>
+    <packaging>jar</packaging>
+    <name>multilang-javascript</name>
+
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/927132ce/storm-multilang/javascript/src/main/resources/resources/storm.js
----------------------------------------------------------------------
diff --git a/storm-multilang/javascript/src/main/resources/resources/storm.js b/storm-multilang/javascript/src/main/resources/resources/storm.js
new file mode 100755
index 0000000..dc6efc1
--- /dev/null
+++ b/storm-multilang/javascript/src/main/resources/resources/storm.js
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Base classes in node-js for storm Bolt and Spout.
+ * Implements the storm multilang protocol for nodejs.
+ */
+
+
+var fs = require('fs');
+
+function Storm() {
+    this.messagePart = "";
+    this.taskIdsCallbacks = [];
+    this.isFirstMessage = true;
+    this.separator = '\nend\n';
+}
+
+Storm.prototype.sendMsgToParent = function(msg) {
+    var str = JSON.stringify(msg);
+    process.stdout.write(str + this.separator);
+}
+
+Storm.prototype.sync = function() {
+    this.sendMsgToParent({"command":"sync"});
+}
+
+Storm.prototype.sendPid = function(heartbeatdir) {
+    var pid = process.pid;
+    fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w"));
+    this.sendMsgToParent({"pid": pid})
+}
+
+Storm.prototype.log = function(msg) {
+    this.sendMsgToParent({"command": "log", "msg": msg});
+}
+
+Storm.prototype.initSetupInfo = function(setupInfo) {
+    var self = this;
+    var callback = function() {
+        self.sendPid(setupInfo['pidDir']);
+    }
+    this.initialize(setupInfo['conf'], setupInfo['context'], callback);
+}
+
+Storm.prototype.startReadingInput = function() {
+    var self = this;
+    process.stdin.on('readable', function() {
+        var chunk = process.stdin.read();
+        var messages = self.handleNewChunk(chunk);
+        messages.forEach(function(message) {
+            self.handleNewMessage(message);
+        })
+
+    });
+}
+
+/**
+ * receives a new string chunk and returns a list of new messages with the separator removed
+ * stores state in this.messagePart
+ * @param chunk
+ */
+Storm.prototype.handleNewChunk = function(chunk) {
+    //invariant: this.messagePart has no separator otherwise we would have parsed it already
+    var messages = [];
+    if (chunk && chunk.length !== 0) {
+        //"{}".split("\nend\n")           ==> ['{}']
+        //"\nend\n".split("\nend\n")      ==> [''  , '']
+        //"{}\nend\n".split("\nend\n")    ==> ['{}', '']
+        //"\nend\n{}".split("\nend\n")    ==> [''  , '{}']
+        // "{}\nend\n{}".split("\nend\n") ==> ['{}', '{}' ]
+        this.messagePart = this.messagePart + chunk;
+        var newMessageParts = this.messagePart.split(this.separator);
+        while (newMessageParts.length > 0) {
+            var potentialMessage = newMessageParts.shift();
+            var anotherMessageAhead = newMessageParts.length > 0;
+            if  (!anotherMessageAhead) {
+                this.messagePart = potentialMessage;
+            }
+            else if (potentialMessage.length > 0) {
+                messages.push(potentialMessage);
+            }
+        }
+    }
+    return messages;
+}
+
+Storm.prototype.isTaskIds = function(msg) {
+    return (msg instanceof Array);
+}
+
+Storm.prototype.handleNewMessage = function(msg) {
+    var parsedMsg = JSON.parse(msg);
+
+    if (this.isFirstMessage) {
+        this.initSetupInfo(parsedMsg);
+        this.isFirstMessage = false;
+    } else if (this.isTaskIds(parsedMsg)) {
+        this.handleNewTaskId(parsedMsg);
+    } else {
+        this.handleNewCommand(parsedMsg);
+    }
+}
+
+Storm.prototype.handleNewTaskId = function(taskIds) {
+    //When new list of task ids arrives, the callback that was passed with the corresponding emit should be called.
+    //Storm assures that the task ids will be sent in the same order as their corresponding emits, so we can simply
+    //take the first callback in the list and be sure it is the right one.
+
+    var callback = this.taskIdsCallbacks.shift();
+    if (callback) {
+        callback(taskIds);
+    } else {
+        throw new Error('Something went wrong, we off the split of task id callbacks');
+    }
+}
+
+
+
+/**
+ *
+ * @param messageDetails json with the emit details.
+ *
+ * For bolt, the json must contain the required fields:
+ * - tuple - the value to emit
+ * - anchorTupleId - the value of the anchor tuple (the input tuple that led to this emit). Used to track the source
+ * tuple and return ack when all components have successfully finished processing it.
+ * and may contain the optional fields:
+ * - stream (if empty - emit to default stream)
+ *
+ * For spout, the json must contain the required fields:
+ * - tuple - the value to emit
+ *
+ * and may contain the optional fields:
+ * - id - pass id for reliable emit (and receive ack/fail later).
+ * - stream - if empty - emit to default stream.
+ *
+ * @param onTaskIds function that will be called with the list of task ids the message was emitted to (when received).
+ */
+Storm.prototype.emit = function(messageDetails, onTaskIds) {
+    //Every emit triggers a response - list of task ids to which the tuple was emitted. The task ids are accessible
+    //through the callback (will be called when the response arrives). The callback is stored in a list until the
+    //corresponding task id list arrives.
+    if (messageDetails.task) {
+        throw new Error('Illegal input - task. To emit to specific task use emit direct!');
+    }
+
+    if (!onTaskIds) {
+        throw new Error('You must pass a onTaskIds callback when using emit!')
+    }
+
+    this.taskIdsCallbacks.push(onTaskIds);
+    this.__emit(messageDetails);
+}
+
+
+/**
+ * Emit message to specific task.
+ * @param commandDetails json with the emit details.
+ *
+ * For bolt, the json must contain the required fields:
+ * - tuple - the value to emit
+ * - anchorTupleId - the value of the anchor tuple (the input tuple that led to this emit). Used to track the source
+ * tuple and return ack when all components have successfully finished processing it.
+ * - task - indicate the task to send the tuple to.
+ * and may contain the optional fields:
+ * - stream (if empty - emit to default stream)
+ *
+ * For spout, the json must contain the required fields:
+ * - tuple - the value to emit
+ * - task - indicate the task to send the tuple to.
+ * and may contain the optional fields:
+ * - id - pass id for reliable emit (and receive ack/fail later).
+ * - stream - if empty - emit to default stream.
+ *
+ * @param onTaskIds function that will be called with the list of task ids the message was emitted to (when received).
+ */
+Storm.prototype.emitDirect = function(commandDetails) {
+    if (!commandDetails.task) {
+        throw new Error("Emit direct must receive task id!")
+    }
+    this.__emit(commandDetails);
+}
+
+/**
+ * Initialize storm component according to the configuration received.
+ * @param conf configuration object according to storm protocol.
+ * @param context context object according to storm protocol.
+ * @param done callback. Call this method when finished initializing.
+ */
+Storm.prototype.initialize = function(conf, context, done) {
+    done();
+}
+
+Storm.prototype.run = function() {
+    process.stdout.setEncoding('utf8');
+    process.stdin.setEncoding('utf8');
+    this.startReadingInput();
+}
+
+function Tuple(id, component, stream, task, values) {
+    this.id = id;
+    this.component = component;
+    this.stream = stream;
+    this.task = task;
+    this.values = values;
+}
+
+/**
+ * Base class for storm bolt.
+ * To create a bolt implement 'process' method.
+ * You may also implement the 'initialize' method.
+ */
+function BasicBolt() {
+    Storm.call(this);
+    this.anchorTuple = null;
+};
+
+BasicBolt.prototype = Object.create(Storm.prototype);
+BasicBolt.prototype.constructor = BasicBolt;
+
+/**
+ * Emit message.
+ * @param commandDetails json with the required fields:
+ * - tuple - the value to emit
+ * - anchorTupleId - the value of the anchor tuple (the input tuple that led to this emit). Used to track the source
+ * tuple and return ack when all components have successfully finished processing it.
+ * and the optional fields:
+ * - stream (if empty - emit to default stream)
+ * - task (pass only to emit to specific task)
+ */
+BasicBolt.prototype.__emit = function(commandDetails) {
+    var self = this;
+
+    var message = {
+        command: "emit",
+        tuple: commandDetails.tuple,
+        stream: commandDetails.stream,
+        task: commandDetails.task,
+        anchors: [commandDetails.anchorTupleId]
+    };
+
+    this.sendMsgToParent(message);
+}
+
+BasicBolt.prototype.handleNewCommand = function(command) {
+    var self = this;
+    var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]);
+
+    if (tup.task === -1 && tup.stream === "__heartbeat") {
+        self.sync();
+        return;
+    }
+
+    var callback = function(err) {
+        if (err) {
+            self.fail(tup, err);
+            return;
+        }
+        self.ack(tup);
+    }
+    this.process(tup, callback);
+}
+
+/**
+ * Implement this method when creating a bolt. This is the main method that provides the logic of the bolt (what
+ * should it do?).
+ * @param tuple the input of the bolt - what to process.
+ * @param done call this method when done processing.
+ */
+BasicBolt.prototype.process = function(tuple, done) {};
+
+BasicBolt.prototype.ack = function(tup) {
+    this.sendMsgToParent({"command": "ack", "id": tup.id});
+}
+
+BasicBolt.prototype.fail = function(tup, err) {
+    this.sendMsgToParent({"command": "fail", "id": tup.id});
+}
+
+
+/**
+ * Base class for storm spout.
+ * To create a spout implement the following methods: nextTuple, ack and fail (nextTuple - mandatory, ack and fail
+ * can stay empty).
+ * You may also implement initialize method.
+ *
+ */
+function Spout() {
+    Storm.call(this);
+};
+
+Spout.prototype = Object.create(Storm.prototype);
+
+Spout.prototype.constructor = Spout;
+
+/**
+ * This method will be called when an ack is received for a previously sent tuple. One may implement it.
+ * @param id The id of the tuple.
+ * @param done Call this method when finished and ready to receive more tuples.
+ */
+Spout.prototype.ack = function(id, done) {};
+
+/**
+ * This method will be called when a fail is received for a previously sent tuple. One may implement it (for example -
+ * log the failure or send the tuple again).
+ * @param id The id of the tuple.
+ * @param done Call this method when finished and ready to receive more tuples.
+ */
+Spout.prototype.fail = function(id, done) {};
+
+/**
+ * Method that indicates it's time to emit the next tuple.
+ * @param done call this method when done sending the output.
+ */
+Spout.prototype.nextTuple = function(done) {};
+
+Spout.prototype.handleNewCommand = function(command) {
+    var self = this;
+    var callback = function() {
+        self.sync();
+    }
+
+    if (command["command"] === "next") {
+        this.nextTuple(callback);
+    }
+
+    if (command["command"] === "ack") {
+        this.ack(command["id"], callback);
+    }
+
+    if (command["command"] === "fail") {
+        this.fail(command["id"], callback);
+    }
+}
+
+/**
+ * @param commandDetails json with the required fields:
+ * - tuple - the value to emit.
+ * and the optional fields:
+ * - id - pass id for reliable emit (and receive ack/fail later).
+ * - stream - if empty - emit to default stream.
+ * - task - pass only to emit to specific task.
+ */
+Spout.prototype.__emit = function(commandDetails) {
+    var message = {
+        command: "emit",
+        tuple: commandDetails.tuple,
+        id: commandDetails.id,
+        stream: commandDetails.stream,
+        task: commandDetails.task
+    };
+
+    this.sendMsgToParent(message);
+}
+
+module.exports.BasicBolt = BasicBolt;
+module.exports.Spout = Spout;
\ No newline at end of file
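
The message framing in handleNewChunk above can be illustrated in isolation. The following is a minimal sketch, not the code from this commit: createFramer and feed are hypothetical names, and it assumes only what the diff shows, namely that messages are delimited by '\nend\n' and that a chunk may end in the middle of a message or in the middle of the separator.

```javascript
// Hypothetical standalone helper (not part of storm.js). It mirrors the
// buffering idea of handleNewChunk: keep the trailing, unterminated part
// and return only the messages that were closed by a separator.
function createFramer(separator) {
    var pending = '';
    return function feed(chunk) {
        var messages = [];
        if (!chunk) {
            return messages;
        }
        pending += chunk;
        var parts = pending.split(separator);
        // The last element is whatever follows the final separator
        // (possibly the empty string); it is not yet a complete message.
        pending = parts.pop();
        parts.forEach(function (part) {
            if (part.length > 0) {
                messages.push(part);
            }
        });
        return messages;
    };
}

var feed = createFramer('\nend\n');
var first = feed('{"a":1}\nend\n{"b"');
var second = feed(':2}\nend\n');
console.log(first, second);
// first -> ['{"a":1}'], second -> ['{"b":2}']
```

Buffering before splitting is what lets a separator that straddles two chunks still be recognised, since the split always runs over the accumulated string rather than over the new chunk alone.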

http://git-wip-us.apache.org/repos/asf/storm/blob/927132ce/storm-multilang/multilang-javascript/pom.xml
----------------------------------------------------------------------
diff --git a/storm-multilang/multilang-javascript/pom.xml b/storm-multilang/multilang-javascript/pom.xml
deleted file mode 100644
index e1cb993..0000000
--- a/storm-multilang/multilang-javascript/pom.xml
+++ /dev/null
@@ -1,32 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <artifactId>storm</artifactId>
-        <groupId>org.apache.storm</groupId>
-        <version>0.11.0-SNAPSHOT</version>
-        <relativePath>../../pom.xml</relativePath>
-    </parent>
-    <groupId>org.apache.storm</groupId>
-    <artifactId>multilang-javascript</artifactId>
-    <packaging>jar</packaging>
-    <name>multilang-javascript</name>
-
-</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/927132ce/storm-multilang/multilang-javascript/src/main/resources/resources/storm.js
----------------------------------------------------------------------
diff --git a/storm-multilang/multilang-javascript/src/main/resources/resources/storm.js b/storm-multilang/multilang-javascript/src/main/resources/resources/storm.js
deleted file mode 100755
index dc6efc1..0000000
--- a/storm-multilang/multilang-javascript/src/main/resources/resources/storm.js
+++ /dev/null
@@ -1,373 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Base classes in node-js for storm Bolt and Spout.
- * Implements the storm multilang protocol for nodejs.
- */
-
-
-var fs = require('fs');
-
-function Storm() {
-    this.messagePart = "";
-    this.taskIdsCallbacks = [];
-    this.isFirstMessage = true;
-    this.separator = '\nend\n';
-}
-
-Storm.prototype.sendMsgToParent = function(msg) {
-    var str = JSON.stringify(msg);
-    process.stdout.write(str + this.separator);
-}
-
-Storm.prototype.sync = function() {
-    this.sendMsgToParent({"command":"sync"});
-}
-
-Storm.prototype.sendPid = function(heartbeatdir) {
-    var pid = process.pid;
-    fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w"));
-    this.sendMsgToParent({"pid": pid})
-}
-
-Storm.prototype.log = function(msg) {
-    this.sendMsgToParent({"command": "log", "msg": msg});
-}
-
-Storm.prototype.initSetupInfo = function(setupInfo) {
-    var self = this;
-    var callback = function() {
-        self.sendPid(setupInfo['pidDir']);
-    }
-    this.initialize(setupInfo['conf'], setupInfo['context'], callback);
-}
-
-Storm.prototype.startReadingInput = function() {
-    var self = this;
-    process.stdin.on('readable', function() {
-        var chunk = process.stdin.read();
-        var messages = self.handleNewChunk(chunk);
-        messages.forEach(function(message) {
-            self.handleNewMessage(message);
-        })
-
-    });
-}
-
-/**
- * receives a new string chunk and returns a list of new messages with the separator removed
- * stores state in this.messagePart
- * @param chunk
- */
-Storm.prototype.handleNewChunk = function(chunk) {
-    //invariant: this.messagePart has no separator otherwise we would have parsed it already
-    var messages = [];
-    if (chunk && chunk.length !== 0) {
-        //"{}".split("\nend\n")           ==> ['{}']
-        //"\nend\n".split("\nend\n")      ==> [''  , '']
-        //"{}\nend\n".split("\nend\n")    ==> ['{}', '']
-        //"\nend\n{}".split("\nend\n")    ==> [''  , '{}']
-        // "{}\nend\n{}".split("\nend\n") ==> ['{}', '{}' ]
-        this.messagePart = this.messagePart + chunk;
-        var newMessageParts = this.messagePart.split(this.separator);
-        while (newMessageParts.length > 0) {
-            var potentialMessage = newMessageParts.shift();
-            var anotherMessageAhead = newMessageParts.length > 0;
-            if  (!anotherMessageAhead) {
-                this.messagePart = potentialMessage;
-            }
-            else if (potentialMessage.length > 0) {
-                messages.push(potentialMessage);
-            }
-        }
-    }
-    return messages;
-}
-
-Storm.prototype.isTaskIds = function(msg) {
-    return (msg instanceof Array);
-}
-
-Storm.prototype.handleNewMessage = function(msg) {
-    var parsedMsg = JSON.parse(msg);
-
-    if (this.isFirstMessage) {
-        this.initSetupInfo(parsedMsg);
-        this.isFirstMessage = false;
-    } else if (this.isTaskIds(parsedMsg)) {
-        this.handleNewTaskId(parsedMsg);
-    } else {
-        this.handleNewCommand(parsedMsg);
-    }
-}
-
-Storm.prototype.handleNewTaskId = function(taskIds) {
-    //When new list of task ids arrives, the callback that was passed with the corresponding emit should be called.
-    //Storm assures that the task ids will be sent in the same order as their corresponding emits, so we can simply
-    //take the first callback in the list and be sure it is the right one.
-
-    var callback = this.taskIdsCallbacks.shift();
-    if (callback) {
-        callback(taskIds);
-    } else {
-        throw new Error('Something went wrong, we off the split of task id callbacks');
-    }
-}
-
-
-
-/**
- *
- * @param messageDetails json with the emit details.
- *
- * For bolt, the json must contain the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that led to this emit). Used to track the source
- * tuple and return ack when all components have successfully finished processing it.
- * and may contain the optional fields:
- * - stream (if empty - emit to default stream)
- *
- * For spout, the json must contain the required fields:
- * - tuple - the value to emit
- *
- * and may contain the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- *
- * @param onTaskIds function that will be called with the list of task ids the message was emitted to (when received).
- */
-Storm.prototype.emit = function(messageDetails, onTaskIds) {
-    //Every emit triggers a response - list of task ids to which the tuple was emitted. The task ids are accessible
-    //through the callback (will be called when the response arrives). The callback is stored in a list until the
-    //corresponding task id list arrives.
-    if (messageDetails.task) {
-        throw new Error('Illegal input - task. To emit to specific task use emit direct!');
-    }
-
-    if (!onTaskIds) {
-        throw new Error('You must pass a onTaskIds callback when using emit!')
-    }
-
-    this.taskIdsCallbacks.push(onTaskIds);
-    this.__emit(messageDetails);
-}
-
-
-/**
- * Emit message to specific task.
- * @param commandDetails json with the emit details.
- *
- * For bolt, the json must contain the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that led to this emit). Used to track the source
- * tuple and return ack when all components have successfully finished processing it.
- * - task - indicate the task to send the tuple to.
- * and may contain the optional fields:
- * - stream (if empty - emit to default stream)
- *
- * For spout, the json must contain the required fields:
- * - tuple - the value to emit
- * - task - indicate the task to send the tuple to.
- * and may contain the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- *
- * @param onTaskIds function that will be called with the list of task ids the message was emitted to (when received).
- */
-Storm.prototype.emitDirect = function(commandDetails) {
-    if (!commandDetails.task) {
-        throw new Error("Emit direct must receive task id!")
-    }
-    this.__emit(commandDetails);
-}
-
-/**
- * Initialize storm component according to the configuration received.
- * @param conf configuration object according to storm protocol.
- * @param context context object according to storm protocol.
- * @param done callback. Call this method when finished initializing.
- */
-Storm.prototype.initialize = function(conf, context, done) {
-    done();
-}
-
-Storm.prototype.run = function() {
-    process.stdout.setEncoding('utf8');
-    process.stdin.setEncoding('utf8');
-    this.startReadingInput();
-}
-
-function Tuple(id, component, stream, task, values) {
-    this.id = id;
-    this.component = component;
-    this.stream = stream;
-    this.task = task;
-    this.values = values;
-}
-
-/**
- * Base class for storm bolt.
- * To create a bolt implement 'process' method.
- * You may also implement the 'initialize' method.
- */
-function BasicBolt() {
-    Storm.call(this);
-    this.anchorTuple = null;
-};
-
-BasicBolt.prototype = Object.create(Storm.prototype);
-BasicBolt.prototype.constructor = BasicBolt;
-
-/**
- * Emit message.
- * @param commandDetails json with the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that led to this emit). Used to track the source
- * tuple and return ack when all components have successfully finished processing it.
- * and the optional fields:
- * - stream (if empty - emit to default stream)
- * - task (pass only to emit to specific task)
- */
-BasicBolt.prototype.__emit = function(commandDetails) {
-    var self = this;
-
-    var message = {
-        command: "emit",
-        tuple: commandDetails.tuple,
-        stream: commandDetails.stream,
-        task: commandDetails.task,
-        anchors: [commandDetails.anchorTupleId]
-    };
-
-    this.sendMsgToParent(message);
-}
-
-BasicBolt.prototype.handleNewCommand = function(command) {
-    var self = this;
-    var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]);
-
-    if (tup.task === -1 && tup.stream === "__heartbeat") {
-        self.sync();
-        return;
-    }
-
-    var callback = function(err) {
-        if (err) {
-            self.fail(tup, err);
-            return;
-        }
-        self.ack(tup);
-    }
-    this.process(tup, callback);
-}
-
-/**
- * Implement this method when creating a bolt. This is the main method that provides the logic of the bolt (what
- * should it do?).
- * @param tuple the input of the bolt - what to process.
- * @param done call this method when done processing.
- */
-BasicBolt.prototype.process = function(tuple, done) {};
-
-BasicBolt.prototype.ack = function(tup) {
-    this.sendMsgToParent({"command": "ack", "id": tup.id});
-}
-
-BasicBolt.prototype.fail = function(tup, err) {
-    this.sendMsgToParent({"command": "fail", "id": tup.id});
-}
-
-
-/**
- * Base class for storm spout.
- * To create a spout implement the following methods: nextTuple, ack and fail (nextTuple - mandatory, ack and fail
- * can stay empty).
- * You may also implement initialize method.
- *
- */
-function Spout() {
-    Storm.call(this);
-};
-
-Spout.prototype = Object.create(Storm.prototype);
-
-Spout.prototype.constructor = Spout;
-
-/**
- * This method will be called when an ack is received for a previously sent tuple. One may implement it.
- * @param id The id of the tuple.
- * @param done Call this method when finished and ready to receive more tuples.
- */
-Spout.prototype.ack = function(id, done) {};
-
-/**
- * This method will be called when a fail is received for a previously sent tuple. One may implement it (for example -
- * log the failure or send the tuple again).
- * @param id The id of the tuple.
- * @param done Call this method when finished and ready to receive more tuples.
- */
-Spout.prototype.fail = function(id, done) {};
-
-/**
- * Method that indicates it's time to emit the next tuple.
- * @param done call this method when done sending the output.
- */
-Spout.prototype.nextTuple = function(done) {};
-
-Spout.prototype.handleNewCommand = function(command) {
-    var self = this;
-    var callback = function() {
-        self.sync();
-    }
-
-    if (command["command"] === "next") {
-        this.nextTuple(callback);
-    }
-
-    if (command["command"] === "ack") {
-        this.ack(command["id"], callback);
-    }
-
-    if (command["command"] === "fail") {
-        this.fail(command["id"], callback);
-    }
-}
-
-/**
- * @param commandDetails json with the required fields:
- * - tuple - the value to emit.
- * and the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- * - task - pass only to emit to specific task.
- */
-Spout.prototype.__emit = function(commandDetails) {
-    var message = {
-        command: "emit",
-        tuple: commandDetails.tuple,
-        id: commandDetails.id,
-        stream: commandDetails.stream,
-        task: commandDetails.task
-    };
-
-    this.sendMsgToParent(message);
-}
-
-module.exports.BasicBolt = BasicBolt;
-module.exports.Spout = Spout;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/927132ce/storm-multilang/multilang-python/pom.xml
----------------------------------------------------------------------
diff --git a/storm-multilang/multilang-python/pom.xml b/storm-multilang/multilang-python/pom.xml
deleted file mode 100644
index 379c0bc..0000000
--- a/storm-multilang/multilang-python/pom.xml
+++ /dev/null
@@ -1,32 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <artifactId>storm</artifactId>
-        <groupId>org.apache.storm</groupId>
-        <version>0.11.0-SNAPSHOT</version>
-        <relativePath>../../pom.xml</relativePath>
-    </parent>
-    <groupId>org.apache.storm</groupId>
-    <artifactId>multilang-python</artifactId>
-    <packaging>jar</packaging>
-    <name>multilang-python</name>
-
-</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/927132ce/storm-multilang/multilang-python/src/main/resources/resources/storm.py
----------------------------------------------------------------------
diff --git a/storm-multilang/multilang-python/src/main/resources/resources/storm.py b/storm-multilang/multilang-python/src/main/resources/resources/storm.py
deleted file mode 100755
index 642c393..0000000
--- a/storm-multilang/multilang-python/src/main/resources/resources/storm.py
+++ /dev/null
@@ -1,260 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import sys
-import os
-import traceback
-from collections import deque
-
-try:
-    import simplejson as json
-except ImportError:
-    import json
-
-json_encode = lambda x: json.dumps(x)
-json_decode = lambda x: json.loads(x)
-
-#reads lines and reconstructs newlines appropriately
-def readMsg():
-    msg = ""
-    while True:
-        line = sys.stdin.readline()
-        if not line:
-            raise Exception('Read EOF from stdin')
-        if line[0:-1] == "end":
-            break
-        msg = msg + line
-    return json_decode(msg[0:-1])
-
-MODE = None
-ANCHOR_TUPLE = None
-
-#queue up commands we read while trying to read taskids
-pending_commands = deque()
-
-def readTaskIds():
-    if pending_taskids:
-        return pending_taskids.popleft()
-    else:
-        msg = readMsg()
-        while type(msg) is not list:
-            pending_commands.append(msg)
-            msg = readMsg()
-        return msg
-
-#queue up taskids we read while trying to read commands/tuples
-pending_taskids = deque()
-
-def readCommand():
-    if pending_commands:
-        return pending_commands.popleft()
-    else:
-        msg = readMsg()
-        while type(msg) is list:
-            pending_taskids.append(msg)
-            msg = readMsg()
-        return msg
-
-def readTuple():
-    cmd = readCommand()
-    return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"])
-
-def sendMsgToParent(msg):
-    print json_encode(msg)
-    print "end"
-    sys.stdout.flush()
-
-def sync():
-    sendMsgToParent({'command':'sync'})
-
-def sendpid(heartbeatdir):
-    pid = os.getpid()
-    sendMsgToParent({'pid':pid})
-    open(heartbeatdir + "/" + str(pid), "w").close()
-
-def emit(*args, **kwargs):
-    __emit(*args, **kwargs)
-    return readTaskIds()
-
-def emitDirect(task, *args, **kwargs):
-    kwargs["directTask"] = task
-    __emit(*args, **kwargs)
-
-def __emit(*args, **kwargs):
-    global MODE
-    if MODE == Bolt:
-        emitBolt(*args, **kwargs)
-    elif MODE == Spout:
-        emitSpout(*args, **kwargs)
-
-def emitBolt(tup, stream=None, anchors = [], directTask=None):
-    global ANCHOR_TUPLE
-    if ANCHOR_TUPLE is not None:
-        anchors = [ANCHOR_TUPLE]
-    m = {"command": "emit"}
-    if stream is not None:
-        m["stream"] = stream
-    m["anchors"] = map(lambda a: a.id, anchors)
-    if directTask is not None:
-        m["task"] = directTask
-    m["tuple"] = tup
-    sendMsgToParent(m)
-
-def emitSpout(tup, stream=None, id=None, directTask=None):
-    m = {"command": "emit"}
-    if id is not None:
-        m["id"] = id
-    if stream is not None:
-        m["stream"] = stream
-    if directTask is not None:
-        m["task"] = directTask
-    m["tuple"] = tup
-    sendMsgToParent(m)
-
-def ack(tup):
-    sendMsgToParent({"command": "ack", "id": tup.id})
-
-def fail(tup):
-    sendMsgToParent({"command": "fail", "id": tup.id})
-
-def reportError(msg):
-    sendMsgToParent({"command": "error", "msg": msg})
-
-def log(msg, level=2):
-    sendMsgToParent({"command": "log", "msg": msg, "level":level})
-
-def logTrace(msg):
-    log(msg, 0)
-
-def logDebug(msg):
-    log(msg, 1)
-
-def logInfo(msg):
-    log(msg, 2)
-
-def logWarn(msg):
-    log(msg, 3)
-
-def logError(msg):
-    log(msg, 4)
-
-def rpcMetrics(name, params):
-    sendMsgToParent({"command": "metrics", "name": name, "params": params})
-
-def initComponent():
-    setupInfo = readMsg()
-    sendpid(setupInfo['pidDir'])
-    return [setupInfo['conf'], setupInfo['context']]
-
-class Tuple(object):
-    def __init__(self, id, component, stream, task, values):
-        self.id = id
-        self.component = component
-        self.stream = stream
-        self.task = task
-        self.values = values
-
-    def __repr__(self):
-        return '<%s%s>' % (
-            self.__class__.__name__,
-            ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys())))
-
-    def is_heartbeat_tuple(self):
-        return self.task == -1 and self.stream == "__heartbeat"
-
-class Bolt(object):
-    def initialize(self, stormconf, context):
-        pass
-
-    def process(self, tuple):
-        pass
-
-    def run(self):
-        global MODE
-        MODE = Bolt
-        conf, context = initComponent()
-        try:
-            self.initialize(conf, context)
-            while True:
-                tup = readTuple()
-                if tup.is_heartbeat_tuple():
-                    sync()
-                else:
-                    self.process(tup)
-        except Exception, e:
-            reportError(traceback.format_exc(e))
-
-class BasicBolt(object):
-    def initialize(self, stormconf, context):
-        pass
-
-    def process(self, tuple):
-        pass
-
-    def run(self):
-        global MODE
-        MODE = Bolt
-        global ANCHOR_TUPLE
-        conf, context = initComponent()
-        try:
-            self.initialize(conf, context)
-            while True:
-                tup = readTuple()
-                if tup.is_heartbeat_tuple():
-                    sync()
-                else:
-                    ANCHOR_TUPLE = tup
-                    try:
-                        self.process(tup)
-                        ack(tup)
-                    except Exception, e:
-                        reportError(traceback.format_exc(e))
-                        fail(tup)
-        except Exception, e:
-            reportError(traceback.format_exc(e))
-
-class Spout(object):
-    def initialize(self, conf, context):
-        pass
-
-    def ack(self, id):
-        pass
-
-    def fail(self, id):
-        pass
-
-    def nextTuple(self):
-        pass
-
-    def run(self):
-        global MODE
-        MODE = Spout
-        conf, context = initComponent()
-        try:
-            self.initialize(conf, context)
-            while True:
-                msg = readCommand()
-                if msg["command"] == "next":
-                    self.nextTuple()
-                if msg["command"] == "ack":
-                    self.ack(msg["id"])
-                if msg["command"] == "fail":
-                    self.fail(msg["id"])
-                sync()
-        except Exception, e:
-            reportError(traceback.format_exc(e))

http://git-wip-us.apache.org/repos/asf/storm/blob/927132ce/storm-multilang/multilang-ruby/pom.xml
----------------------------------------------------------------------
diff --git a/storm-multilang/multilang-ruby/pom.xml b/storm-multilang/multilang-ruby/pom.xml
deleted file mode 100644
index 6b5dd0c..0000000
--- a/storm-multilang/multilang-ruby/pom.xml
+++ /dev/null
@@ -1,32 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements.  See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License.  You may obtain a copy of the License at
-
-     http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <artifactId>storm</artifactId>
-        <groupId>org.apache.storm</groupId>
-        <version>0.11.0-SNAPSHOT</version>
-        <relativePath>../../pom.xml</relativePath>
-    </parent>
-    <groupId>org.apache.storm</groupId>
-    <artifactId>multilang-ruby</artifactId>
-    <packaging>jar</packaging>
-    <name>multilang-ruby</name>
-    
-</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/927132ce/storm-multilang/multilang-ruby/src/main/resources/resources/storm.rb
----------------------------------------------------------------------
diff --git a/storm-multilang/multilang-ruby/src/main/resources/resources/storm.rb b/storm-multilang/multilang-ruby/src/main/resources/resources/storm.rb
deleted file mode 100644
index 816694e..0000000
--- a/storm-multilang/multilang-ruby/src/main/resources/resources/storm.rb
+++ /dev/null
@@ -1,236 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-require "rubygems"
-require "json"
-
-module Storm
-  module Protocol
-    class << self
-      attr_accessor :mode, :pending_taskids, :pending_commands
-    end
-
-    self.pending_taskids = []
-    self.pending_commands = []
-
-    def read_message
-      msg = ""
-      loop do
-        line = STDIN.readline.chomp
-        break if line == "end"
-        msg << line
-        msg << "\n"
-      end
-      JSON.parse msg.chomp
-    end
-
-    def read_task_ids
-      Storm::Protocol.pending_taskids.shift ||
-          begin
-            msg = read_message
-            until msg.is_a? Array
-              Storm::Protocol.pending_commands.push(msg)
-              msg = read_message
-            end
-            msg
-          end
-    end
-
-    def read_command
-      Storm::Protocol.pending_commands.shift ||
-          begin
-            msg = read_message
-            while msg.is_a? Array
-              Storm::Protocol.pending_taskids.push(msg)
-              msg = read_message
-            end
-            msg
-          end
-    end
-
-    def send_msg_to_parent(msg)
-      puts msg.to_json
-      puts "end"
-      STDOUT.flush
-    end
-
-    def sync
-      send_msg_to_parent({'command' => 'sync'})
-    end
-
-    def send_pid(heartbeat_dir)
-      pid = Process.pid
-      send_msg_to_parent({'pid' => pid})
-      File.open("#{heartbeat_dir}/#{pid}", "w").close
-    end
-
-    def emit_bolt(tup, args = {})
-      stream = args[:stream]
-      anchors = args[:anchors] || args[:anchor] || []
-      anchors = [anchors] unless anchors.is_a? Enumerable
-      direct = args[:direct_task]
-      m = {:command => :emit, :anchors => anchors.map(&:id), :tuple => tup}
-      m[:stream] = stream if stream
-      m[:task] = direct if direct
-      send_msg_to_parent m
-      read_task_ids unless direct
-    end
-
-    def emit_spout(tup, args = {})
-      stream = args[:stream]
-      id = args[:id]
-      direct = args[:direct_task]
-      m = {:command => :emit, :tuple => tup}
-      m[:id] = id if id
-      m[:stream] = stream if stream
-      m[:task] = direct if direct
-      send_msg_to_parent m
-      read_task_ids unless direct
-    end
-
-    def emit(*args)
-      case Storm::Protocol.mode
-        when 'spout'
-          emit_spout(*args)
-        when 'bolt'
-          emit_bolt(*args)
-      end
-    end
-
-    def ack(tup)
-      send_msg_to_parent :command => :ack, :id => tup.id
-    end
-
-    def fail(tup)
-      send_msg_to_parent :command => :fail, :id => tup.id
-    end
-
-    def reportError(msg)
-      send_msg_to_parent :command => :error, :msg => msg.to_s
-    end
-
-    def log(msg, level=2)
-      send_msg_to_parent :command => :log, :msg => msg.to_s, :level => level
-    end
-
-    def logTrace(msg)
-      log(msg, 0)
-    end
-
-    def logDebug(msg)
-      log(msg, 1)
-    end
-
-    def logInfo(msg)
-      log(msg, 2)
-    end
-
-    def logWarn(msg)
-      log(msg, 3)
-    end
-
-    def logError(msg)
-      log(msg, 4)
-    end
-
-    def handshake
-      setup_info = read_message
-      send_pid setup_info['pidDir']
-      [setup_info['conf'], setup_info['context']]
-    end
-  end
-
-  class Tuple
-    attr_accessor :id, :component, :stream, :task, :values
-
-    def initialize(id, component, stream, task, values)
-      @id = id
-      @component = component
-      @stream = stream
-      @task = task
-      @values = values
-    end
-
-    def self.from_hash(hash)
-      Tuple.new(*hash.values_at("id", "comp", "stream", "task", "tuple"))
-    end
-
-    def is_heartbeat
-      task == -1 and stream == '__heartbeat'
-    end
-  end
-
-  class Bolt
-    include Storm::Protocol
-
-    def prepare(conf, context); end
-
-    def process(tuple); end
-
-    def run
-      Storm::Protocol.mode = 'bolt'
-      prepare(*handshake)
-      begin
-        while true
-          tuple = Tuple.from_hash(read_command)
-          if tuple.is_heartbeat
-            sync
-          else
-            process tuple
-          end
-        end
-      rescue Exception => e
-        reportError 'Exception in bolt: ' + e.message + ' - ' + e.backtrace.join('\n')
-      end
-    end
-  end
-
-  class Spout
-    include Storm::Protocol
-
-    def open(conf, context); end
-
-    def nextTuple; end
-
-    def ack(id); end
-
-    def fail(id); end
-
-    def run
-      Storm::Protocol.mode = 'spout'
-      open(*handshake)
-
-      begin
-        while true
-          msg = read_command
-          case msg['command']
-            when 'next'
-              nextTuple
-            when 'ack'
-              ack(msg['id'])
-            when 'fail'
-              fail(msg['id'])
-          end
-          sync
-        end
-      rescue Exception => e
-        reportError 'Exception in spout: ' + e.message + ' - ' + e.backtrace.join('\n')
-      end
-    end
-  end
-end

http://git-wip-us.apache.org/repos/asf/storm/blob/927132ce/storm-multilang/python/pom.xml
----------------------------------------------------------------------
diff --git a/storm-multilang/python/pom.xml b/storm-multilang/python/pom.xml
new file mode 100644
index 0000000..379c0bc
--- /dev/null
+++ b/storm-multilang/python/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>0.11.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <groupId>org.apache.storm</groupId>
+    <artifactId>multilang-python</artifactId>
+    <packaging>jar</packaging>
+    <name>multilang-python</name>
+
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/927132ce/storm-multilang/python/src/main/resources/resources/storm.py
----------------------------------------------------------------------
diff --git a/storm-multilang/python/src/main/resources/resources/storm.py b/storm-multilang/python/src/main/resources/resources/storm.py
new file mode 100755
index 0000000..642c393
--- /dev/null
+++ b/storm-multilang/python/src/main/resources/resources/storm.py
@@ -0,0 +1,260 @@
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+import os
+import traceback
+from collections import deque
+
+try:
+    import simplejson as json
+except ImportError:
+    import json
+
+json_encode = lambda x: json.dumps(x)
+json_decode = lambda x: json.loads(x)
+
+#reads lines and reconstructs newlines appropriately
+def readMsg():
+    msg = ""
+    while True:
+        line = sys.stdin.readline()
+        if not line:
+            raise Exception('Read EOF from stdin')
+        if line[0:-1] == "end":
+            break
+        msg = msg + line
+    return json_decode(msg[0:-1])
+
+MODE = None
+ANCHOR_TUPLE = None
+
+#queue up commands we read while trying to read taskids
+pending_commands = deque()
+
+def readTaskIds():
+    if pending_taskids:
+        return pending_taskids.popleft()
+    else:
+        msg = readMsg()
+        while type(msg) is not list:
+            pending_commands.append(msg)
+            msg = readMsg()
+        return msg
+
+#queue up taskids we read while trying to read commands/tuples
+pending_taskids = deque()
+
+def readCommand():
+    if pending_commands:
+        return pending_commands.popleft()
+    else:
+        msg = readMsg()
+        while type(msg) is list:
+            pending_taskids.append(msg)
+            msg = readMsg()
+        return msg
+
+def readTuple():
+    cmd = readCommand()
+    return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"])
+
+def sendMsgToParent(msg):
+    print json_encode(msg)
+    print "end"
+    sys.stdout.flush()
+
+def sync():
+    sendMsgToParent({'command':'sync'})
+
+def sendpid(heartbeatdir):
+    pid = os.getpid()
+    sendMsgToParent({'pid':pid})
+    open(heartbeatdir + "/" + str(pid), "w").close()
+
+def emit(*args, **kwargs):
+    __emit(*args, **kwargs)
+    return readTaskIds()
+
+def emitDirect(task, *args, **kwargs):
+    kwargs["directTask"] = task
+    __emit(*args, **kwargs)
+
+def __emit(*args, **kwargs):
+    global MODE
+    if MODE == Bolt:
+        emitBolt(*args, **kwargs)
+    elif MODE == Spout:
+        emitSpout(*args, **kwargs)
+
+def emitBolt(tup, stream=None, anchors = [], directTask=None):
+    global ANCHOR_TUPLE
+    if ANCHOR_TUPLE is not None:
+        anchors = [ANCHOR_TUPLE]
+    m = {"command": "emit"}
+    if stream is not None:
+        m["stream"] = stream
+    m["anchors"] = map(lambda a: a.id, anchors)
+    if directTask is not None:
+        m["task"] = directTask
+    m["tuple"] = tup
+    sendMsgToParent(m)
+
+def emitSpout(tup, stream=None, id=None, directTask=None):
+    m = {"command": "emit"}
+    if id is not None:
+        m["id"] = id
+    if stream is not None:
+        m["stream"] = stream
+    if directTask is not None:
+        m["task"] = directTask
+    m["tuple"] = tup
+    sendMsgToParent(m)
+
+def ack(tup):
+    sendMsgToParent({"command": "ack", "id": tup.id})
+
+def fail(tup):
+    sendMsgToParent({"command": "fail", "id": tup.id})
+
+def reportError(msg):
+    sendMsgToParent({"command": "error", "msg": msg})
+
+def log(msg, level=2):
+    sendMsgToParent({"command": "log", "msg": msg, "level":level})
+
+def logTrace(msg):
+    log(msg, 0)
+
+def logDebug(msg):
+    log(msg, 1)
+
+def logInfo(msg):
+    log(msg, 2)
+
+def logWarn(msg):
+    log(msg, 3)
+
+def logError(msg):
+    log(msg, 4)
+
+def rpcMetrics(name, params):
+    sendMsgToParent({"command": "metrics", "name": name, "params": params})
+
+def initComponent():
+    setupInfo = readMsg()
+    sendpid(setupInfo['pidDir'])
+    return [setupInfo['conf'], setupInfo['context']]
+
+class Tuple(object):
+    def __init__(self, id, component, stream, task, values):
+        self.id = id
+        self.component = component
+        self.stream = stream
+        self.task = task
+        self.values = values
+
+    def __repr__(self):
+        return '<%s%s>' % (
+            self.__class__.__name__,
+            ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys())))
+
+    def is_heartbeat_tuple(self):
+        return self.task == -1 and self.stream == "__heartbeat"
+
+class Bolt(object):
+    def initialize(self, stormconf, context):
+        pass
+
+    def process(self, tuple):
+        pass
+
+    def run(self):
+        global MODE
+        MODE = Bolt
+        conf, context = initComponent()
+        try:
+            self.initialize(conf, context)
+            while True:
+                tup = readTuple()
+                if tup.is_heartbeat_tuple():
+                    sync()
+                else:
+                    self.process(tup)
+        except Exception, e:
+            reportError(traceback.format_exc(e))
+
+class BasicBolt(object):
+    def initialize(self, stormconf, context):
+        pass
+
+    def process(self, tuple):
+        pass
+
+    def run(self):
+        global MODE
+        MODE = Bolt
+        global ANCHOR_TUPLE
+        conf, context = initComponent()
+        try:
+            self.initialize(conf, context)
+            while True:
+                tup = readTuple()
+                if tup.is_heartbeat_tuple():
+                    sync()
+                else:
+                    ANCHOR_TUPLE = tup
+                    try:
+                        self.process(tup)
+                        ack(tup)
+                    except Exception, e:
+                        reportError(traceback.format_exc(e))
+                        fail(tup)
+        except Exception, e:
+            reportError(traceback.format_exc(e))
+
+class Spout(object):
+    def initialize(self, conf, context):
+        pass
+
+    def ack(self, id):
+        pass
+
+    def fail(self, id):
+        pass
+
+    def nextTuple(self):
+        pass
+
+    def run(self):
+        global MODE
+        MODE = Spout
+        conf, context = initComponent()
+        try:
+            self.initialize(conf, context)
+            while True:
+                msg = readCommand()
+                if msg["command"] == "next":
+                    self.nextTuple()
+                if msg["command"] == "ack":
+                    self.ack(msg["id"])
+                if msg["command"] == "fail":
+                    self.fail(msg["id"])
+                sync()
+        except Exception, e:
+            reportError(traceback.format_exc(e))
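
Editor's note on the storm.py diff above: readMsg and sendMsgToParent frame each message as one JSON document followed by a line containing only "end". The sketch below illustrates that framing. It is not part of the commit. It assumes Python 3 (the committed file is Python 2), uses an in-memory buffer in place of stdin/stdout, and the names write_message and read_message are hypothetical.

```python
import io
import json


def write_message(stream, payload):
    """Write one frame: a JSON document, then a line holding only 'end'."""
    stream.write(json.dumps(payload))
    stream.write("\nend\n")
    stream.flush()


def read_message(stream):
    """Collect lines until the 'end' sentinel, then decode them as one JSON document."""
    lines = []
    while True:
        line = stream.readline()
        if not line:
            # Mirrors the EOF check in readMsg: the parent closed the pipe.
            raise EOFError("stream closed before 'end' sentinel")
        if line.rstrip("\n") == "end":
            break
        lines.append(line)
    return json.loads("".join(lines))


# Round-trip two frames through an in-memory buffer (hypothetical payloads).
buffer = io.StringIO()
write_message(buffer, {"command": "emit", "tuple": ["hello"]})
write_message(buffer, [3, 7])  # a task-id list, the kind of reply readTaskIds waits for
buffer.seek(0)
first = read_message(buffer)
second = read_message(buffer)
```

Because task-id replies arrive as JSON lists and commands as JSON objects, a reader can tell the two apart by type. That is the check readTaskIds and readCommand use to decide which pending queue a message belongs in.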

http://git-wip-us.apache.org/repos/asf/storm/blob/927132ce/storm-multilang/ruby/pom.xml
----------------------------------------------------------------------
diff --git a/storm-multilang/ruby/pom.xml b/storm-multilang/ruby/pom.xml
new file mode 100644
index 0000000..6b5dd0c
--- /dev/null
+++ b/storm-multilang/ruby/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>0.11.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+    <groupId>org.apache.storm</groupId>
+    <artifactId>multilang-ruby</artifactId>
+    <packaging>jar</packaging>
+    <name>multilang-ruby</name>
+    
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/927132ce/storm-multilang/ruby/src/main/resources/resources/storm.rb
----------------------------------------------------------------------
diff --git a/storm-multilang/ruby/src/main/resources/resources/storm.rb b/storm-multilang/ruby/src/main/resources/resources/storm.rb
new file mode 100644
index 0000000..816694e
--- /dev/null
+++ b/storm-multilang/ruby/src/main/resources/resources/storm.rb
@@ -0,0 +1,236 @@
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require "rubygems"
+require "json"
+
+module Storm
+  module Protocol
+    class << self
+      attr_accessor :mode, :pending_taskids, :pending_commands
+    end
+
+    self.pending_taskids = []
+    self.pending_commands = []
+
+    def read_message
+      msg = ""
+      loop do
+        line = STDIN.readline.chomp
+        break if line == "end"
+        msg << line
+        msg << "\n"
+      end
+      JSON.parse msg.chomp
+    end
+
+    def read_task_ids
+      Storm::Protocol.pending_taskids.shift ||
+          begin
+            msg = read_message
+            until msg.is_a? Array
+              Storm::Protocol.pending_commands.push(msg)
+              msg = read_message
+            end
+            msg
+          end
+    end
+
+    def read_command
+      Storm::Protocol.pending_commands.shift ||
+          begin
+            msg = read_message
+            while msg.is_a? Array
+              Storm::Protocol.pending_taskids.push(msg)
+              msg = read_message
+            end
+            msg
+          end
+    end
+
+    def send_msg_to_parent(msg)
+      puts msg.to_json
+      puts "end"
+      STDOUT.flush
+    end
+
+    def sync
+      send_msg_to_parent({'command' => 'sync'})
+    end
+
+    def send_pid(heartbeat_dir)
+      pid = Process.pid
+      send_msg_to_parent({'pid' => pid})
+      File.open("#{heartbeat_dir}/#{pid}", "w").close
+    end
+
+    def emit_bolt(tup, args = {})
+      stream = args[:stream]
+      anchors = args[:anchors] || args[:anchor] || []
+      anchors = [anchors] unless anchors.is_a? Enumerable
+      direct = args[:direct_task]
+      m = {:command => :emit, :anchors => anchors.map(&:id), :tuple => tup}
+      m[:stream] = stream if stream
+      m[:task] = direct if direct
+      send_msg_to_parent m
+      read_task_ids unless direct
+    end
+
+    def emit_spout(tup, args = {})
+      stream = args[:stream]
+      id = args[:id]
+      direct = args[:direct_task]
+      m = {:command => :emit, :tuple => tup}
+      m[:id] = id if id
+      m[:stream] = stream if stream
+      m[:task] = direct if direct
+      send_msg_to_parent m
+      read_task_ids unless direct
+    end
+
+    def emit(*args)
+      case Storm::Protocol.mode
+        when 'spout'
+          emit_spout(*args)
+        when 'bolt'
+          emit_bolt(*args)
+      end
+    end
+
+    def ack(tup)
+      send_msg_to_parent :command => :ack, :id => tup.id
+    end
+
+    def fail(tup)
+      send_msg_to_parent :command => :fail, :id => tup.id
+    end
+
+    def reportError(msg)
+      send_msg_to_parent :command => :error, :msg => msg.to_s
+    end
+
+    def log(msg, level=2)
+      send_msg_to_parent :command => :log, :msg => msg.to_s, :level => level
+    end
+
+    def logTrace(msg)
+      log(msg, 0)
+    end
+
+    def logDebug(msg)
+      log(msg, 1)
+    end
+
+    def logInfo(msg)
+      log(msg, 2)
+    end
+
+    def logWarn(msg)
+      log(msg, 3)
+    end
+
+    def logError(msg)
+      log(msg, 4)
+    end
+
+    def handshake
+      setup_info = read_message
+      send_pid setup_info['pidDir']
+      [setup_info['conf'], setup_info['context']]
+    end
+  end
+
+  class Tuple
+    attr_accessor :id, :component, :stream, :task, :values
+
+    def initialize(id, component, stream, task, values)
+      @id = id
+      @component = component
+      @stream = stream
+      @task = task
+      @values = values
+    end
+
+    def self.from_hash(hash)
+      Tuple.new(*hash.values_at("id", "comp", "stream", "task", "tuple"))
+    end
+
+    def is_heartbeat
+      task == -1 and stream == '__heartbeat'
+    end
+  end
+
+  class Bolt
+    include Storm::Protocol
+
+    def prepare(conf, context); end
+
+    def process(tuple); end
+
+    def run
+      Storm::Protocol.mode = 'bolt'
+      prepare(*handshake)
+      begin
+        while true
+          tuple = Tuple.from_hash(read_command)
+          if tuple.is_heartbeat
+            sync
+          else
+            process tuple
+          end
+        end
+      rescue Exception => e
+        reportError 'Exception in bolt: ' + e.message + ' - ' + e.backtrace.join("\n")
+      end
+    end
+  end
+
+  class Spout
+    include Storm::Protocol
+
+    def open(conf, context); end
+
+    def nextTuple; end
+
+    def ack(id); end
+
+    def fail(id); end
+
+    def run
+      Storm::Protocol.mode = 'spout'
+      open(*handshake)
+
+      begin
+        while true
+          msg = read_command
+          case msg['command']
+            when 'next'
+              nextTuple
+            when 'ack'
+              ack(msg['id'])
+            when 'fail'
+              fail(msg['id'])
+          end
+          sync
+        end
+      rescue Exception => e
+        reportError 'Exception in spout: ' + e.message + ' - ' + e.backtrace.join("\n")
+      end
+    end
+  end
+end
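
Note on the framing used by Storm::Protocol in the file above: each message is a JSON document followed by a line containing only "end". Array messages are treated as task-id replies and everything else as a command, with two queues holding whichever kind arrives while the other is awaited. The sketch below reproduces that behaviour over an in-memory StringIO instead of STDIN/STDOUT. The names write_framed, read_framed and Demux are hypothetical and are not part of storm.rb; this is an illustration under those assumptions, not the library's implementation.

```ruby
require 'json'
require 'stringio'

# Hypothetical helper: write one framed message (JSON, then an "end" line).
def write_framed(io, msg)
  io.puts msg.to_json
  io.puts "end"
end

# Hypothetical helper: read lines up to the "end" marker and parse them as JSON.
def read_framed(io)
  lines = []
  loop do
    line = io.readline.chomp
    break if line == "end"
    lines << line
  end
  JSON.parse(lines.join("\n"))
end

# Hypothetical class mirroring read_task_ids / read_command:
# arrays are task-id replies, any other message is a command.
class Demux
  def initialize(io)
    @io = io
    @pending_taskids = []
    @pending_commands = []
  end

  def read_task_ids
    @pending_taskids.shift || begin
      msg = read_framed(@io)
      until msg.is_a?(Array)
        @pending_commands.push(msg)  # buffer commands seen while waiting
        msg = read_framed(@io)
      end
      msg
    end
  end

  def read_command
    @pending_commands.shift || begin
      msg = read_framed(@io)
      while msg.is_a?(Array)
        @pending_taskids.push(msg)   # buffer task ids seen while waiting
        msg = read_framed(@io)
      end
      msg
    end
  end
end

# A command arrives before the task-id reply the caller is waiting for.
pipe = StringIO.new
write_framed(pipe, {'command' => 'next'})
write_framed(pipe, [3, 7])
pipe.rewind

demux = Demux.new(pipe)
task_ids = demux.read_task_ids  # skips past the command and buffers it
command  = demux.read_command   # served from the buffer, no further read
```

Here read_task_ids returns the array even though a command preceded it on the stream, and the buffered command is handed out by the next read_command call, so neither kind of message is lost when the two interleave.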


[25/27] storm git commit: add STORM-512 and STORM-786 to changelog

Posted by pt...@apache.org.
add STORM-512 and STORM-786 to changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e106ccb0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e106ccb0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e106ccb0

Branch: refs/heads/0.10.x-branch
Commit: e106ccb0c7d72b058774a05d8a6854e9c44d4495
Parents: 69c5499
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri May 15 15:54:33 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:54:33 2015 -0400

----------------------------------------------------------------------
 CHANGELOG.md                       | 2 ++
 storm-multilang/javascript/pom.xml | 2 +-
 storm-multilang/python/pom.xml     | 2 +-
 storm-multilang/ruby/pom.xml       | 2 +-
 4 files changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e106ccb0/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index da1395c..114dbaa 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,6 @@
 ## 0.10.0
+ * STORM-786: KafkaBolt should ack tick tuples
+ * STORM-512: KafkaBolt doesn't handle ticks properly
  * STORM-788: Fix key for process latencies
  * STORM-748: Package Multi-Lang scripts so they don't have to be duplicated
  * STORM-563. Kafka Spout doesn't pick up from the beginning of the queue unless forceFromStart specified.

http://git-wip-us.apache.org/repos/asf/storm/blob/e106ccb0/storm-multilang/javascript/pom.xml
----------------------------------------------------------------------
diff --git a/storm-multilang/javascript/pom.xml b/storm-multilang/javascript/pom.xml
index e1cb993..36e6c64 100644
--- a/storm-multilang/javascript/pom.xml
+++ b/storm-multilang/javascript/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>0.11.0-SNAPSHOT</version>
+        <version>0.10.0-SNAPSHOT</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
     <groupId>org.apache.storm</groupId>

http://git-wip-us.apache.org/repos/asf/storm/blob/e106ccb0/storm-multilang/python/pom.xml
----------------------------------------------------------------------
diff --git a/storm-multilang/python/pom.xml b/storm-multilang/python/pom.xml
index 379c0bc..3c2f157 100644
--- a/storm-multilang/python/pom.xml
+++ b/storm-multilang/python/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>0.11.0-SNAPSHOT</version>
+        <version>0.10.0-SNAPSHOT</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
     <groupId>org.apache.storm</groupId>

http://git-wip-us.apache.org/repos/asf/storm/blob/e106ccb0/storm-multilang/ruby/pom.xml
----------------------------------------------------------------------
diff --git a/storm-multilang/ruby/pom.xml b/storm-multilang/ruby/pom.xml
index 6b5dd0c..54236b4 100644
--- a/storm-multilang/ruby/pom.xml
+++ b/storm-multilang/ruby/pom.xml
@@ -21,7 +21,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>0.11.0-SNAPSHOT</version>
+        <version>0.10.0-SNAPSHOT</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
     <groupId>org.apache.storm</groupId>


[11/27] storm git commit: Fix key for process latencies

Posted by pt...@apache.org.
Fix key for process latencies


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/73b56d63
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/73b56d63
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/73b56d63

Branch: refs/heads/0.10.x-branch
Commit: 73b56d638224947af1544f3e1040a299408ac887
Parents: ce5447e
Author: Derek Dagit <de...@yahoo-inc.com>
Authored: Thu Apr 16 15:01:22 2015 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:19:07 2015 -0400

----------------------------------------------------------------------
 storm-core/src/clj/backtype/storm/ui/core.clj | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/73b56d63/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj
index 8ba92de..0411010 100644
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@ -838,7 +838,7 @@
        "encodedComponent" (url-encode (.get_componentId s))
        "stream" (.get_streamId s)
        "executeLatency" (float-str (:execute-latencies stats))
-       "processLatency" (float-str (:execute-latencies stats))
+       "processLatency" (float-str (:process-latencies stats))
        "executed" (nil-to-zero (:executed stats))
        "acked" (nil-to-zero (:acked stats))
        "failed" (nil-to-zero (:failed stats))})))


[03/27] storm git commit: add STORM-563 to changelog

Posted by pt...@apache.org.
add STORM-563 to changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1247f6d8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1247f6d8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1247f6d8

Branch: refs/heads/0.10.x-branch
Commit: 1247f6d8007befff1840f9f824f3e5e4491555f6
Parents: 847fc1c
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri May 15 14:58:16 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 14:58:16 2015 -0400

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1247f6d8/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6ba3b32..753fc3a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 0.10.0
+ * STORM-563. Kafka Spout doesn't pick up from the beginning of the queue unless forceFromStart specified.
  * STORM-615: Add REST API to upload topology.
  * STORM-807: quote args correctly in /bin/storm
  * STORM-686: Add worker-launcher to storm-dist.


[27/27] storm git commit: fix invalid import in storm-starter

Posted by pt...@apache.org.
fix invalid import in storm-starter


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1498ed06
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1498ed06
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1498ed06

Branch: refs/heads/0.10.x-branch
Commit: 1498ed062864aa059aeacd0f0438b3759eccd9e2
Parents: 790fcab
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri May 15 17:27:10 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 17:27:10 2015 -0400

----------------------------------------------------------------------
 .../src/jvm/storm/starter/bolt/RollingCountAggBolt.java          | 4 ----
 1 file changed, 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1498ed06/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java
index e513b09..39b206c 100644
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java
@@ -26,13 +26,9 @@ import backtype.storm.tuple.Fields;
 import backtype.storm.tuple.Tuple;
 import backtype.storm.tuple.Values;
 import org.apache.log4j.Logger;
-import storm.starter.tools.NthLastModifiedTimeTracker;
-import storm.starter.tools.SlidingWindowCounter;
-import storm.starter.util.TupleHelpers;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Map.Entry;
 
 /**
  * This bolt aggregates counts from multiple upstream bolts.


[15/27] storm git commit: Use isTick in all relevant places to avoid code duplication.

Posted by pt...@apache.org.
Use isTick in all relevant places to avoid code duplication.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6537b369
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6537b369
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6537b369

Branch: refs/heads/0.10.x-branch
Commit: 6537b3691378835ad135799fdb5a2cedf7ab4649
Parents: 62385d7
Author: Niels Basjes <nb...@bol.com>
Authored: Wed Oct 1 11:53:17 2014 +0200
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:27:13 2015 -0400

----------------------------------------------------------------------
 storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/6537b369/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
index 4dfccc6..da4c1a5 100644
--- a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
+++ b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
@@ -299,7 +299,7 @@ public class TridentBoltExecutor implements IRichBolt {
     
     @Override
     public void execute(Tuple tuple) {
-        if(tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
+        if(tuple.isTick()) {
             long now = System.currentTimeMillis();
             if(now - _lastRotate > _messageTimeoutMs) {
                 _batches.rotate();