You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/05/15 23:27:21 UTC
[01/27] storm git commit: STORM-563. Kafka Spout doesn't pick up from
the beginning of the queue unless forceFromStart specified.
Repository: storm
Updated Branches:
refs/heads/0.10.x-branch 208ddbeff -> 1498ed062
STORM-563. Kafka Spout doesn't pick up from the beginning of the queue unless forceFromStart specified.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/28d40a40
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/28d40a40
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/28d40a40
Branch: refs/heads/0.10.x-branch
Commit: 28d40a40678426b352b6c3cd191630b5186f9f32
Parents: 208ddbe
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Mon Mar 30 16:11:22 2015 -0700
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 14:57:24 2015 -0400
----------------------------------------------------------------------
external/storm-kafka/README.md | 19 ++++++++++++++++++-
.../src/jvm/storm/kafka/KafkaConfig.java | 2 +-
.../src/jvm/storm/kafka/KafkaUtils.java | 5 +----
.../src/jvm/storm/kafka/PartitionManager.java | 4 ++--
.../storm/kafka/trident/TridentKafkaEmitter.java | 4 ++--
.../src/test/storm/kafka/KafkaUtilsTest.java | 6 +++---
6 files changed, 27 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/28d40a40/external/storm-kafka/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kafka/README.md b/external/storm-kafka/README.md
index c5ed4a5..8a98c0c 100644
--- a/external/storm-kafka/README.md
+++ b/external/storm-kafka/README.md
@@ -83,7 +83,7 @@ The KafkaConfig class also has bunch of public variables that controls your appl
public int fetchMaxWait = 10000;
public int bufferSizeBytes = 1024 * 1024;
public MultiScheme scheme = new RawMultiScheme();
- public boolean forceFromStart = false;
+ public boolean ignoreZkOffsets = false;
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
public long maxOffsetBehind = Long.MAX_VALUE;
public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
@@ -120,6 +120,23 @@ spoutConf.scheme = new SchemeAsMultiScheme(new StringScheme());
OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
```
+### How KafkaSpout stores offsets of a kafka topic and recovers incase of failures
+
+As shown in the above KafkaConfig properties , user can control where in the topic they can start reading by setting **KafkaConfig.startOffsetTime.**
+
+There are two options **kafka.api.OffsetRequest.EarliestTime()** which makes the KafkaSpout to read from the begining of the topic and
+**kafka.api.OffsetRequest.LatestTime()** which starts at the end of the topic (or any new messsages that are being written to the topic).
+
+When user first deploys a KakfaSpout based topology they can use one of the above two options. As the topology runs
+KafkaSpout keeps track of the offsets its reading and writes these offset information under **SpoutConfig.zkRoot+ "/" + SpoutConfig.id**
+Incase of failures it recovers from the last written offset from zookeeper.
+
+If users deployed a topology , later killed and re-deploying should make sure that **SpoutConfig.id** and **SpoutConfig.zkRoot**
+remains the same otherwise Kafkaspout won't be able to start from stored zookeeper offsets.
+
+Users can set **KafkaConfig.ignoreZkOffsets** to **true** to make KafkaSpout ignore any zookeeper based offsets
+and start from configured **KafkaConfig.startOffsetTime**.
+
## Using storm-kafka with different versions of Scala
Storm-kafka's Kafka dependency is defined as `provided` scope in maven, meaning it will not be pulled in
http://git-wip-us.apache.org/repos/asf/storm/blob/28d40a40/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
index 5c85983..dd71b5a 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
@@ -33,7 +33,7 @@ public class KafkaConfig implements Serializable {
public int fetchMaxWait = 10000;
public int bufferSizeBytes = 1024 * 1024;
public MultiScheme scheme = new RawMultiScheme();
- public boolean forceFromStart = false;
+ public boolean ignoreZkOffsets = false;
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
public long maxOffsetBehind = Long.MAX_VALUE;
public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
http://git-wip-us.apache.org/repos/asf/storm/blob/28d40a40/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 137dc99..b018032 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -60,10 +60,7 @@ public class KafkaUtils {
public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
- long startOffsetTime = kafka.api.OffsetRequest.LatestTime();
- if ( config.forceFromStart ) {
- startOffsetTime = config.startOffsetTime;
- }
+ long startOffsetTime = config.startOffsetTime;
return getOffset(consumer, topic, partition, startOffsetTime);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/28d40a40/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index 630c7f6..00ab981 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -91,9 +91,9 @@ public class PartitionManager {
if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
_committedTo = currentOffset;
LOG.info("No partition information found, using configuration to determine offset");
- } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
+ } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.ignoreZkOffsets) {
_committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig.startOffsetTime);
- LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");
+ LOG.info("Topology change detected and ignore zookeeper offsets set to true, using configuration to determine offset");
} else {
_committedTo = jsonOffset;
LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId );
http://git-wip-us.apache.org/repos/asf/storm/blob/28d40a40/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index 1a9be43..61c79a5 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -100,7 +100,7 @@ public class TridentKafkaEmitter {
if (lastTopoMeta != null) {
lastInstanceId = (String) lastTopoMeta.get("id");
}
- if (_config.forceFromStart && !_topologyInstanceId.equals(lastInstanceId)) {
+ if (_config.ignoreZkOffsets && !_topologyInstanceId.equals(lastInstanceId)) {
offset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config.startOffsetTime);
} else {
offset = (Long) lastMeta.get("nextOffset");
@@ -157,7 +157,7 @@ public class TridentKafkaEmitter {
private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map meta) {
LOG.info("re-emitting batch, attempt " + attempt);
String instanceId = (String) meta.get("instanceId");
- if (!_config.forceFromStart || instanceId.equals(_topologyInstanceId)) {
+ if (!_config.ignoreZkOffsets || instanceId.equals(_topologyInstanceId)) {
SimpleConsumer consumer = _connections.register(partition);
long offset = (Long) meta.get("offset");
long nextOffset = (Long) meta.get("nextOffset");
http://git-wip-us.apache.org/repos/asf/storm/blob/28d40a40/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
index 1f1bbbc..965eaea 100644
--- a/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/KafkaUtilsTest.java
@@ -110,17 +110,17 @@ public class KafkaUtilsTest {
@Test
public void getOffsetFromConfigAndDontForceFromStart() {
- config.forceFromStart = false;
+ config.ignoreZkOffsets = false;
config.startOffsetTime = OffsetRequest.EarliestTime();
createTopicAndSendMessage();
- long latestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.LatestTime());
+ long latestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.EarliestTime());
long offsetFromConfig = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, config);
assertThat(latestOffset, is(equalTo(offsetFromConfig)));
}
@Test
public void getOffsetFromConfigAndFroceFromStart() {
- config.forceFromStart = true;
+ config.ignoreZkOffsets = true;
config.startOffsetTime = OffsetRequest.EarliestTime();
createTopicAndSendMessage();
long earliestOffset = KafkaUtils.getOffset(simpleConsumer, config.topic, 0, OffsetRequest.EarliestTime());
[13/27] storm git commit: New method in the Tuple interface "boolean
isTuple()" for easier handling of TickTuples.
Posted by pt...@apache.org.
New method in the Tuple interface "boolean isTuple()" for easier handling of TickTuples.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/96f35cc4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/96f35cc4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/96f35cc4
Branch: refs/heads/0.10.x-branch
Commit: 96f35cc4b19f0b656982f2f4f7aa2dac85539034
Parents: cf4fdc7
Author: Niels Basjes <nb...@bol.com>
Authored: Wed Oct 1 11:50:48 2014 +0200
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:27:13 2015 -0400
----------------------------------------------------------------------
storm-core/src/jvm/backtype/storm/tuple/Tuple.java | 9 +++++++--
storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java | 8 +++++++-
2 files changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/96f35cc4/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
index 34dc61a..7ea93b9 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
@@ -42,7 +42,7 @@ public interface Tuple extends ITuple{
* Gets the id of the component that created this tuple.
*/
public String getSourceComponent();
-
+
/**
* Gets the id of the task that created this tuple.
*/
@@ -52,7 +52,12 @@ public interface Tuple extends ITuple{
* Gets the id of the stream that this tuple was emitted to.
*/
public String getSourceStreamId();
-
+
+ /**
+ * Returns if this tuple is a tick tuple or not.
+ */
+ public boolean isTick();
+
/**
* Gets the message id that associated with this tuple.
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/96f35cc4/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
index 818eff1..7ff2c8c 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
@@ -17,6 +17,7 @@
*/
package backtype.storm.tuple;
+import backtype.storm.Constants;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.task.GeneralTopologyContext;
import backtype.storm.utils.IndifferentAccessMap;
@@ -212,7 +213,12 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
public String getSourceStreamId() {
return streamId;
}
-
+
+ public boolean isTick() {
+ return this.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) &&
+ this.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
+ }
+
public MessageId getMessageId() {
return id;
}
[06/27] storm git commit: update storm-starter to use multilang
components from storm distribution;
switch to maven shade plugin for uber jar creation
Posted by pt...@apache.org.
update storm-starter to use multilang components from storm distribution; switch to maven shade plugin for uber jar creation
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3c701415
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3c701415
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3c701415
Branch: refs/heads/0.10.x-branch
Commit: 3c701415b3891cad6d93ff8147e635cad465300e
Parents: db49216
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Mon Apr 6 17:19:47 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:12:17 2015 -0400
----------------------------------------------------------------------
examples/storm-starter/README.markdown | 8 +-
.../storm-starter/multilang/resources/storm.js | 373 -------------------
.../storm-starter/multilang/resources/storm.py | 260 -------------
.../storm-starter/multilang/resources/storm.rb | 236 ------------
examples/storm-starter/pom.xml | 15 +
5 files changed, 19 insertions(+), 873 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/3c701415/examples/storm-starter/README.markdown
----------------------------------------------------------------------
diff --git a/examples/storm-starter/README.markdown b/examples/storm-starter/README.markdown
index 8161a06..1cb6636 100644
--- a/examples/storm-starter/README.markdown
+++ b/examples/storm-starter/README.markdown
@@ -96,20 +96,20 @@ You can package a jar suitable for submitting to a Storm cluster with the comman
$ mvn package
This will package your code and all the non-Storm dependencies into a single "uberjar" (or "fat jar") at the path
-`target/storm-starter-{version}-jar-with-dependencies.jar`.
+`target/storm-starter-{version}.jar`.
Example filename of the uberjar:
- >>> target/storm-starter-0.9.3-incubating-SNAPSHOT-jar-with-dependencies.jar
+ >>> target/storm-starter-0.9.3-incubating-SNAPSHOT.jar
You can submit (run) a topology contained in this uberjar to Storm via the `storm` CLI tool:
# Example 1: Run the RollingTopWords in local mode (LocalCluster)
- $ storm jar storm-starter-*-jar-with-dependencies.jar storm.starter.RollingTopWords
+ $ storm jar storm-starter-*.jar storm.starter.RollingTopWords
# Example 2: Run the RollingTopWords in remote/cluster mode,
# under the name "production-topology"
- $ storm jar storm-starter-*-jar-with-dependencies.jar storm.starter.RollingTopWords production-topology remote
+ $ storm jar storm-starter-*.jar storm.starter.RollingTopWords production-topology remote
_Submitting a topology in local vs. remote mode:_
It depends on the actual code of a topology how you can or even must tell Storm whether to run the topology locally (in
http://git-wip-us.apache.org/repos/asf/storm/blob/3c701415/examples/storm-starter/multilang/resources/storm.js
----------------------------------------------------------------------
diff --git a/examples/storm-starter/multilang/resources/storm.js b/examples/storm-starter/multilang/resources/storm.js
deleted file mode 100755
index 355c2d2..0000000
--- a/examples/storm-starter/multilang/resources/storm.js
+++ /dev/null
@@ -1,373 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Base classes in node-js for storm Bolt and Spout.
- * Implements the storm multilang protocol for nodejs.
- */
-
-
-var fs = require('fs');
-
-function Storm() {
- this.messagePart = "";
- this.taskIdsCallbacks = [];
- this.isFirstMessage = true;
- this.separator = '\nend\n';
-}
-
-Storm.prototype.sendMsgToParent = function(msg) {
- var str = JSON.stringify(msg);
- process.stdout.write(str + this.separator);
-}
-
-Storm.prototype.sync = function() {
- this.sendMsgToParent({"command":"sync"});
-}
-
-Storm.prototype.sendPid = function(heartbeatdir) {
- var pid = process.pid;
- fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w"));
- this.sendMsgToParent({"pid": pid})
-}
-
-Storm.prototype.log = function(msg) {
- this.sendMsgToParent({"command": "log", "msg": msg});
-}
-
-Storm.prototype.initSetupInfo = function(setupInfo) {
- var self = this;
- var callback = function() {
- self.sendPid(setupInfo['pidDir']);
- }
- this.initialize(setupInfo['conf'], setupInfo['context'], callback);
-}
-
-Storm.prototype.startReadingInput = function() {
- var self = this;
- process.stdin.on('readable', function() {
- var chunk = process.stdin.read();
- var messages = self.handleNewChunk(chunk);
- messages.forEach(function(message) {
- self.handleNewMessage(message);
- })
-
- });
-}
-
-/**
- * receives a new string chunk and returns a list of new messages with the separator removed
- * stores state in this.messagePart
- * @param chunk
- */
-Storm.prototype.handleNewChunk = function(chunk) {
- //invariant: this.messagePart has no separator otherwise we would have parsed it already
- var messages = [];
- if (chunk && chunk.length !== 0) {
- //"{}".split("\nend\n") ==> ['{}']
- //"\nend\n".split("\nend\n") ==> ['' , '']
- //"{}\nend\n".split("\nend\n") ==> ['{}', '']
- //"\nend\n{}".split("\nend\n") ==> ['' , '{}']
- // "{}\nend\n{}".split("\nend\n") ==> ['{}', '{}' ]
- this.messagePart = this.messagePart + chunk;
- var newMessageParts = this.messagePart.split(this.separator);
- while (newMessageParts.length > 0) {
- var potentialMessage = newMessageParts.shift();
- var anotherMessageAhead = newMessageParts.length > 0;
- if (!anotherMessageAhead) {
- this.messagePart = potentialMessage;
- }
- else if (potentialMessage.length > 0) {
- messages.push(potentialMessage);
- }
- }
- }
- return messages;
-}
-
-Storm.prototype.isTaskIds = function(msg) {
- return (msg instanceof Array);
-}
-
-Storm.prototype.handleNewMessage = function(msg) {
- var parsedMsg = JSON.parse(msg);
-
- if (this.isFirstMessage) {
- this.initSetupInfo(parsedMsg);
- this.isFirstMessage = false;
- } else if (this.isTaskIds(parsedMsg)) {
- this.handleNewTaskId(parsedMsg);
- } else {
- this.handleNewCommand(parsedMsg);
- }
-}
-
-Storm.prototype.handleNewTaskId = function(taskIds) {
- //When new list of task ids arrives, the callback that was passed with the corresponding emit should be called.
- //Storm assures that the task ids will be sent in the same order as their corresponding emits so it we can simply
- //take the first callback in the list and be sure it is the right one.
-
- var callback = this.taskIdsCallbacks.shift();
- if (callback) {
- callback(taskIds);
- } else {
- throw new Error('Something went wrong, we off the split of task id callbacks');
- }
-}
-
-
-
-/**
- *
- * @param messageDetails json with the emit details.
- *
- * For bolt, the json must contain the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
- * tuple and return ack when all components successfully finished to process it.
- * and may contain the optional fields:
- * - stream (if empty - emit to default stream)
- *
- * For spout, the json must contain the required fields:
- * - tuple - the value to emit
- *
- * and may contain the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- *
- * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received).
- */
-Storm.prototype.emit = function(messageDetails, onTaskIds) {
- //Every emit triggers a response - list of task ids to which the tuple was emitted. The task ids are accessible
- //through the callback (will be called when the response arrives). The callback is stored in a list until the
- //corresponding task id list arrives.
- if (messageDetails.task) {
- throw new Error('Illegal input - task. To emit to specific task use emit direct!');
- }
-
- if (!onTaskIds) {
- throw new Error('You must pass a onTaskIds callback when using emit!')
- }
-
- this.taskIdsCallbacks.push(onTaskIds);
- this.__emit(messageDetails);;
-}
-
-
-/**
- * Emit message to specific task.
- * @param messageDetails json with the emit details.
- *
- * For bolt, the json must contain the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
- * tuple and return ack when all components successfully finished to process it.
- * - task - indicate the task to send the tuple to.
- * and may contain the optional fields:
- * - stream (if empty - emit to default stream)
- *
- * For spout, the json must contain the required fields:
- * - tuple - the value to emit
- * - task - indicate the task to send the tuple to.
- * and may contain the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- *
- * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received).
- */
-Storm.prototype.emitDirect = function(commandDetails) {
- if (!commandDetails.task) {
- throw new Error("Emit direct must receive task id!")
- }
- this.__emit(commandDetails);
-}
-
-/**
- * Initialize storm component according to the configuration received.
- * @param conf configuration object accrding to storm protocol.
- * @param context context object according to storm protocol.
- * @param done callback. Call this method when finished initializing.
- */
-Storm.prototype.initialize = function(conf, context, done) {
- done();
-}
-
-Storm.prototype.run = function() {
- process.stdout.setEncoding('utf8');
- process.stdin.setEncoding('utf8');
- this.startReadingInput();
-}
-
-function Tuple(id, component, stream, task, values) {
- this.id = id;
- this.component = component;
- this.stream = stream;
- this.task = task;
- this.values = values;
-}
-
-/**
- * Base class for storm bolt.
- * To create a bolt implement 'process' method.
- * You may also implement initialize method to
- */
-function BasicBolt() {
- Storm.call(this);
- this.anchorTuple = null;
-};
-
-BasicBolt.prototype = Object.create(Storm.prototype);
-BasicBolt.prototype.constructor = BasicBolt;
-
-/**
- * Emit message.
- * @param commandDetails json with the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
- * tuple and return ack when all components successfully finished to process it.
- * and the optional fields:
- * - stream (if empty - emit to default stream)
- * - task (pass only to emit to specific task)
- */
-BasicBolt.prototype.__emit = function(commandDetails) {
- var self = this;
-
- var message = {
- command: "emit",
- tuple: commandDetails.tuple,
- stream: commandDetails.stream,
- task: commandDetails.task,
- anchors: [commandDetails.anchorTupleId]
- };
-
- this.sendMsgToParent(message);
-}
-
-BasicBolt.prototype.handleNewCommand = function(command) {
- var self = this;
- var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]);
-
- if (tup.task === -1 && tup.stream === "__heartbeat") {
- self.sync();
- return;
- }
-
- var callback = function(err) {
- if (err) {
- self.fail(tup, err);
- return;
- }
- self.ack(tup);
- }
- this.process(tup, callback);
-}
-
-/**
- * Implement this method when creating a bolt. This is the main method that provides the logic of the bolt (what
- * should it do?).
- * @param tuple the input of the bolt - what to process.
- * @param done call this method when done processing.
- */
-BasicBolt.prototype.process = function(tuple, done) {};
-
-BasicBolt.prototype.ack = function(tup) {
- this.sendMsgToParent({"command": "ack", "id": tup.id});
-}
-
-BasicBolt.prototype.fail = function(tup, err) {
- this.sendMsgToParent({"command": "fail", "id": tup.id});
-}
-
-
-/**
- * Base class for storm spout.
- * To create a spout implement the following methods: nextTuple, ack and fail (nextTuple - mandatory, ack and fail
- * can stay empty).
- * You may also implement initialize method.
- *
- */
-function Spout() {
- Storm.call(this);
-};
-
-Spout.prototype = Object.create(Storm.prototype);
-
-Spout.prototype.constructor = Spout;
-
-/**
- * This method will be called when an ack is received for preciously sent tuple. One may implement it.
- * @param id The id of the tuple.
- * @param done Call this method when finished and ready to receive more tuples.
- */
-Spout.prototype.ack = function(id, done) {};
-
-/**
- * This method will be called when an fail is received for preciously sent tuple. One may implement it (for example -
- * log the failure or send the tuple again).
- * @param id The id of the tuple.
- * @param done Call this method when finished and ready to receive more tuples.
- */
-Spout.prototype.fail = function(id, done) {};
-
-/**
- * Method the indicates its time to emit the next tuple.
- * @param done call this method when done sending the output.
- */
-Spout.prototype.nextTuple = function(done) {};
-
-Spout.prototype.handleNewCommand = function(command) {
- var self = this;
- var callback = function() {
- self.sync();
- }
-
- if (command["command"] === "next") {
- this.nextTuple(callback);
- }
-
- if (command["command"] === "ack") {
- this.ack(command["id"], callback);
- }
-
- if (command["command"] === "fail") {
- this.fail(command["id"], callback);
- }
-}
-
-/**
- * @param commandDetails json with the required fields:
- * - tuple - the value to emit.
- * and the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- * - task - pass only to emit to specific task.
- */
-Spout.prototype.__emit = function(commandDetails) {
- var message = {
- command: "emit",
- tuple: commandDetails.tuple,
- id: commandDetails.id,
- stream: commandDetails.stream,
- task: commandDetails.task
- };
-
- this.sendMsgToParent(message);
-}
-
-module.exports.BasicBolt = BasicBolt;
-module.exports.Spout = Spout;
http://git-wip-us.apache.org/repos/asf/storm/blob/3c701415/examples/storm-starter/multilang/resources/storm.py
----------------------------------------------------------------------
diff --git a/examples/storm-starter/multilang/resources/storm.py b/examples/storm-starter/multilang/resources/storm.py
deleted file mode 100644
index 642c393..0000000
--- a/examples/storm-starter/multilang/resources/storm.py
+++ /dev/null
@@ -1,260 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import sys
-import os
-import traceback
-from collections import deque
-
-try:
- import simplejson as json
-except ImportError:
- import json
-
-json_encode = lambda x: json.dumps(x)
-json_decode = lambda x: json.loads(x)
-
-#reads lines and reconstructs newlines appropriately
-def readMsg():
- msg = ""
- while True:
- line = sys.stdin.readline()
- if not line:
- raise Exception('Read EOF from stdin')
- if line[0:-1] == "end":
- break
- msg = msg + line
- return json_decode(msg[0:-1])
-
-MODE = None
-ANCHOR_TUPLE = None
-
-#queue up commands we read while trying to read taskids
-pending_commands = deque()
-
-def readTaskIds():
- if pending_taskids:
- return pending_taskids.popleft()
- else:
- msg = readMsg()
- while type(msg) is not list:
- pending_commands.append(msg)
- msg = readMsg()
- return msg
-
-#queue up taskids we read while trying to read commands/tuples
-pending_taskids = deque()
-
-def readCommand():
- if pending_commands:
- return pending_commands.popleft()
- else:
- msg = readMsg()
- while type(msg) is list:
- pending_taskids.append(msg)
- msg = readMsg()
- return msg
-
-def readTuple():
- cmd = readCommand()
- return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"])
-
-def sendMsgToParent(msg):
- print json_encode(msg)
- print "end"
- sys.stdout.flush()
-
-def sync():
- sendMsgToParent({'command':'sync'})
-
-def sendpid(heartbeatdir):
- pid = os.getpid()
- sendMsgToParent({'pid':pid})
- open(heartbeatdir + "/" + str(pid), "w").close()
-
-def emit(*args, **kwargs):
- __emit(*args, **kwargs)
- return readTaskIds()
-
-def emitDirect(task, *args, **kwargs):
- kwargs["directTask"] = task
- __emit(*args, **kwargs)
-
-def __emit(*args, **kwargs):
- global MODE
- if MODE == Bolt:
- emitBolt(*args, **kwargs)
- elif MODE == Spout:
- emitSpout(*args, **kwargs)
-
-def emitBolt(tup, stream=None, anchors = [], directTask=None):
- global ANCHOR_TUPLE
- if ANCHOR_TUPLE is not None:
- anchors = [ANCHOR_TUPLE]
- m = {"command": "emit"}
- if stream is not None:
- m["stream"] = stream
- m["anchors"] = map(lambda a: a.id, anchors)
- if directTask is not None:
- m["task"] = directTask
- m["tuple"] = tup
- sendMsgToParent(m)
-
-def emitSpout(tup, stream=None, id=None, directTask=None):
- m = {"command": "emit"}
- if id is not None:
- m["id"] = id
- if stream is not None:
- m["stream"] = stream
- if directTask is not None:
- m["task"] = directTask
- m["tuple"] = tup
- sendMsgToParent(m)
-
-def ack(tup):
- sendMsgToParent({"command": "ack", "id": tup.id})
-
-def fail(tup):
- sendMsgToParent({"command": "fail", "id": tup.id})
-
-def reportError(msg):
- sendMsgToParent({"command": "error", "msg": msg})
-
-def log(msg, level=2):
- sendMsgToParent({"command": "log", "msg": msg, "level":level})
-
-def logTrace(msg):
- log(msg, 0)
-
-def logDebug(msg):
- log(msg, 1)
-
-def logInfo(msg):
- log(msg, 2)
-
-def logWarn(msg):
- log(msg, 3)
-
-def logError(msg):
- log(msg, 4)
-
-def rpcMetrics(name, params):
- sendMsgToParent({"command": "metrics", "name": name, "params": params})
-
-def initComponent():
- setupInfo = readMsg()
- sendpid(setupInfo['pidDir'])
- return [setupInfo['conf'], setupInfo['context']]
-
-class Tuple(object):
- def __init__(self, id, component, stream, task, values):
- self.id = id
- self.component = component
- self.stream = stream
- self.task = task
- self.values = values
-
- def __repr__(self):
- return '<%s%s>' % (
- self.__class__.__name__,
- ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys())))
-
- def is_heartbeat_tuple(self):
- return self.task == -1 and self.stream == "__heartbeat"
-
-class Bolt(object):
- def initialize(self, stormconf, context):
- pass
-
- def process(self, tuple):
- pass
-
- def run(self):
- global MODE
- MODE = Bolt
- conf, context = initComponent()
- try:
- self.initialize(conf, context)
- while True:
- tup = readTuple()
- if tup.is_heartbeat_tuple():
- sync()
- else:
- self.process(tup)
- except Exception, e:
- reportError(traceback.format_exc(e))
-
-class BasicBolt(object):
- def initialize(self, stormconf, context):
- pass
-
- def process(self, tuple):
- pass
-
- def run(self):
- global MODE
- MODE = Bolt
- global ANCHOR_TUPLE
- conf, context = initComponent()
- try:
- self.initialize(conf, context)
- while True:
- tup = readTuple()
- if tup.is_heartbeat_tuple():
- sync()
- else:
- ANCHOR_TUPLE = tup
- try:
- self.process(tup)
- ack(tup)
- except Exception, e:
- reportError(traceback.format_exc(e))
- fail(tup)
- except Exception, e:
- reportError(traceback.format_exc(e))
-
-class Spout(object):
- def initialize(self, conf, context):
- pass
-
- def ack(self, id):
- pass
-
- def fail(self, id):
- pass
-
- def nextTuple(self):
- pass
-
- def run(self):
- global MODE
- MODE = Spout
- conf, context = initComponent()
- try:
- self.initialize(conf, context)
- while True:
- msg = readCommand()
- if msg["command"] == "next":
- self.nextTuple()
- if msg["command"] == "ack":
- self.ack(msg["id"])
- if msg["command"] == "fail":
- self.fail(msg["id"])
- sync()
- except Exception, e:
- reportError(traceback.format_exc(e))
http://git-wip-us.apache.org/repos/asf/storm/blob/3c701415/examples/storm-starter/multilang/resources/storm.rb
----------------------------------------------------------------------
diff --git a/examples/storm-starter/multilang/resources/storm.rb b/examples/storm-starter/multilang/resources/storm.rb
deleted file mode 100644
index 816694e..0000000
--- a/examples/storm-starter/multilang/resources/storm.rb
+++ /dev/null
@@ -1,236 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-require "rubygems"
-require "json"
-
-module Storm
- module Protocol
- class << self
- attr_accessor :mode, :pending_taskids, :pending_commands
- end
-
- self.pending_taskids = []
- self.pending_commands = []
-
- def read_message
- msg = ""
- loop do
- line = STDIN.readline.chomp
- break if line == "end"
- msg << line
- msg << "\n"
- end
- JSON.parse msg.chomp
- end
-
- def read_task_ids
- Storm::Protocol.pending_taskids.shift ||
- begin
- msg = read_message
- until msg.is_a? Array
- Storm::Protocol.pending_commands.push(msg)
- msg = read_message
- end
- msg
- end
- end
-
- def read_command
- Storm::Protocol.pending_commands.shift ||
- begin
- msg = read_message
- while msg.is_a? Array
- Storm::Protocol.pending_taskids.push(msg)
- msg = read_message
- end
- msg
- end
- end
-
- def send_msg_to_parent(msg)
- puts msg.to_json
- puts "end"
- STDOUT.flush
- end
-
- def sync
- send_msg_to_parent({'command' => 'sync'})
- end
-
- def send_pid(heartbeat_dir)
- pid = Process.pid
- send_msg_to_parent({'pid' => pid})
- File.open("#{heartbeat_dir}/#{pid}", "w").close
- end
-
- def emit_bolt(tup, args = {})
- stream = args[:stream]
- anchors = args[:anchors] || args[:anchor] || []
- anchors = [anchors] unless anchors.is_a? Enumerable
- direct = args[:direct_task]
- m = {:command => :emit, :anchors => anchors.map(&:id), :tuple => tup}
- m[:stream] = stream if stream
- m[:task] = direct if direct
- send_msg_to_parent m
- read_task_ids unless direct
- end
-
- def emit_spout(tup, args = {})
- stream = args[:stream]
- id = args[:id]
- direct = args[:direct_task]
- m = {:command => :emit, :tuple => tup}
- m[:id] = id if id
- m[:stream] = stream if stream
- m[:task] = direct if direct
- send_msg_to_parent m
- read_task_ids unless direct
- end
-
- def emit(*args)
- case Storm::Protocol.mode
- when 'spout'
- emit_spout(*args)
- when 'bolt'
- emit_bolt(*args)
- end
- end
-
- def ack(tup)
- send_msg_to_parent :command => :ack, :id => tup.id
- end
-
- def fail(tup)
- send_msg_to_parent :command => :fail, :id => tup.id
- end
-
- def reportError(msg)
- send_msg_to_parent :command => :error, :msg => msg.to_s
- end
-
- def log(msg, level=2)
- send_msg_to_parent :command => :log, :msg => msg.to_s, :level => level
- end
-
- def logTrace(msg)
- log(msg, 0)
- end
-
- def logDebug(msg)
- log(msg, 1)
- end
-
- def logInfo(msg)
- log(msg, 2)
- end
-
- def logWarn(msg)
- log(msg, 3)
- end
-
- def logError(msg)
- log(msg, 4)
- end
-
- def handshake
- setup_info = read_message
- send_pid setup_info['pidDir']
- [setup_info['conf'], setup_info['context']]
- end
- end
-
- class Tuple
- attr_accessor :id, :component, :stream, :task, :values
-
- def initialize(id, component, stream, task, values)
- @id = id
- @component = component
- @stream = stream
- @task = task
- @values = values
- end
-
- def self.from_hash(hash)
- Tuple.new(*hash.values_at("id", "comp", "stream", "task", "tuple"))
- end
-
- def is_heartbeat
- task == -1 and stream == '__heartbeat'
- end
- end
-
- class Bolt
- include Storm::Protocol
-
- def prepare(conf, context); end
-
- def process(tuple); end
-
- def run
- Storm::Protocol.mode = 'bolt'
- prepare(*handshake)
- begin
- while true
- tuple = Tuple.from_hash(read_command)
- if tuple.is_heartbeat
- sync
- else
- process tuple
- end
- end
- rescue Exception => e
- reportError 'Exception in bolt: ' + e.message + ' - ' + e.backtrace.join('\n')
- end
- end
- end
-
- class Spout
- include Storm::Protocol
-
- def open(conf, context); end
-
- def nextTuple; end
-
- def ack(id); end
-
- def fail(id); end
-
- def run
- Storm::Protocol.mode = 'spout'
- open(*handshake)
-
- begin
- while true
- msg = read_command
- case msg['command']
- when 'next'
- nextTuple
- when 'ack'
- ack(msg['id'])
- when 'fail'
- fail(msg['id'])
- end
- sync
- end
- rescue Exception => e
- reportError 'Exception in spout: ' + e.message + ' - ' + e.backtrace.join('\n')
- end
- end
- end
-end
http://git-wip-us.apache.org/repos/asf/storm/blob/3c701415/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index 5afe26e..8682a4b 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -72,6 +72,21 @@
<!-- keep storm out of the jar-with-dependencies -->
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>multilang-javascript</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>multilang-ruby</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>multilang-python</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
[12/27] storm git commit: add STORM-788 to changelog
Posted by pt...@apache.org.
add STORM-788 to changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cf4fdc77
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cf4fdc77
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cf4fdc77
Branch: refs/heads/0.10.x-branch
Commit: cf4fdc77b81ec2d144d841d85eef62e0d947e43e
Parents: 73b56d6
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri May 15 15:19:43 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:19:43 2015 -0400
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/cf4fdc77/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index ec096e5..da1395c 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.10.0
+ * STORM-788: Fix key for process latencies
* STORM-748: Package Multi-Lang scripts so they don't have to be duplicated
* STORM-563. Kafka Spout doesn't pick up from the beginning of the queue unless forceFromStart specified.
* STORM-615: Add REST API to upload topology.
[14/27] storm git commit: KafkaBolt no longer tries to
map/process/send Tick Tuples to Kafka.
Posted by pt...@apache.org.
KafkaBolt no longer tries to map/process/send Tick Tuples to Kafka.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/62385d7b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/62385d7b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/62385d7b
Branch: refs/heads/0.10.x-branch
Commit: 62385d7b290ac3599546d73fbdd5880b1f53fc0d
Parents: 96f35cc
Author: Niels Basjes <nb...@bol.com>
Authored: Wed Oct 1 11:51:42 2014 +0200
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:27:13 2015 -0400
----------------------------------------------------------------------
external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java | 4 ++++
1 file changed, 4 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/62385d7b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index cf169dc..913843c 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -89,6 +89,10 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
@Override
public void execute(Tuple input) {
+ if (input.isTick()) {
+ return; // Do not try to send ticks to Kafka
+ }
+
K key = null;
V message = null;
String topic = null;
[20/27] storm git commit: Fixed the tests in storm-starter that do
not use the actual TupleImpl but mock everything themselves
Posted by pt...@apache.org.
Fixed the tests in storm-starter that do not use the actual TupleImpl but mock everything themselves
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/027db673
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/027db673
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/027db673
Branch: refs/heads/0.10.x-branch
Commit: 027db6738068ba61ba385acae0fb77a10b803e8d
Parents: d3dcd1b
Author: Niels Basjes <ni...@basjes.nl>
Authored: Tue Dec 9 13:53:05 2014 +0100
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:28:27 2015 -0400
----------------------------------------------------------------------
.../test/jvm/storm/starter/tools/MockTupleHelpers.java | 12 +++++++++++-
1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/027db673/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
index b253350..3180fd3 100644
--- a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
+++ b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
@@ -19,6 +19,7 @@ package storm.starter.tools;
import backtype.storm.Constants;
import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.TupleImpl;
import static org.mockito.Mockito.*;
@@ -28,13 +29,22 @@ public final class MockTupleHelpers {
}
public static Tuple mockTickTuple() {
- return mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID);
+ Tuple tuple = mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID);
+ when(tuple.isTick()).thenReturn(true);
+ return tuple;
}
public static Tuple mockTuple(String componentId, String streamId) {
Tuple tuple = mock(Tuple.class);
when(tuple.getSourceComponent()).thenReturn(componentId);
when(tuple.getSourceStreamId()).thenReturn(streamId);
+ when(tuple.isTick()).thenReturn(isTick(componentId, streamId));
return tuple;
}
+
+ private static boolean isTick(String componentId, String streamId) {
+ return componentId.equals(Constants.SYSTEM_COMPONENT_ID) &&
+ streamId.equals(Constants.SYSTEM_TICK_STREAM_ID);
+ }
+
}
[26/27] storm git commit: Revert "STORM-651: Rename "ui" service to
"storm ui" and temp.txt should be in TEMP folder.Add jdk and jres bin and
libs dirs to lib path."
Posted by pt...@apache.org.
Revert "STORM-651: Rename "ui" service to "storm ui" and temp.txt should be in TEMP folder.Add jdk and jres bin and libs dirs to lib path."
This reverts commit db1182d753a6cc911a220d2891c8efd2f093d77c.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/790fcab8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/790fcab8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/790fcab8
Branch: refs/heads/0.10.x-branch
Commit: 790fcab8d0932e5f59a4e74d129e924015180f17
Parents: e106ccb
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri May 15 16:42:50 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 16:42:50 2015 -0400
----------------------------------------------------------------------
bin/storm-config.cmd | 14 +++++++-------
bin/storm.cmd | 33 +++++++++++----------------------
2 files changed, 18 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/790fcab8/bin/storm-config.cmd
----------------------------------------------------------------------
diff --git a/bin/storm-config.cmd b/bin/storm-config.cmd
index e185495..5203241 100644
--- a/bin/storm-config.cmd
+++ b/bin/storm-config.cmd
@@ -86,13 +86,13 @@ if not defined STORM_LOG_DIR (
@rem retrieve storm.logback.conf.dir from conf file
@rem
-"%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" backtype.storm.command.config_value storm.logback.conf.dir > %CMD_TEMP_FILE%
+"%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" backtype.storm.command.config_value storm.logback.conf.dir > temp.txt
-FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do (
+FOR /F "delims=" %%i in (temp.txt) do (
FOR /F "tokens=1,* delims= " %%a in ("%%i") do (
if %%a == VALUE: (
set STORM_LOGBACK_CONFIGURATION_DIR=%%b
- del /F %CMD_TEMP_FILE%)
+ del /F temp.txt)
)
)
)
@@ -113,9 +113,9 @@ if not defined STORM_LOGBACK_CONFIGURATION_FILE (
set STORM_LOGBACK_CONFIGURATION_FILE=%STORM_HOME%\logback\cluster.xml
)
-"%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" backtype.storm.command.config_value java.library.path > %CMD_TEMP_FILE%
+"%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" backtype.storm.command.config_value java.library.path > temp.txt
-FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do (
+FOR /F "delims=" %%i in (temp.txt) do (
FOR /F "tokens=1,* delims= " %%a in ("%%i") do (
if %%a == VALUE: (
set JAVA_LIBRARY_PATH=%%b
@@ -125,10 +125,10 @@ FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do (
:storm_opts
- set STORM_OPTS=-Dstorm.options= -Dstorm.home=%STORM_HOME% -Djava.library.path=%JAVA_LIBRARY_PATH%;%JAVA_HOME%\bin;%JAVA_HOME%\lib;%JAVA_HOME%\jre\bin;%JAVA_HOME%\jre\lib
+ set STORM_OPTS=-Dstorm.options= -Dstorm.home=%STORM_HOME% -Djava.library.path=%JAVA_LIBRARY_PATH%
set STORM_OPTS=%STORM_OPTS% -Dlogback.configurationFile=%STORM_LOGBACK_CONFIGURATION_FILE%
set STORM_OPTS=%STORM_OPTS% -Dstorm.log.dir=%STORM_LOG_DIR%
- del /F %CMD_TEMP_FILE%
+ del /F temp.txt
if not defined STORM_SERVER_OPTS (
http://git-wip-us.apache.org/repos/asf/storm/blob/790fcab8/bin/storm.cmd
----------------------------------------------------------------------
diff --git a/bin/storm.cmd b/bin/storm.cmd
index ad1a81f..2f188cf 100644
--- a/bin/storm.cmd
+++ b/bin/storm.cmd
@@ -114,8 +114,8 @@
:drpc
set CLASS=backtype.storm.daemon.drpc
- "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" backtype.storm.command.config_value drpc.childopts > %CMD_TEMP_FILE%
- FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do (
+ "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" backtype.storm.command.config_value drpc.childopts > temp.txt
+ FOR /F "delims=" %%i in (temp.txt) do (
FOR /F "tokens=1,* delims= " %%a in ("%%i") do (
if %%a == VALUE: (
set CHILDOPTS=%%b
@@ -140,8 +140,8 @@
:logviewer
set CLASS=backtype.storm.daemon.logviewer
- "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" backtype.storm.command.config_value logviewer.childopts > %CMD_TEMP_FILE%
- FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do (
+ "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" backtype.storm.command.config_value logviewer.childopts > temp.txt
+ FOR /F "delims=" %%i in (temp.txt) do (
FOR /F "tokens=1,* delims= " %%a in ("%%i") do (
if %%a == VALUE: (
set CHILDOPTS=%%b
@@ -152,8 +152,8 @@
:nimbus
set CLASS=backtype.storm.daemon.nimbus
- "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" backtype.storm.command.config_value nimbus.childopts > %CMD_TEMP_FILE%
- FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do (
+ "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" backtype.storm.command.config_value nimbus.childopts > temp.txt
+ FOR /F "delims=" %%i in (temp.txt) do (
FOR /F "tokens=1,* delims= " %%a in ("%%i") do (
if %%a == VALUE: (
set CHILDOPTS=%%b
@@ -184,8 +184,8 @@
:supervisor
set CLASS=backtype.storm.daemon.supervisor
- "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" backtype.storm.command.config_value supervisor.childopts > %CMD_TEMP_FILE%
- FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do (
+ "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" backtype.storm.command.config_value supervisor.childopts > temp.txt
+ FOR /F "delims=" %%i in (temp.txt) do (
FOR /F "tokens=1,* delims= " %%a in ("%%i") do (
if %%a == VALUE: (
set CHILDOPTS=%%b
@@ -197,8 +197,8 @@
:ui
set CLASS=backtype.storm.ui.core
set CLASSPATH=%CLASSPATH%;%STORM_HOME%
- "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" backtype.storm.command.config_value ui.childopts > %CMD_TEMP_FILE%
- FOR /F "delims=" %%i in (%CMD_TEMP_FILE%) do (
+ "%JAVA%" -client -Dstorm.options= -Dstorm.conf.file= -cp "%CLASSPATH%" backtype.storm.command.config_value ui.childopts > temp.txt
+ FOR /F "delims=" %%i in (temp.txt) do (
FOR /F "tokens=1,* delims= " %%a in ("%%i") do (
if %%a == VALUE: (
set CHILDOPTS=%%b
@@ -212,17 +212,6 @@
set STORM_OPTS=%STORM_CLIENT_OPTS% %STORM_OPTS%
goto :eof
-:makeServiceXml
- set arguments=%*
- @echo ^<service^>
- @echo ^<id^>storm_%storm-command%^</id^>
- @echo ^<name^>storm_%storm-command%^</name^>
- @echo ^<description^>This service runs Storm %storm-command%^</description^>
- @echo ^<executable^>%JAVA%^</executable^>
- @echo ^<arguments^>%arguments%^</arguments^>
- @echo ^</service^>
- goto :eof
-
:make_command_arguments
if "%2" == "" goto :eof
set _count=0
@@ -242,7 +231,7 @@
:set_childopts
set STORM_OPTS=%STORM_SERVER_OPTS% %STORM_OPTS% %CHILDOPTS%
- del /F %CMD_TEMP_FILE%
+ del /F temp.txt
goto :eof
:print_usage
[02/27] storm git commit: STORM-563. Kafka Spout doesn't pick up from
the beginning of the queue unless forceFromStart specified.
Posted by pt...@apache.org.
STORM-563. Kafka Spout doesn't pick up from the beginning of the queue unless forceFromStart specified.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/847fc1cf
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/847fc1cf
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/847fc1cf
Branch: refs/heads/0.10.x-branch
Commit: 847fc1cf6dcdc139f6f91a191a40a4dae1e97238
Parents: 28d40a4
Author: Sriharsha Chintalapani <ma...@harsha.io>
Authored: Tue Mar 31 20:21:29 2015 -0700
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 14:57:34 2015 -0400
----------------------------------------------------------------------
external/storm-kafka/README.md | 8 +++++---
1 file changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/847fc1cf/external/storm-kafka/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kafka/README.md b/external/storm-kafka/README.md
index 8a98c0c..a841880 100644
--- a/external/storm-kafka/README.md
+++ b/external/storm-kafka/README.md
@@ -124,10 +124,12 @@ OpaqueTridentKafkaSpout spout = new OpaqueTridentKafkaSpout(spoutConf);
As shown in the above KafkaConfig properties , user can control where in the topic they can start reading by setting **KafkaConfig.startOffsetTime.**
-There are two options **kafka.api.OffsetRequest.EarliestTime()** which makes the KafkaSpout to read from the begining of the topic and
-**kafka.api.OffsetRequest.LatestTime()** which starts at the end of the topic (or any new messsages that are being written to the topic).
+These are the options
+1. **kafka.api.OffsetRequest.EarliestTime() or -2 (value returned by EarliestTime())** which makes the KafkaSpout to read from the begining of the topic
+2. **kafka.api.OffsetRequest.LatestTime() or -1 (value returned by LatestTime())** which starts at the end of the topic ,any new messsages that are being written to the topic
+3. **System.time.currentTimeMillis()**
-When user first deploys a KakfaSpout based topology they can use one of the above two options. As the topology runs
+When user first deploys a KakfaSpout based topology they can use one of the above options. As the topology runs
KafkaSpout keeps track of the offsets its reading and writes these offset information under **SpoutConfig.zkRoot+ "/" + SpoutConfig.id**
Incase of failures it recovers from the last written offset from zookeeper.
[17/27] storm git commit: Replaced TupleHelpers in the examples with
the new tuple.isTick()
Posted by pt...@apache.org.
Replaced TupleHelpers in the examples with the new tuple.isTick()
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6b000407
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6b000407
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6b000407
Branch: refs/heads/0.10.x-branch
Commit: 6b0004070c80ae8f01e4bc38b759d57f482feeb2
Parents: 6537b36
Author: Niels Basjes <nb...@bol.com>
Authored: Wed Oct 29 15:50:18 2014 +0100
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:27:14 2015 -0400
----------------------------------------------------------------------
.../storm/starter/bolt/AbstractRankerBolt.java | 3 +-
.../storm/starter/bolt/RollingCountBolt.java | 3 +-
.../jvm/storm/starter/util/TupleHelpers.java | 33 --------------------
3 files changed, 2 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/6b000407/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
index cc5c0e7..83c2cfc 100644
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
@@ -26,7 +26,6 @@ import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.apache.log4j.Logger;
import storm.starter.tools.Rankings;
-import storm.starter.util.TupleHelpers;
import java.util.HashMap;
import java.util.Map;
@@ -78,7 +77,7 @@ public abstract class AbstractRankerBolt extends BaseBasicBolt {
*/
@Override
public final void execute(Tuple tuple, BasicOutputCollector collector) {
- if (TupleHelpers.isTickTuple(tuple)) {
+ if (tuple.isTick()) {
getLogger().debug("Received tick tuple, triggering emit of current rankings");
emitRankings(collector);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/6b000407/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
index f83906c..f023c0b 100644
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
@@ -28,7 +28,6 @@ import backtype.storm.tuple.Values;
import org.apache.log4j.Logger;
import storm.starter.tools.NthLastModifiedTimeTracker;
import storm.starter.tools.SlidingWindowCounter;
-import storm.starter.util.TupleHelpers;
import java.util.HashMap;
import java.util.Map;
@@ -95,7 +94,7 @@ public class RollingCountBolt extends BaseRichBolt {
@Override
public void execute(Tuple tuple) {
- if (TupleHelpers.isTickTuple(tuple)) {
+ if (tuple.isTick()) {
LOG.debug("Received tick tuple, triggering emit of current window counts");
emitCurrentWindowCounts();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/6b000407/examples/storm-starter/src/jvm/storm/starter/util/TupleHelpers.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/util/TupleHelpers.java b/examples/storm-starter/src/jvm/storm/starter/util/TupleHelpers.java
deleted file mode 100644
index 4ea669e..0000000
--- a/examples/storm-starter/src/jvm/storm/starter/util/TupleHelpers.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.starter.util;
-
-import backtype.storm.Constants;
-import backtype.storm.tuple.Tuple;
-
-public final class TupleHelpers {
-
- private TupleHelpers() {
- }
-
- public static boolean isTickTuple(Tuple tuple) {
- return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(
- Constants.SYSTEM_TICK_STREAM_ID);
- }
-
-}
[24/27] storm git commit: STORM-786: KafkaBolt should ack tick tuples
Posted by pt...@apache.org.
STORM-786: KafkaBolt should ack tick tuples
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/69c5499e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/69c5499e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/69c5499e
Branch: refs/heads/0.10.x-branch
Commit: 69c5499eddb3630fcfd04d06b1ddfb51f077295b
Parents: 9116770
Author: Michael G. Noll <mi...@michael-noll.com>
Authored: Thu Apr 16 11:44:40 2015 +0200
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:51:26 2015 -0400
----------------------------------------------------------------------
.../src/jvm/storm/kafka/bolt/KafkaBolt.java | 1 +
.../test/storm/kafka/bolt/KafkaBoltTest.java | 27 ++++++++++++++++++++
2 files changed, 28 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/69c5499e/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index a8c4321..714ecd3 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -91,6 +91,7 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
@Override
public void execute(Tuple input) {
if (TupleUtils.isTick(input)) {
+ collector.ack(input);
return; // Do not try to send ticks to Kafka
}
http://git-wip-us.apache.org/repos/asf/storm/blob/69c5499e/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
index c30cba1..576cc12 100644
--- a/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
+++ b/external/storm-kafka/src/test/storm/kafka/bolt/KafkaBoltTest.java
@@ -18,6 +18,7 @@
package storm.kafka.bolt;
import backtype.storm.Config;
+import backtype.storm.Constants;
import backtype.storm.task.GeneralTopologyContext;
import backtype.storm.task.IOutputCollector;
import backtype.storm.task.OutputCollector;
@@ -26,6 +27,7 @@ import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.TupleImpl;
import backtype.storm.tuple.Values;
+import backtype.storm.utils.TupleUtils;
import backtype.storm.utils.Utils;
import kafka.api.OffsetRequest;
import kafka.javaapi.consumer.SimpleConsumer;
@@ -45,7 +47,10 @@ import java.util.HashMap;
import java.util.Properties;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
public class KafkaBoltTest {
@@ -84,6 +89,18 @@ public class KafkaBoltTest {
}
@Test
+ public void shouldAcknowledgeTickTuples() throws Exception {
+ // Given
+ Tuple tickTuple = mockTickTuple();
+
+ // When
+ bolt.execute(tickTuple);
+
+ // Then
+ verify(collector).ack(tickTuple);
+ }
+
+ @Test
public void executeWithKey() throws Exception {
String message = "value-123";
String key = "key-123";
@@ -185,4 +202,14 @@ public class KafkaBoltTest {
};
return new TupleImpl(topologyContext, new Values(message), 1, "");
}
+
+ private Tuple mockTickTuple() {
+ Tuple tuple = mock(Tuple.class);
+ when(tuple.getSourceComponent()).thenReturn(Constants.SYSTEM_COMPONENT_ID);
+ when(tuple.getSourceStreamId()).thenReturn(Constants.SYSTEM_TICK_STREAM_ID);
+ // Sanity check
+ assertTrue(TupleUtils.isTick(tuple));
+ return tuple;
+ }
+
}
[21/27] storm git commit: Code cleanup in mocking storm-starter tests
Posted by pt...@apache.org.
Code cleanup in mocking storm-starter tests
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/63f331fd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/63f331fd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/63f331fd
Branch: refs/heads/0.10.x-branch
Commit: 63f331fd54ba2d6684767e87e586222ce84f4c4f
Parents: 22a2923
Author: Niels Basjes <ni...@basjes.nl>
Authored: Tue Dec 9 14:00:33 2014 +0100
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:28:27 2015 -0400
----------------------------------------------------------------------
.../test/jvm/storm/starter/tools/MockTupleHelpers.java | 5 ++---
1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/63f331fd/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
index 374288e..9e8629c 100644
--- a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
+++ b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
@@ -19,7 +19,6 @@ package storm.starter.tools;
import backtype.storm.Constants;
import backtype.storm.tuple.Tuple;
-import backtype.storm.tuple.TupleImpl;
import static org.mockito.Mockito.*;
@@ -39,9 +38,9 @@ public final class MockTupleHelpers {
when(tuple.isTick()).thenReturn(isTick(componentId, streamId));
return tuple;
}
-
+
private static boolean isTick(String componentId, String streamId) {
- return componentId.equals(Constants.SYSTEM_COMPONENT_ID) &&
+ return componentId.equals(Constants.SYSTEM_COMPONENT_ID) &&
streamId.equals(Constants.SYSTEM_TICK_STREAM_ID);
}
[07/27] storm git commit: remove redundant multilang scripts
Posted by pt...@apache.org.
remove redundant multilang scripts
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/db492167
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/db492167
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/db492167
Branch: refs/heads/0.10.x-branch
Commit: db492167a735f6bb08f1844d56e55808e0eca035
Parents: 9e2c816
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Mon Apr 6 17:10:27 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:12:17 2015 -0400
----------------------------------------------------------------------
examples/storm-starter/pom.xml | 52 ++---
storm-core/src/multilang/js/storm.js | 366 ------------------------------
storm-core/src/multilang/py/storm.py | 260 ---------------------
storm-core/src/multilang/rb/storm.rb | 236 -------------------
4 files changed, 25 insertions(+), 889 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/db492167/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index f59e692..5afe26e 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -93,33 +93,31 @@
</resources>
<plugins>
- <!--
- Bind the maven-assembly-plugin to the package phase
- this will create a jar file without the storm dependencies
- suitable for deployment to a cluster.
- -->
- <plugin>
- <artifactId>maven-assembly-plugin</artifactId>
- <configuration>
- <descriptorRefs>
- <descriptorRef>jar-with-dependencies</descriptorRef>
- </descriptorRefs>
- <archive>
- <manifest>
- <mainClass />
- </manifest>
- </archive>
- </configuration>
- <executions>
- <execution>
- <id>make-assembly</id>
- <phase>package</phase>
- <goals>
- <goal>single</goal>
- </goals>
- </execution>
- </executions>
- </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>1.4</version>
+ <configuration>
+ <createDependencyReducedPom>true</createDependencyReducedPom>
+ </configuration>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <transformers>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
+ <transformer
+ implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+ </transformer>
+ </transformers>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
<plugin>
<groupId>com.theoryinpractise</groupId>
http://git-wip-us.apache.org/repos/asf/storm/blob/db492167/storm-core/src/multilang/js/storm.js
----------------------------------------------------------------------
diff --git a/storm-core/src/multilang/js/storm.js b/storm-core/src/multilang/js/storm.js
deleted file mode 100755
index f5dcad2..0000000
--- a/storm-core/src/multilang/js/storm.js
+++ /dev/null
@@ -1,366 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
- /**
- * Base classes in node-js for storm Bolt and Spout.
- * Implements the storm multilang protocol for nodejs.
- */
-
-
-var fs = require('fs');
-
-function Storm() {
- this.messagePart = "";
- this.taskIdsCallbacks = [];
- this.isFirstMessage = true;
- this.separator = '\nend\n';
-}
-
-Storm.prototype.sendMsgToParent = function(msg) {
- var str = JSON.stringify(msg);
- process.stdout.write(str + this.separator);
-}
-
-Storm.prototype.sync = function() {
- this.sendMsgToParent({"command":"sync"});
-}
-
-Storm.prototype.sendPid = function(heartbeatdir) {
- var pid = process.pid;
- fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w"));
- this.sendMsgToParent({"pid": pid})
-}
-
-Storm.prototype.log = function(msg) {
- this.sendMsgToParent({"command": "log", "msg": msg});
-}
-
-Storm.prototype.initSetupInfo = function(setupInfo) {
- var self = this;
- var callback = function() {
- self.sendPid(setupInfo['pidDir']);
- }
- this.initialize(setupInfo['conf'], setupInfo['context'], callback);
-}
-
-Storm.prototype.startReadingInput = function() {
- var self = this;
- process.stdin.on('readable', function() {
- var chunk = process.stdin.read();
- var messages = self.handleNewChunk(chunk);
- messages.forEach(function(message) {
- self.handleNewMessage(message);
- })
-
- });
-}
-
-/**
- * receives a new string chunk and returns a list of new messages with the separator removed
- * stores state in this.messagePart
- * @param chunk
- */
-Storm.prototype.handleNewChunk = function(chunk) {
- //invariant: this.messagePart has no separator otherwise we would have parsed it already
- var messages = [];
- if (chunk && chunk.length !== 0) {
- //"{}".split("\nend\n") ==> ['{}']
- //"\nend\n".split("\nend\n") ==> ['' , '']
- //"{}\nend\n".split("\nend\n") ==> ['{}', '']
- //"\nend\n{}".split("\nend\n") ==> ['' , '{}']
- // "{}\nend\n{}".split("\nend\n") ==> ['{}', '{}' ]
- this.messagePart = this.messagePart + chunk;
- var newMessageParts = this.messagePart.split(this.separator);
- while (newMessageParts.length > 0) {
- var potentialMessage = newMessageParts.shift();
- var anotherMessageAhead = newMessageParts.length > 0;
- if (!anotherMessageAhead) {
- this.messagePart = potentialMessage;
- }
- else if (potentialMessage.length > 0) {
- messages.push(potentialMessage);
- }
- }
- }
- return messages;
- }
-
-Storm.prototype.isTaskIds = function(msg) {
- return (msg instanceof Array);
-}
-
-Storm.prototype.handleNewMessage = function(msg) {
- var parsedMsg = JSON.parse(msg);
-
- if (this.isFirstMessage) {
- this.initSetupInfo(parsedMsg);
- this.isFirstMessage = false;
- } else if (this.isTaskIds(parsedMsg)) {
- this.handleNewTaskId(parsedMsg);
- } else {
- this.handleNewCommand(parsedMsg);
- }
-}
-
-Storm.prototype.handleNewTaskId = function(taskIds) {
- //When new list of task ids arrives, the callback that was passed with the corresponding emit should be called.
- //Storm assures that the task ids will be sent in the same order as their corresponding emits so it we can simply
- //take the first callback in the list and be sure it is the right one.
-
- var callback = this.taskIdsCallbacks.shift();
- if (callback) {
- callback(taskIds);
- } else {
- throw new Error('Something went wrong, we off the split of task id callbacks');
- }
-}
-
-
-
-/**
- *
- * @param messageDetails json with the emit details.
- *
- * For bolt, the json must contain the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
- * tuple and return ack when all components successfully finished to process it.
- * and may contain the optional fields:
- * - stream (if empty - emit to default stream)
- *
- * For spout, the json must contain the required fields:
- * - tuple - the value to emit
- *
- * and may contain the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- *
- * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received).
- */
-Storm.prototype.emit = function(messageDetails, onTaskIds) {
- //Every emit triggers a response - list of task ids to which the tuple was emitted. The task ids are accessible
- //through the callback (will be called when the response arrives). The callback is stored in a list until the
- //corresponding task id list arrives.
- if (messageDetails.task) {
- throw new Error('Illegal input - task. To emit to specific task use emit direct!');
- }
-
- if (!onTaskIds) {
- throw new Error('You must pass a onTaskIds callback when using emit!')
- }
-
- this.taskIdsCallbacks.push(onTaskIds);
- this.__emit(messageDetails);;
-}
-
-
-/**
- * Emit message to specific task.
- * @param messageDetails json with the emit details.
- *
- * For bolt, the json must contain the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
- * tuple and return ack when all components successfully finished to process it.
- * - task - indicate the task to send the tuple to.
- * and may contain the optional fields:
- * - stream (if empty - emit to default stream)
- *
- * For spout, the json must contain the required fields:
- * - tuple - the value to emit
- * - task - indicate the task to send the tuple to.
- * and may contain the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- *
- * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received).
- */
-Storm.prototype.emitDirect = function(commandDetails) {
- if (!commandDetails.task) {
- throw new Error("Emit direct must receive task id!")
- }
- this.__emit(commandDetails);
-}
-
-/**
- * Initialize storm component according to the configuration received.
- * @param conf configuration object accrding to storm protocol.
- * @param context context object according to storm protocol.
- * @param done callback. Call this method when finished initializing.
- */
-Storm.prototype.initialize = function(conf, context, done) {
- done();
-}
-
-Storm.prototype.run = function() {
- process.stdout.setEncoding('utf8');
- process.stdin.setEncoding('utf8');
- this.startReadingInput();
-}
-
-function Tuple(id, component, stream, task, values) {
- this.id = id;
- this.component = component;
- this.stream = stream;
- this.task = task;
- this.values = values;
-}
-
-/**
- * Base class for storm bolt.
- * To create a bolt implement 'process' method.
- * You may also implement initialize method to
- */
-function BasicBolt() {
- Storm.call(this);
- this.anchorTuple = null;
-};
-
-BasicBolt.prototype = Object.create(Storm.prototype);
-BasicBolt.prototype.constructor = BasicBolt;
-
-/**
- * Emit message.
- * @param commandDetails json with the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
- * tuple and return ack when all components successfully finished to process it.
- * and the optional fields:
- * - stream (if empty - emit to default stream)
- * - task (pass only to emit to specific task)
- */
-BasicBolt.prototype.__emit = function(commandDetails) {
- var self = this;
-
- var message = {
- command: "emit",
- tuple: commandDetails.tuple,
- stream: commandDetails.stream,
- task: commandDetails.task,
- anchors: [commandDetails.anchorTupleId]
- };
-
- this.sendMsgToParent(message);
-}
-
-BasicBolt.prototype.handleNewCommand = function(command) {
- var self = this;
- var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]);
- var callback = function(err) {
- if (err) {
- self.fail(tup, err);
- return;
- }
- self.ack(tup);
- }
- this.process(tup, callback);
-}
-
-/**
- * Implement this method when creating a bolt. This is the main method that provides the logic of the bolt (what
- * should it do?).
- * @param tuple the input of the bolt - what to process.
- * @param done call this method when done processing.
- */
-BasicBolt.prototype.process = function(tuple, done) {};
-
-BasicBolt.prototype.ack = function(tup) {
- this.sendMsgToParent({"command": "ack", "id": tup.id});
-}
-
-BasicBolt.prototype.fail = function(tup, err) {
- this.sendMsgToParent({"command": "fail", "id": tup.id});
-}
-
-
-/**
- * Base class for storm spout.
- * To create a spout implement the following methods: nextTuple, ack and fail (nextTuple - mandatory, ack and fail
- * can stay empty).
- * You may also implement initialize method.
- *
- */
-function Spout() {
- Storm.call(this);
-};
-
-Spout.prototype = Object.create(Storm.prototype);
-
-Spout.prototype.constructor = Spout;
-
-/**
- * This method will be called when an ack is received for preciously sent tuple. One may implement it.
- * @param id The id of the tuple.
- * @param done Call this method when finished and ready to receive more tuples.
- */
-Spout.prototype.ack = function(id, done) {};
-
-/**
- * This method will be called when an fail is received for preciously sent tuple. One may implement it (for example -
- * log the failure or send the tuple again).
- * @param id The id of the tuple.
- * @param done Call this method when finished and ready to receive more tuples.
- */
-Spout.prototype.fail = function(id, done) {};
-
-/**
- * Method the indicates its time to emit the next tuple.
- * @param done call this method when done sending the output.
- */
-Spout.prototype.nextTuple = function(done) {};
-
-Spout.prototype.handleNewCommand = function(command) {
- var self = this;
- var callback = function() {
- self.sync();
- }
-
- if (command["command"] === "next") {
- this.nextTuple(callback);
- }
-
- if (command["command"] === "ack") {
- this.ack(command["id"], callback);
- }
-
- if (command["command"] === "fail") {
- this.fail(command["id"], callback);
- }
-}
-
-/**
- * @param commandDetails json with the required fields:
- * - tuple - the value to emit.
- * and the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- * - task - pass only to emit to specific task.
- */
-Spout.prototype.__emit = function(commandDetails) {
- var message = {
- command: "emit",
- tuple: commandDetails.tuple,
- id: commandDetails.id,
- stream: commandDetails.stream,
- task: commandDetails.task
- };
-
- this.sendMsgToParent(message);
-}
-
-module.exports.BasicBolt = BasicBolt;
-module.exports.Spout = Spout;
http://git-wip-us.apache.org/repos/asf/storm/blob/db492167/storm-core/src/multilang/py/storm.py
----------------------------------------------------------------------
diff --git a/storm-core/src/multilang/py/storm.py b/storm-core/src/multilang/py/storm.py
deleted file mode 100755
index 642c393..0000000
--- a/storm-core/src/multilang/py/storm.py
+++ /dev/null
@@ -1,260 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import sys
-import os
-import traceback
-from collections import deque
-
-try:
- import simplejson as json
-except ImportError:
- import json
-
-json_encode = lambda x: json.dumps(x)
-json_decode = lambda x: json.loads(x)
-
-#reads lines and reconstructs newlines appropriately
-def readMsg():
- msg = ""
- while True:
- line = sys.stdin.readline()
- if not line:
- raise Exception('Read EOF from stdin')
- if line[0:-1] == "end":
- break
- msg = msg + line
- return json_decode(msg[0:-1])
-
-MODE = None
-ANCHOR_TUPLE = None
-
-#queue up commands we read while trying to read taskids
-pending_commands = deque()
-
-def readTaskIds():
- if pending_taskids:
- return pending_taskids.popleft()
- else:
- msg = readMsg()
- while type(msg) is not list:
- pending_commands.append(msg)
- msg = readMsg()
- return msg
-
-#queue up taskids we read while trying to read commands/tuples
-pending_taskids = deque()
-
-def readCommand():
- if pending_commands:
- return pending_commands.popleft()
- else:
- msg = readMsg()
- while type(msg) is list:
- pending_taskids.append(msg)
- msg = readMsg()
- return msg
-
-def readTuple():
- cmd = readCommand()
- return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"])
-
-def sendMsgToParent(msg):
- print json_encode(msg)
- print "end"
- sys.stdout.flush()
-
-def sync():
- sendMsgToParent({'command':'sync'})
-
-def sendpid(heartbeatdir):
- pid = os.getpid()
- sendMsgToParent({'pid':pid})
- open(heartbeatdir + "/" + str(pid), "w").close()
-
-def emit(*args, **kwargs):
- __emit(*args, **kwargs)
- return readTaskIds()
-
-def emitDirect(task, *args, **kwargs):
- kwargs["directTask"] = task
- __emit(*args, **kwargs)
-
-def __emit(*args, **kwargs):
- global MODE
- if MODE == Bolt:
- emitBolt(*args, **kwargs)
- elif MODE == Spout:
- emitSpout(*args, **kwargs)
-
-def emitBolt(tup, stream=None, anchors = [], directTask=None):
- global ANCHOR_TUPLE
- if ANCHOR_TUPLE is not None:
- anchors = [ANCHOR_TUPLE]
- m = {"command": "emit"}
- if stream is not None:
- m["stream"] = stream
- m["anchors"] = map(lambda a: a.id, anchors)
- if directTask is not None:
- m["task"] = directTask
- m["tuple"] = tup
- sendMsgToParent(m)
-
-def emitSpout(tup, stream=None, id=None, directTask=None):
- m = {"command": "emit"}
- if id is not None:
- m["id"] = id
- if stream is not None:
- m["stream"] = stream
- if directTask is not None:
- m["task"] = directTask
- m["tuple"] = tup
- sendMsgToParent(m)
-
-def ack(tup):
- sendMsgToParent({"command": "ack", "id": tup.id})
-
-def fail(tup):
- sendMsgToParent({"command": "fail", "id": tup.id})
-
-def reportError(msg):
- sendMsgToParent({"command": "error", "msg": msg})
-
-def log(msg, level=2):
- sendMsgToParent({"command": "log", "msg": msg, "level":level})
-
-def logTrace(msg):
- log(msg, 0)
-
-def logDebug(msg):
- log(msg, 1)
-
-def logInfo(msg):
- log(msg, 2)
-
-def logWarn(msg):
- log(msg, 3)
-
-def logError(msg):
- log(msg, 4)
-
-def rpcMetrics(name, params):
- sendMsgToParent({"command": "metrics", "name": name, "params": params})
-
-def initComponent():
- setupInfo = readMsg()
- sendpid(setupInfo['pidDir'])
- return [setupInfo['conf'], setupInfo['context']]
-
-class Tuple(object):
- def __init__(self, id, component, stream, task, values):
- self.id = id
- self.component = component
- self.stream = stream
- self.task = task
- self.values = values
-
- def __repr__(self):
- return '<%s%s>' % (
- self.__class__.__name__,
- ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys())))
-
- def is_heartbeat_tuple(self):
- return self.task == -1 and self.stream == "__heartbeat"
-
-class Bolt(object):
- def initialize(self, stormconf, context):
- pass
-
- def process(self, tuple):
- pass
-
- def run(self):
- global MODE
- MODE = Bolt
- conf, context = initComponent()
- try:
- self.initialize(conf, context)
- while True:
- tup = readTuple()
- if tup.is_heartbeat_tuple():
- sync()
- else:
- self.process(tup)
- except Exception, e:
- reportError(traceback.format_exc(e))
-
-class BasicBolt(object):
- def initialize(self, stormconf, context):
- pass
-
- def process(self, tuple):
- pass
-
- def run(self):
- global MODE
- MODE = Bolt
- global ANCHOR_TUPLE
- conf, context = initComponent()
- try:
- self.initialize(conf, context)
- while True:
- tup = readTuple()
- if tup.is_heartbeat_tuple():
- sync()
- else:
- ANCHOR_TUPLE = tup
- try:
- self.process(tup)
- ack(tup)
- except Exception, e:
- reportError(traceback.format_exc(e))
- fail(tup)
- except Exception, e:
- reportError(traceback.format_exc(e))
-
-class Spout(object):
- def initialize(self, conf, context):
- pass
-
- def ack(self, id):
- pass
-
- def fail(self, id):
- pass
-
- def nextTuple(self):
- pass
-
- def run(self):
- global MODE
- MODE = Spout
- conf, context = initComponent()
- try:
- self.initialize(conf, context)
- while True:
- msg = readCommand()
- if msg["command"] == "next":
- self.nextTuple()
- if msg["command"] == "ack":
- self.ack(msg["id"])
- if msg["command"] == "fail":
- self.fail(msg["id"])
- sync()
- except Exception, e:
- reportError(traceback.format_exc(e))
http://git-wip-us.apache.org/repos/asf/storm/blob/db492167/storm-core/src/multilang/rb/storm.rb
----------------------------------------------------------------------
diff --git a/storm-core/src/multilang/rb/storm.rb b/storm-core/src/multilang/rb/storm.rb
deleted file mode 100644
index 816694e..0000000
--- a/storm-core/src/multilang/rb/storm.rb
+++ /dev/null
@@ -1,236 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-require "rubygems"
-require "json"
-
-module Storm
- module Protocol
- class << self
- attr_accessor :mode, :pending_taskids, :pending_commands
- end
-
- self.pending_taskids = []
- self.pending_commands = []
-
- def read_message
- msg = ""
- loop do
- line = STDIN.readline.chomp
- break if line == "end"
- msg << line
- msg << "\n"
- end
- JSON.parse msg.chomp
- end
-
- def read_task_ids
- Storm::Protocol.pending_taskids.shift ||
- begin
- msg = read_message
- until msg.is_a? Array
- Storm::Protocol.pending_commands.push(msg)
- msg = read_message
- end
- msg
- end
- end
-
- def read_command
- Storm::Protocol.pending_commands.shift ||
- begin
- msg = read_message
- while msg.is_a? Array
- Storm::Protocol.pending_taskids.push(msg)
- msg = read_message
- end
- msg
- end
- end
-
- def send_msg_to_parent(msg)
- puts msg.to_json
- puts "end"
- STDOUT.flush
- end
-
- def sync
- send_msg_to_parent({'command' => 'sync'})
- end
-
- def send_pid(heartbeat_dir)
- pid = Process.pid
- send_msg_to_parent({'pid' => pid})
- File.open("#{heartbeat_dir}/#{pid}", "w").close
- end
-
- def emit_bolt(tup, args = {})
- stream = args[:stream]
- anchors = args[:anchors] || args[:anchor] || []
- anchors = [anchors] unless anchors.is_a? Enumerable
- direct = args[:direct_task]
- m = {:command => :emit, :anchors => anchors.map(&:id), :tuple => tup}
- m[:stream] = stream if stream
- m[:task] = direct if direct
- send_msg_to_parent m
- read_task_ids unless direct
- end
-
- def emit_spout(tup, args = {})
- stream = args[:stream]
- id = args[:id]
- direct = args[:direct_task]
- m = {:command => :emit, :tuple => tup}
- m[:id] = id if id
- m[:stream] = stream if stream
- m[:task] = direct if direct
- send_msg_to_parent m
- read_task_ids unless direct
- end
-
- def emit(*args)
- case Storm::Protocol.mode
- when 'spout'
- emit_spout(*args)
- when 'bolt'
- emit_bolt(*args)
- end
- end
-
- def ack(tup)
- send_msg_to_parent :command => :ack, :id => tup.id
- end
-
- def fail(tup)
- send_msg_to_parent :command => :fail, :id => tup.id
- end
-
- def reportError(msg)
- send_msg_to_parent :command => :error, :msg => msg.to_s
- end
-
- def log(msg, level=2)
- send_msg_to_parent :command => :log, :msg => msg.to_s, :level => level
- end
-
- def logTrace(msg)
- log(msg, 0)
- end
-
- def logDebug(msg)
- log(msg, 1)
- end
-
- def logInfo(msg)
- log(msg, 2)
- end
-
- def logWarn(msg)
- log(msg, 3)
- end
-
- def logError(msg)
- log(msg, 4)
- end
-
- def handshake
- setup_info = read_message
- send_pid setup_info['pidDir']
- [setup_info['conf'], setup_info['context']]
- end
- end
-
- class Tuple
- attr_accessor :id, :component, :stream, :task, :values
-
- def initialize(id, component, stream, task, values)
- @id = id
- @component = component
- @stream = stream
- @task = task
- @values = values
- end
-
- def self.from_hash(hash)
- Tuple.new(*hash.values_at("id", "comp", "stream", "task", "tuple"))
- end
-
- def is_heartbeat
- task == -1 and stream == '__heartbeat'
- end
- end
-
- class Bolt
- include Storm::Protocol
-
- def prepare(conf, context); end
-
- def process(tuple); end
-
- def run
- Storm::Protocol.mode = 'bolt'
- prepare(*handshake)
- begin
- while true
- tuple = Tuple.from_hash(read_command)
- if tuple.is_heartbeat
- sync
- else
- process tuple
- end
- end
- rescue Exception => e
- reportError 'Exception in bolt: ' + e.message + ' - ' + e.backtrace.join('\n')
- end
- end
- end
-
- class Spout
- include Storm::Protocol
-
- def open(conf, context); end
-
- def nextTuple; end
-
- def ack(id); end
-
- def fail(id); end
-
- def run
- Storm::Protocol.mode = 'spout'
- open(*handshake)
-
- begin
- while true
- msg = read_command
- case msg['command']
- when 'next'
- nextTuple
- when 'ack'
- ack(msg['id'])
- when 'fail'
- fail(msg['id'])
- end
- sync
- end
- rescue Exception => e
- reportError 'Exception in spout: ' + e.message + ' - ' + e.backtrace.join('\n')
- end
- end
- end
-end
[16/27] storm git commit: Added missing ack for the tick
Posted by pt...@apache.org.
Added missing ack for the tick
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d3dcd1bd
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d3dcd1bd
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d3dcd1bd
Branch: refs/heads/0.10.x-branch
Commit: d3dcd1bdfc56fa7aca6752afdd5588586fc1ec91
Parents: 6b00040
Author: Niels Basjes <nb...@bol.com>
Authored: Thu Oct 30 10:27:05 2014 +0100
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:27:14 2015 -0400
----------------------------------------------------------------------
external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d3dcd1bd/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index 913843c..0a1e5fe 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -90,6 +90,7 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
@Override
public void execute(Tuple input) {
if (input.isTick()) {
+ collector.ack(input);
return; // Do not try to send ticks to Kafka
}
[10/27] storm git commit: add STORM-748 to changelog
Posted by pt...@apache.org.
add STORM-748 to changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ce5447e4
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ce5447e4
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ce5447e4
Branch: refs/heads/0.10.x-branch
Commit: ce5447e4dd3119fffac2335cab8e4368f178a236
Parents: 927132c
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri May 15 15:13:12 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:13:12 2015 -0400
----------------------------------------------------------------------
CHANGELOG.md | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ce5447e4/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 753fc3a..ec096e5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,11 +1,12 @@
## 0.10.0
+ * STORM-748: Package Multi-Lang scripts so they don't have to be duplicated
* STORM-563. Kafka Spout doesn't pick up from the beginning of the queue unless forceFromStart specified.
* STORM-615: Add REST API to upload topology.
* STORM-807: quote args correctly in /bin/storm
* STORM-686: Add worker-launcher to storm-dist.
* STORM-789: Send more topology context to Multi-Lang components via initial handshake
* STORM-764: Have option to compress thrift heartbeat
- * JIRA STORM-766 (Include version info in the service page)
+ * STORM-766 (Include version info in the service page)
* STORM-765: Thrift serialization for local state.
* STORM-762: uptime for worker heartbeats is lost when converted to thrift
* STORM-750: Set Config serialVersionUID
[09/27] storm git commit: sync storm.js with master
Posted by pt...@apache.org.
sync storm.js with master
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/17c1368c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/17c1368c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/17c1368c
Branch: refs/heads/0.10.x-branch
Commit: 17c1368c8a61f037874916c709562dcbf2b94267
Parents: cb5afe3
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Mon Apr 6 17:02:39 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:12:17 2015 -0400
----------------------------------------------------------------------
.../src/main/resources/resources/storm.js | 25 +++++++++++++-------
1 file changed, 16 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/17c1368c/storm-multilang/multilang-javascript/src/main/resources/resources/storm.js
----------------------------------------------------------------------
diff --git a/storm-multilang/multilang-javascript/src/main/resources/resources/storm.js b/storm-multilang/multilang-javascript/src/main/resources/resources/storm.js
index f5dcad2..dc6efc1 100755
--- a/storm-multilang/multilang-javascript/src/main/resources/resources/storm.js
+++ b/storm-multilang/multilang-javascript/src/main/resources/resources/storm.js
@@ -15,7 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
- /**
+
+/**
* Base classes in node-js for storm Bolt and Spout.
* Implements the storm multilang protocol for nodejs.
*/
@@ -97,7 +98,7 @@ Storm.prototype.handleNewChunk = function(chunk) {
}
}
return messages;
- }
+}
Storm.prototype.isTaskIds = function(msg) {
return (msg instanceof Array);
@@ -260,13 +261,19 @@ BasicBolt.prototype.__emit = function(commandDetails) {
BasicBolt.prototype.handleNewCommand = function(command) {
var self = this;
var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]);
+
+ if (tup.task === -1 && tup.stream === "__heartbeat") {
+ self.sync();
+ return;
+ }
+
var callback = function(err) {
- if (err) {
- self.fail(tup, err);
- return;
- }
- self.ack(tup);
- }
+ if (err) {
+ self.fail(tup, err);
+ return;
+ }
+ self.ack(tup);
+ }
this.process(tup, callback);
}
@@ -363,4 +370,4 @@ Spout.prototype.__emit = function(commandDetails) {
}
module.exports.BasicBolt = BasicBolt;
-module.exports.Spout = Spout;
+module.exports.Spout = Spout;
\ No newline at end of file
[22/27] storm git commit: Reducing the differences
Posted by pt...@apache.org.
Reducing the differences
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/91167704
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/91167704
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/91167704
Branch: refs/heads/0.10.x-branch
Commit: 9116770478bcd5ee794375d8a525741298a8ff01
Parents: 0e49d91
Author: Niels Basjes <ni...@basjes.nl>
Authored: Mon Dec 15 10:58:49 2014 +0100
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:28:27 2015 -0400
----------------------------------------------------------------------
.../test/jvm/storm/starter/tools/MockTupleHelpers.java | 1 -
storm-core/src/jvm/backtype/storm/tuple/Tuple.java | 4 ++--
storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java | 3 +--
.../src/jvm/storm/trident/topology/TridentBoltExecutor.java | 2 +-
4 files changed, 4 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/91167704/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
index eeaeeae..b253350 100644
--- a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
+++ b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
@@ -37,5 +37,4 @@ public final class MockTupleHelpers {
when(tuple.getSourceStreamId()).thenReturn(streamId);
return tuple;
}
-
}
http://git-wip-us.apache.org/repos/asf/storm/blob/91167704/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
index a31b52b..34dc61a 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
@@ -42,7 +42,7 @@ public interface Tuple extends ITuple{
* Gets the id of the component that created this tuple.
*/
public String getSourceComponent();
-
+
/**
* Gets the id of the task that created this tuple.
*/
@@ -52,7 +52,7 @@ public interface Tuple extends ITuple{
* Gets the id of the stream that this tuple was emitted to.
*/
public String getSourceStreamId();
-
+
/**
* Gets the message id that associated with this tuple.
*/
http://git-wip-us.apache.org/repos/asf/storm/blob/91167704/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
index 7829327..818eff1 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
@@ -17,7 +17,6 @@
*/
package backtype.storm.tuple;
-import backtype.storm.Constants;
import backtype.storm.generated.GlobalStreamId;
import backtype.storm.task.GeneralTopologyContext;
import backtype.storm.utils.IndifferentAccessMap;
@@ -213,7 +212,7 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
public String getSourceStreamId() {
return streamId;
}
-
+
public MessageId getMessageId() {
return id;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/91167704/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
index 41741a1..a23e555 100644
--- a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
+++ b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
@@ -300,7 +300,7 @@ public class TridentBoltExecutor implements IRichBolt {
@Override
public void execute(Tuple tuple) {
- if (TupleUtils.isTick(tuple)) {
+ if(TupleUtils.isTick(tuple)) {
long now = System.currentTimeMillis();
if(now - _lastRotate > _messageTimeoutMs) {
_batches.rotate();
[05/27] storm git commit: repackage multi-lang resources
Posted by pt...@apache.org.
repackage multi-lang resources
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cb5afe32
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cb5afe32
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cb5afe32
Branch: refs/heads/0.10.x-branch
Commit: cb5afe3262b3cc793cc4d3a9dd2ea7b5b89e37cd
Parents: 1247f6d
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Mon Apr 6 15:54:24 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:12:17 2015 -0400
----------------------------------------------------------------------
pom.xml | 3 +
storm-core/pom.xml | 35 ++
storm-core/src/dev/resources/storm.js | 373 -------------------
storm-core/src/dev/resources/storm.py | 260 -------------
storm-core/src/dev/resources/storm.rb | 236 ------------
storm-multilang/multilang-javascript/pom.xml | 32 ++
.../src/main/resources/resources/storm.js | 366 ++++++++++++++++++
storm-multilang/multilang-python/pom.xml | 32 ++
.../src/main/resources/resources/storm.py | 260 +++++++++++++
storm-multilang/multilang-ruby/pom.xml | 32 ++
.../src/main/resources/resources/storm.rb | 236 ++++++++++++
11 files changed, 996 insertions(+), 869 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 236e20f..4d7fbd2 100644
--- a/pom.xml
+++ b/pom.xml
@@ -157,6 +157,9 @@
<modules>
<module>storm-buildtools/maven-shade-clojure-transformer</module>
<module>storm-buildtools/storm-maven-plugins</module>
+ <module>storm-multilang/multilang-javascript</module>
+ <module>storm-multilang/multilang-python</module>
+ <module>storm-multilang/multilang-ruby</module>
<module>storm-core</module>
<module>examples/storm-starter</module>
<module>external/storm-kafka</module>
http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/storm-core/pom.xml
----------------------------------------------------------------------
diff --git a/storm-core/pom.xml b/storm-core/pom.xml
index 17e1a15..db54481 100644
--- a/storm-core/pom.xml
+++ b/storm-core/pom.xml
@@ -35,6 +35,12 @@
</properties>
<dependencies>
+ <!-- multi-lang resources -->
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>multilang-ruby</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!--clojure-->
<dependency>
<groupId>org.clojure</groupId>
@@ -510,6 +516,35 @@
<includeScope>runtime</includeScope>
</configuration>
</execution>
+ <!-- multi-lang resources -->
+ <execution>
+ <id>unpack</id>
+ <phase>process-test-resources</phase>
+ <goals>
+ <goal>unpack</goal>
+ </goals>
+ <configuration>
+ <artifactItems>
+ <artifactItem>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>multilang-ruby</artifactId>
+ <version>${project.version}</version>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>multilang-python</artifactId>
+ <version>${project.version}</version>
+ </artifactItem>
+ <artifactItem>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>multilang-javascript</artifactId>
+ <version>${project.version}</version>
+ </artifactItem>
+ </artifactItems>
+ <outputDirectory>${project.build.directory}/test-classes</outputDirectory>
+ </configuration>
+ </execution>
+
</executions>
</plugin>
<plugin>
http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/storm-core/src/dev/resources/storm.js
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/resources/storm.js b/storm-core/src/dev/resources/storm.js
deleted file mode 100755
index 8827cd3..0000000
--- a/storm-core/src/dev/resources/storm.js
+++ /dev/null
@@ -1,373 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Base classes in node-js for storm Bolt and Spout.
- * Implements the storm multilang protocol for nodejs.
- */
-
-
-var fs = require('fs');
-
-function Storm() {
- this.messagePart = "";
- this.taskIdsCallbacks = [];
- this.isFirstMessage = true;
- this.separator = '\nend\n';
-}
-
-Storm.prototype.sendMsgToParent = function(msg) {
- var str = JSON.stringify(msg);
- process.stdout.write(str + this.separator);
-}
-
-Storm.prototype.sync = function() {
- this.sendMsgToParent({"command":"sync"});
-}
-
-Storm.prototype.sendPid = function(heartbeatdir) {
- var pid = process.pid;
- fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w"));
- this.sendMsgToParent({"pid": pid})
-}
-
-Storm.prototype.log = function(msg) {
- this.sendMsgToParent({"command": "log", "msg": msg});
-}
-
-Storm.prototype.initSetupInfo = function(setupInfo) {
- var self = this;
- var callback = function() {
- self.sendPid(setupInfo['pidDir']);
- }
- this.initialize(setupInfo['conf'], setupInfo['context'], callback);
-}
-
-Storm.prototype.startReadingInput = function() {
- var self = this;
- process.stdin.on('readable', function() {
- var chunk = process.stdin.read();
- var messages = self.handleNewChunk(chunk);
- messages.forEach(function(message) {
- self.handleNewMessage(message);
- })
-
- });
-}
-
-/**
- * receives a new string chunk and returns a list of new messages with the separator removed
- * stores state in this.messagePart
- * @param chunk
- */
-Storm.prototype.handleNewChunk = function(chunk) {
- //invariant: this.messagePart has no separator otherwise we would have parsed it already
- var messages = [];
- if (chunk && chunk.length !== 0) {
- //"{}".split("\nend\n") ==> ['{}']
- //"\nend\n".split("\nend\n") ==> ['' , '']
- //"{}\nend\n".split("\nend\n") ==> ['{}', '']
- //"\nend\n{}".split("\nend\n") ==> ['' , '{}']
- // "{}\nend\n{}".split("\nend\n") ==> ['{}', '{}' ]
- this.messagePart = this.messagePart + chunk;
- var newMessageParts = this.messagePart.split(this.separator);
- while (newMessageParts.length > 0) {
- var potentialMessage = newMessageParts.shift();
- var anotherMessageAhead = newMessageParts.length > 0;
- if (!anotherMessageAhead) {
- this.messagePart = potentialMessage;
- }
- else if (potentialMessage.length > 0) {
- messages.push(potentialMessage);
- }
- }
- }
- return messages;
- }
-
-Storm.prototype.isTaskIds = function(msg) {
- return (msg instanceof Array);
-}
-
-Storm.prototype.handleNewMessage = function(msg) {
- var parsedMsg = JSON.parse(msg);
-
- if (this.isFirstMessage) {
- this.initSetupInfo(parsedMsg);
- this.isFirstMessage = false;
- } else if (this.isTaskIds(parsedMsg)) {
- this.handleNewTaskId(parsedMsg);
- } else {
- this.handleNewCommand(parsedMsg);
- }
-}
-
-Storm.prototype.handleNewTaskId = function(taskIds) {
- //When new list of task ids arrives, the callback that was passed with the corresponding emit should be called.
- //Storm assures that the task ids will be sent in the same order as their corresponding emits so it we can simply
- //take the first callback in the list and be sure it is the right one.
-
- var callback = this.taskIdsCallbacks.shift();
- if (callback) {
- callback(taskIds);
- } else {
- throw new Error('Something went wrong, we off the split of task id callbacks');
- }
-}
-
-
-
-/**
- *
- * @param messageDetails json with the emit details.
- *
- * For bolt, the json must contain the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
- * tuple and return ack when all components successfully finished to process it.
- * and may contain the optional fields:
- * - stream (if empty - emit to default stream)
- *
- * For spout, the json must contain the required fields:
- * - tuple - the value to emit
- *
- * and may contain the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- *
- * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received).
- */
-Storm.prototype.emit = function(messageDetails, onTaskIds) {
- //Every emit triggers a response - list of task ids to which the tuple was emitted. The task ids are accessible
- //through the callback (will be called when the response arrives). The callback is stored in a list until the
- //corresponding task id list arrives.
- if (messageDetails.task) {
- throw new Error('Illegal input - task. To emit to specific task use emit direct!');
- }
-
- if (!onTaskIds) {
- throw new Error('You must pass a onTaskIds callback when using emit!')
- }
-
- this.taskIdsCallbacks.push(onTaskIds);
- this.__emit(messageDetails);;
-}
-
-
-/**
- * Emit message to specific task.
- * @param messageDetails json with the emit details.
- *
- * For bolt, the json must contain the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
- * tuple and return ack when all components successfully finished to process it.
- * - task - indicate the task to send the tuple to.
- * and may contain the optional fields:
- * - stream (if empty - emit to default stream)
- *
- * For spout, the json must contain the required fields:
- * - tuple - the value to emit
- * - task - indicate the task to send the tuple to.
- * and may contain the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- *
- * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received).
- */
-Storm.prototype.emitDirect = function(commandDetails) {
- if (!commandDetails.task) {
- throw new Error("Emit direct must receive task id!")
- }
- this.__emit(commandDetails);
-}
-
-/**
- * Initialize storm component according to the configuration received.
- * @param conf configuration object accrding to storm protocol.
- * @param context context object according to storm protocol.
- * @param done callback. Call this method when finished initializing.
- */
-Storm.prototype.initialize = function(conf, context, done) {
- done();
-}
-
-Storm.prototype.run = function() {
- process.stdout.setEncoding('utf8');
- process.stdin.setEncoding('utf8');
- this.startReadingInput();
-}
-
-function Tuple(id, component, stream, task, values) {
- this.id = id;
- this.component = component;
- this.stream = stream;
- this.task = task;
- this.values = values;
-}
-
-/**
- * Base class for storm bolt.
- * To create a bolt implement 'process' method.
- * You may also implement initialize method to
- */
-function BasicBolt() {
- Storm.call(this);
- this.anchorTuple = null;
-};
-
-BasicBolt.prototype = Object.create(Storm.prototype);
-BasicBolt.prototype.constructor = BasicBolt;
-
-/**
- * Emit message.
- * @param commandDetails json with the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
- * tuple and return ack when all components successfully finished to process it.
- * and the optional fields:
- * - stream (if empty - emit to default stream)
- * - task (pass only to emit to specific task)
- */
-BasicBolt.prototype.__emit = function(commandDetails) {
- var self = this;
-
- var message = {
- command: "emit",
- tuple: commandDetails.tuple,
- stream: commandDetails.stream,
- task: commandDetails.task,
- anchors: [commandDetails.anchorTupleId]
- };
-
- this.sendMsgToParent(message);
-}
-
-BasicBolt.prototype.handleNewCommand = function(command) {
- var self = this;
- var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]);
-
- if (tup.task === -1 && tup.stream === "__heartbeat") {
- self.sync();
- return;
- }
-
- var callback = function(err) {
- if (err) {
- self.fail(tup, err);
- return;
- }
- self.ack(tup);
- }
- this.process(tup, callback);
-}
-
-/**
- * Implement this method when creating a bolt. This is the main method that provides the logic of the bolt (what
- * should it do?).
- * @param tuple the input of the bolt - what to process.
- * @param done call this method when done processing.
- */
-BasicBolt.prototype.process = function(tuple, done) {};
-
-BasicBolt.prototype.ack = function(tup) {
- this.sendMsgToParent({"command": "ack", "id": tup.id});
-}
-
-BasicBolt.prototype.fail = function(tup, err) {
- this.sendMsgToParent({"command": "fail", "id": tup.id});
-}
-
-
-/**
- * Base class for storm spout.
- * To create a spout implement the following methods: nextTuple, ack and fail (nextTuple - mandatory, ack and fail
- * can stay empty).
- * You may also implement initialize method.
- *
- */
-function Spout() {
- Storm.call(this);
-};
-
-Spout.prototype = Object.create(Storm.prototype);
-
-Spout.prototype.constructor = Spout;
-
-/**
- * This method will be called when an ack is received for preciously sent tuple. One may implement it.
- * @param id The id of the tuple.
- * @param done Call this method when finished and ready to receive more tuples.
- */
-Spout.prototype.ack = function(id, done) {};
-
-/**
- * This method will be called when an fail is received for preciously sent tuple. One may implement it (for example -
- * log the failure or send the tuple again).
- * @param id The id of the tuple.
- * @param done Call this method when finished and ready to receive more tuples.
- */
-Spout.prototype.fail = function(id, done) {};
-
-/**
- * Method the indicates its time to emit the next tuple.
- * @param done call this method when done sending the output.
- */
-Spout.prototype.nextTuple = function(done) {};
-
-Spout.prototype.handleNewCommand = function(command) {
- var self = this;
- var callback = function() {
- self.sync();
- }
-
- if (command["command"] === "next") {
- this.nextTuple(callback);
- }
-
- if (command["command"] === "ack") {
- this.ack(command["id"], callback);
- }
-
- if (command["command"] === "fail") {
- this.fail(command["id"], callback);
- }
-}
-
-/**
- * @param commandDetails json with the required fields:
- * - tuple - the value to emit.
- * and the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- * - task - pass only to emit to specific task.
- */
-Spout.prototype.__emit = function(commandDetails) {
- var message = {
- command: "emit",
- tuple: commandDetails.tuple,
- id: commandDetails.id,
- stream: commandDetails.stream,
- task: commandDetails.task
- };
-
- this.sendMsgToParent(message);
-}
-
-module.exports.BasicBolt = BasicBolt;
-module.exports.Spout = Spout;
http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/storm-core/src/dev/resources/storm.py
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/resources/storm.py b/storm-core/src/dev/resources/storm.py
deleted file mode 100755
index 642c393..0000000
--- a/storm-core/src/dev/resources/storm.py
+++ /dev/null
@@ -1,260 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import sys
-import os
-import traceback
-from collections import deque
-
-try:
- import simplejson as json
-except ImportError:
- import json
-
-json_encode = lambda x: json.dumps(x)
-json_decode = lambda x: json.loads(x)
-
-#reads lines and reconstructs newlines appropriately
-def readMsg():
- msg = ""
- while True:
- line = sys.stdin.readline()
- if not line:
- raise Exception('Read EOF from stdin')
- if line[0:-1] == "end":
- break
- msg = msg + line
- return json_decode(msg[0:-1])
-
-MODE = None
-ANCHOR_TUPLE = None
-
-#queue up commands we read while trying to read taskids
-pending_commands = deque()
-
-def readTaskIds():
- if pending_taskids:
- return pending_taskids.popleft()
- else:
- msg = readMsg()
- while type(msg) is not list:
- pending_commands.append(msg)
- msg = readMsg()
- return msg
-
-#queue up taskids we read while trying to read commands/tuples
-pending_taskids = deque()
-
-def readCommand():
- if pending_commands:
- return pending_commands.popleft()
- else:
- msg = readMsg()
- while type(msg) is list:
- pending_taskids.append(msg)
- msg = readMsg()
- return msg
-
-def readTuple():
- cmd = readCommand()
- return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"])
-
-def sendMsgToParent(msg):
- print json_encode(msg)
- print "end"
- sys.stdout.flush()
-
-def sync():
- sendMsgToParent({'command':'sync'})
-
-def sendpid(heartbeatdir):
- pid = os.getpid()
- sendMsgToParent({'pid':pid})
- open(heartbeatdir + "/" + str(pid), "w").close()
-
-def emit(*args, **kwargs):
- __emit(*args, **kwargs)
- return readTaskIds()
-
-def emitDirect(task, *args, **kwargs):
- kwargs["directTask"] = task
- __emit(*args, **kwargs)
-
-def __emit(*args, **kwargs):
- global MODE
- if MODE == Bolt:
- emitBolt(*args, **kwargs)
- elif MODE == Spout:
- emitSpout(*args, **kwargs)
-
-def emitBolt(tup, stream=None, anchors = [], directTask=None):
- global ANCHOR_TUPLE
- if ANCHOR_TUPLE is not None:
- anchors = [ANCHOR_TUPLE]
- m = {"command": "emit"}
- if stream is not None:
- m["stream"] = stream
- m["anchors"] = map(lambda a: a.id, anchors)
- if directTask is not None:
- m["task"] = directTask
- m["tuple"] = tup
- sendMsgToParent(m)
-
-def emitSpout(tup, stream=None, id=None, directTask=None):
- m = {"command": "emit"}
- if id is not None:
- m["id"] = id
- if stream is not None:
- m["stream"] = stream
- if directTask is not None:
- m["task"] = directTask
- m["tuple"] = tup
- sendMsgToParent(m)
-
-def ack(tup):
- sendMsgToParent({"command": "ack", "id": tup.id})
-
-def fail(tup):
- sendMsgToParent({"command": "fail", "id": tup.id})
-
-def reportError(msg):
- sendMsgToParent({"command": "error", "msg": msg})
-
-def log(msg, level=2):
- sendMsgToParent({"command": "log", "msg": msg, "level":level})
-
-def logTrace(msg):
- log(msg, 0)
-
-def logDebug(msg):
- log(msg, 1)
-
-def logInfo(msg):
- log(msg, 2)
-
-def logWarn(msg):
- log(msg, 3)
-
-def logError(msg):
- log(msg, 4)
-
-def rpcMetrics(name, params):
- sendMsgToParent({"command": "metrics", "name": name, "params": params})
-
-def initComponent():
- setupInfo = readMsg()
- sendpid(setupInfo['pidDir'])
- return [setupInfo['conf'], setupInfo['context']]
-
-class Tuple(object):
- def __init__(self, id, component, stream, task, values):
- self.id = id
- self.component = component
- self.stream = stream
- self.task = task
- self.values = values
-
- def __repr__(self):
- return '<%s%s>' % (
- self.__class__.__name__,
- ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys())))
-
- def is_heartbeat_tuple(self):
- return self.task == -1 and self.stream == "__heartbeat"
-
-class Bolt(object):
- def initialize(self, stormconf, context):
- pass
-
- def process(self, tuple):
- pass
-
- def run(self):
- global MODE
- MODE = Bolt
- conf, context = initComponent()
- try:
- self.initialize(conf, context)
- while True:
- tup = readTuple()
- if tup.is_heartbeat_tuple():
- sync()
- else:
- self.process(tup)
- except Exception, e:
- reportError(traceback.format_exc(e))
-
-class BasicBolt(object):
- def initialize(self, stormconf, context):
- pass
-
- def process(self, tuple):
- pass
-
- def run(self):
- global MODE
- MODE = Bolt
- global ANCHOR_TUPLE
- conf, context = initComponent()
- try:
- self.initialize(conf, context)
- while True:
- tup = readTuple()
- if tup.is_heartbeat_tuple():
- sync()
- else:
- ANCHOR_TUPLE = tup
- try:
- self.process(tup)
- ack(tup)
- except Exception, e:
- reportError(traceback.format_exc(e))
- fail(tup)
- except Exception, e:
- reportError(traceback.format_exc(e))
-
-class Spout(object):
- def initialize(self, conf, context):
- pass
-
- def ack(self, id):
- pass
-
- def fail(self, id):
- pass
-
- def nextTuple(self):
- pass
-
- def run(self):
- global MODE
- MODE = Spout
- conf, context = initComponent()
- try:
- self.initialize(conf, context)
- while True:
- msg = readCommand()
- if msg["command"] == "next":
- self.nextTuple()
- if msg["command"] == "ack":
- self.ack(msg["id"])
- if msg["command"] == "fail":
- self.fail(msg["id"])
- sync()
- except Exception, e:
- reportError(traceback.format_exc(e))
http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/storm-core/src/dev/resources/storm.rb
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/resources/storm.rb b/storm-core/src/dev/resources/storm.rb
deleted file mode 100644
index 816694e..0000000
--- a/storm-core/src/dev/resources/storm.rb
+++ /dev/null
@@ -1,236 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-require "rubygems"
-require "json"
-
-module Storm
- module Protocol
- class << self
- attr_accessor :mode, :pending_taskids, :pending_commands
- end
-
- self.pending_taskids = []
- self.pending_commands = []
-
- def read_message
- msg = ""
- loop do
- line = STDIN.readline.chomp
- break if line == "end"
- msg << line
- msg << "\n"
- end
- JSON.parse msg.chomp
- end
-
- def read_task_ids
- Storm::Protocol.pending_taskids.shift ||
- begin
- msg = read_message
- until msg.is_a? Array
- Storm::Protocol.pending_commands.push(msg)
- msg = read_message
- end
- msg
- end
- end
-
- def read_command
- Storm::Protocol.pending_commands.shift ||
- begin
- msg = read_message
- while msg.is_a? Array
- Storm::Protocol.pending_taskids.push(msg)
- msg = read_message
- end
- msg
- end
- end
-
- def send_msg_to_parent(msg)
- puts msg.to_json
- puts "end"
- STDOUT.flush
- end
-
- def sync
- send_msg_to_parent({'command' => 'sync'})
- end
-
- def send_pid(heartbeat_dir)
- pid = Process.pid
- send_msg_to_parent({'pid' => pid})
- File.open("#{heartbeat_dir}/#{pid}", "w").close
- end
-
- def emit_bolt(tup, args = {})
- stream = args[:stream]
- anchors = args[:anchors] || args[:anchor] || []
- anchors = [anchors] unless anchors.is_a? Enumerable
- direct = args[:direct_task]
- m = {:command => :emit, :anchors => anchors.map(&:id), :tuple => tup}
- m[:stream] = stream if stream
- m[:task] = direct if direct
- send_msg_to_parent m
- read_task_ids unless direct
- end
-
- def emit_spout(tup, args = {})
- stream = args[:stream]
- id = args[:id]
- direct = args[:direct_task]
- m = {:command => :emit, :tuple => tup}
- m[:id] = id if id
- m[:stream] = stream if stream
- m[:task] = direct if direct
- send_msg_to_parent m
- read_task_ids unless direct
- end
-
- def emit(*args)
- case Storm::Protocol.mode
- when 'spout'
- emit_spout(*args)
- when 'bolt'
- emit_bolt(*args)
- end
- end
-
- def ack(tup)
- send_msg_to_parent :command => :ack, :id => tup.id
- end
-
- def fail(tup)
- send_msg_to_parent :command => :fail, :id => tup.id
- end
-
- def reportError(msg)
- send_msg_to_parent :command => :error, :msg => msg.to_s
- end
-
- def log(msg, level=2)
- send_msg_to_parent :command => :log, :msg => msg.to_s, :level => level
- end
-
- def logTrace(msg)
- log(msg, 0)
- end
-
- def logDebug(msg)
- log(msg, 1)
- end
-
- def logInfo(msg)
- log(msg, 2)
- end
-
- def logWarn(msg)
- log(msg, 3)
- end
-
- def logError(msg)
- log(msg, 4)
- end
-
- def handshake
- setup_info = read_message
- send_pid setup_info['pidDir']
- [setup_info['conf'], setup_info['context']]
- end
- end
-
- class Tuple
- attr_accessor :id, :component, :stream, :task, :values
-
- def initialize(id, component, stream, task, values)
- @id = id
- @component = component
- @stream = stream
- @task = task
- @values = values
- end
-
- def self.from_hash(hash)
- Tuple.new(*hash.values_at("id", "comp", "stream", "task", "tuple"))
- end
-
- def is_heartbeat
- task == -1 and stream == '__heartbeat'
- end
- end
-
- class Bolt
- include Storm::Protocol
-
- def prepare(conf, context); end
-
- def process(tuple); end
-
- def run
- Storm::Protocol.mode = 'bolt'
- prepare(*handshake)
- begin
- while true
- tuple = Tuple.from_hash(read_command)
- if tuple.is_heartbeat
- sync
- else
- process tuple
- end
- end
- rescue Exception => e
- reportError 'Exception in bolt: ' + e.message + ' - ' + e.backtrace.join('\n')
- end
- end
- end
-
- class Spout
- include Storm::Protocol
-
- def open(conf, context); end
-
- def nextTuple; end
-
- def ack(id); end
-
- def fail(id); end
-
- def run
- Storm::Protocol.mode = 'spout'
- open(*handshake)
-
- begin
- while true
- msg = read_command
- case msg['command']
- when 'next'
- nextTuple
- when 'ack'
- ack(msg['id'])
- when 'fail'
- fail(msg['id'])
- end
- sync
- end
- rescue Exception => e
- reportError 'Exception in spout: ' + e.message + ' - ' + e.backtrace.join('\n')
- end
- end
- end
-end
http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/storm-multilang/multilang-javascript/pom.xml
----------------------------------------------------------------------
diff --git a/storm-multilang/multilang-javascript/pom.xml b/storm-multilang/multilang-javascript/pom.xml
new file mode 100644
index 0000000..e1cb993
--- /dev/null
+++ b/storm-multilang/multilang-javascript/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>0.11.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>multilang-javascript</artifactId>
+ <packaging>jar</packaging>
+ <name>multilang-javascript</name>
+
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/storm-multilang/multilang-javascript/src/main/resources/resources/storm.js
----------------------------------------------------------------------
diff --git a/storm-multilang/multilang-javascript/src/main/resources/resources/storm.js b/storm-multilang/multilang-javascript/src/main/resources/resources/storm.js
new file mode 100755
index 0000000..f5dcad2
--- /dev/null
+++ b/storm-multilang/multilang-javascript/src/main/resources/resources/storm.js
@@ -0,0 +1,366 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ /**
+ * Base classes in node-js for storm Bolt and Spout.
+ * Implements the storm multilang protocol for nodejs.
+ */
+
+
+var fs = require('fs');
+
+function Storm() {
+ this.messagePart = "";
+ this.taskIdsCallbacks = [];
+ this.isFirstMessage = true;
+ this.separator = '\nend\n';
+}
+
+Storm.prototype.sendMsgToParent = function(msg) {
+ var str = JSON.stringify(msg);
+ process.stdout.write(str + this.separator);
+}
+
+Storm.prototype.sync = function() {
+ this.sendMsgToParent({"command":"sync"});
+}
+
+Storm.prototype.sendPid = function(heartbeatdir) {
+ var pid = process.pid;
+ fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w"));
+ this.sendMsgToParent({"pid": pid})
+}
+
+Storm.prototype.log = function(msg) {
+ this.sendMsgToParent({"command": "log", "msg": msg});
+}
+
+Storm.prototype.initSetupInfo = function(setupInfo) {
+ var self = this;
+ var callback = function() {
+ self.sendPid(setupInfo['pidDir']);
+ }
+ this.initialize(setupInfo['conf'], setupInfo['context'], callback);
+}
+
+Storm.prototype.startReadingInput = function() {
+ var self = this;
+ process.stdin.on('readable', function() {
+ var chunk = process.stdin.read();
+ var messages = self.handleNewChunk(chunk);
+ messages.forEach(function(message) {
+ self.handleNewMessage(message);
+ })
+
+ });
+}
+
+/**
+ * receives a new string chunk and returns a list of new messages with the separator removed
+ * stores state in this.messagePart
+ * @param chunk
+ */
+Storm.prototype.handleNewChunk = function(chunk) {
+ //invariant: this.messagePart has no separator otherwise we would have parsed it already
+ var messages = [];
+ if (chunk && chunk.length !== 0) {
+ //"{}".split("\nend\n") ==> ['{}']
+ //"\nend\n".split("\nend\n") ==> ['' , '']
+ //"{}\nend\n".split("\nend\n") ==> ['{}', '']
+ //"\nend\n{}".split("\nend\n") ==> ['' , '{}']
+ // "{}\nend\n{}".split("\nend\n") ==> ['{}', '{}' ]
+ this.messagePart = this.messagePart + chunk;
+ var newMessageParts = this.messagePart.split(this.separator);
+ while (newMessageParts.length > 0) {
+ var potentialMessage = newMessageParts.shift();
+ var anotherMessageAhead = newMessageParts.length > 0;
+ if (!anotherMessageAhead) {
+ this.messagePart = potentialMessage;
+ }
+ else if (potentialMessage.length > 0) {
+ messages.push(potentialMessage);
+ }
+ }
+ }
+ return messages;
+ }
+
+Storm.prototype.isTaskIds = function(msg) {
+ return (msg instanceof Array);
+}
+
+Storm.prototype.handleNewMessage = function(msg) {
+ var parsedMsg = JSON.parse(msg);
+
+ if (this.isFirstMessage) {
+ this.initSetupInfo(parsedMsg);
+ this.isFirstMessage = false;
+ } else if (this.isTaskIds(parsedMsg)) {
+ this.handleNewTaskId(parsedMsg);
+ } else {
+ this.handleNewCommand(parsedMsg);
+ }
+}
+
+Storm.prototype.handleNewTaskId = function(taskIds) {
+ //When new list of task ids arrives, the callback that was passed with the corresponding emit should be called.
+ //Storm assures that the task ids will be sent in the same order as their corresponding emits so it we can simply
+ //take the first callback in the list and be sure it is the right one.
+
+ var callback = this.taskIdsCallbacks.shift();
+ if (callback) {
+ callback(taskIds);
+ } else {
+ throw new Error('Something went wrong, we off the split of task id callbacks');
+ }
+}
+
+
+
+/**
+ *
+ * @param messageDetails json with the emit details.
+ *
+ * For bolt, the json must contain the required fields:
+ * - tuple - the value to emit
+ * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
+ * tuple and return ack when all components successfully finished to process it.
+ * and may contain the optional fields:
+ * - stream (if empty - emit to default stream)
+ *
+ * For spout, the json must contain the required fields:
+ * - tuple - the value to emit
+ *
+ * and may contain the optional fields:
+ * - id - pass id for reliable emit (and receive ack/fail later).
+ * - stream - if empty - emit to default stream.
+ *
+ * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received).
+ */
+Storm.prototype.emit = function(messageDetails, onTaskIds) {
+ //Every emit triggers a response - list of task ids to which the tuple was emitted. The task ids are accessible
+ //through the callback (will be called when the response arrives). The callback is stored in a list until the
+ //corresponding task id list arrives.
+ if (messageDetails.task) {
+ throw new Error('Illegal input - task. To emit to specific task use emit direct!');
+ }
+
+ if (!onTaskIds) {
+ throw new Error('You must pass a onTaskIds callback when using emit!')
+ }
+
+ this.taskIdsCallbacks.push(onTaskIds);
+ this.__emit(messageDetails);;
+}
+
+
+/**
+ * Emit message to specific task.
+ * @param messageDetails json with the emit details.
+ *
+ * For bolt, the json must contain the required fields:
+ * - tuple - the value to emit
+ * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
+ * tuple and return ack when all components successfully finished to process it.
+ * - task - indicate the task to send the tuple to.
+ * and may contain the optional fields:
+ * - stream (if empty - emit to default stream)
+ *
+ * For spout, the json must contain the required fields:
+ * - tuple - the value to emit
+ * - task - indicate the task to send the tuple to.
+ * and may contain the optional fields:
+ * - id - pass id for reliable emit (and receive ack/fail later).
+ * - stream - if empty - emit to default stream.
+ *
+ * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received).
+ */
+Storm.prototype.emitDirect = function(commandDetails) {
+ if (!commandDetails.task) {
+ throw new Error("Emit direct must receive task id!")
+ }
+ this.__emit(commandDetails);
+}
+
+/**
+ * Initialize storm component according to the configuration received.
+ * @param conf configuration object accrding to storm protocol.
+ * @param context context object according to storm protocol.
+ * @param done callback. Call this method when finished initializing.
+ */
+Storm.prototype.initialize = function(conf, context, done) {
+ done();
+}
+
+Storm.prototype.run = function() {
+ process.stdout.setEncoding('utf8');
+ process.stdin.setEncoding('utf8');
+ this.startReadingInput();
+}
+
+function Tuple(id, component, stream, task, values) {
+ this.id = id;
+ this.component = component;
+ this.stream = stream;
+ this.task = task;
+ this.values = values;
+}
+
+/**
+ * Base class for storm bolt.
+ * To create a bolt implement 'process' method.
+ * You may also implement initialize method to
+ */
+function BasicBolt() {
+ Storm.call(this);
+ this.anchorTuple = null;
+};
+
+BasicBolt.prototype = Object.create(Storm.prototype);
+BasicBolt.prototype.constructor = BasicBolt;
+
+/**
+ * Emit message.
+ * @param commandDetails json with the required fields:
+ * - tuple - the value to emit
+ * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
+ * tuple and return ack when all components successfully finished to process it.
+ * and the optional fields:
+ * - stream (if empty - emit to default stream)
+ * - task (pass only to emit to specific task)
+ */
+BasicBolt.prototype.__emit = function(commandDetails) {
+ var self = this;
+
+ var message = {
+ command: "emit",
+ tuple: commandDetails.tuple,
+ stream: commandDetails.stream,
+ task: commandDetails.task,
+ anchors: [commandDetails.anchorTupleId]
+ };
+
+ this.sendMsgToParent(message);
+}
+
+BasicBolt.prototype.handleNewCommand = function(command) {
+ var self = this;
+ var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]);
+ var callback = function(err) {
+ if (err) {
+ self.fail(tup, err);
+ return;
+ }
+ self.ack(tup);
+ }
+ this.process(tup, callback);
+}
+
+/**
+ * Implement this method when creating a bolt. This is the main method that provides the logic of the bolt (what
+ * should it do?).
+ * @param tuple the input of the bolt - what to process.
+ * @param done call this method when done processing.
+ */
+BasicBolt.prototype.process = function(tuple, done) {};
+
+BasicBolt.prototype.ack = function(tup) {
+ this.sendMsgToParent({"command": "ack", "id": tup.id});
+}
+
+BasicBolt.prototype.fail = function(tup, err) {
+ this.sendMsgToParent({"command": "fail", "id": tup.id});
+}
+
+
+/**
+ * Base class for storm spout.
+ * To create a spout implement the following methods: nextTuple, ack and fail (nextTuple - mandatory, ack and fail
+ * can stay empty).
+ * You may also implement initialize method.
+ *
+ */
+function Spout() {
+ Storm.call(this);
+};
+
+Spout.prototype = Object.create(Storm.prototype);
+
+Spout.prototype.constructor = Spout;
+
+/**
+ * This method will be called when an ack is received for preciously sent tuple. One may implement it.
+ * @param id The id of the tuple.
+ * @param done Call this method when finished and ready to receive more tuples.
+ */
+Spout.prototype.ack = function(id, done) {};
+
+/**
+ * This method will be called when an fail is received for preciously sent tuple. One may implement it (for example -
+ * log the failure or send the tuple again).
+ * @param id The id of the tuple.
+ * @param done Call this method when finished and ready to receive more tuples.
+ */
+Spout.prototype.fail = function(id, done) {};
+
+/**
+ * Method the indicates its time to emit the next tuple.
+ * @param done call this method when done sending the output.
+ */
+Spout.prototype.nextTuple = function(done) {};
+
+Spout.prototype.handleNewCommand = function(command) {
+ var self = this;
+ var callback = function() {
+ self.sync();
+ }
+
+ if (command["command"] === "next") {
+ this.nextTuple(callback);
+ }
+
+ if (command["command"] === "ack") {
+ this.ack(command["id"], callback);
+ }
+
+ if (command["command"] === "fail") {
+ this.fail(command["id"], callback);
+ }
+}
+
+/**
+ * @param commandDetails json with the required fields:
+ * - tuple - the value to emit.
+ * and the optional fields:
+ * - id - pass id for reliable emit (and receive ack/fail later).
+ * - stream - if empty - emit to default stream.
+ * - task - pass only to emit to specific task.
+ */
+Spout.prototype.__emit = function(commandDetails) {
+ var message = {
+ command: "emit",
+ tuple: commandDetails.tuple,
+ id: commandDetails.id,
+ stream: commandDetails.stream,
+ task: commandDetails.task
+ };
+
+ this.sendMsgToParent(message);
+}
+
+module.exports.BasicBolt = BasicBolt;
+module.exports.Spout = Spout;
http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/storm-multilang/multilang-python/pom.xml
----------------------------------------------------------------------
diff --git a/storm-multilang/multilang-python/pom.xml b/storm-multilang/multilang-python/pom.xml
new file mode 100644
index 0000000..379c0bc
--- /dev/null
+++ b/storm-multilang/multilang-python/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>0.11.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>multilang-python</artifactId>
+ <packaging>jar</packaging>
+ <name>multilang-python</name>
+
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/storm-multilang/multilang-python/src/main/resources/resources/storm.py
----------------------------------------------------------------------
diff --git a/storm-multilang/multilang-python/src/main/resources/resources/storm.py b/storm-multilang/multilang-python/src/main/resources/resources/storm.py
new file mode 100755
index 0000000..642c393
--- /dev/null
+++ b/storm-multilang/multilang-python/src/main/resources/resources/storm.py
@@ -0,0 +1,260 @@
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+import os
+import traceback
+from collections import deque
+
+try:
+ import simplejson as json
+except ImportError:
+ import json
+
+json_encode = lambda x: json.dumps(x)
+json_decode = lambda x: json.loads(x)
+
+#reads lines and reconstructs newlines appropriately
+def readMsg():
+ msg = ""
+ while True:
+ line = sys.stdin.readline()
+ if not line:
+ raise Exception('Read EOF from stdin')
+ if line[0:-1] == "end":
+ break
+ msg = msg + line
+ return json_decode(msg[0:-1])
+
+MODE = None
+ANCHOR_TUPLE = None
+
+#queue up commands we read while trying to read taskids
+pending_commands = deque()
+
+def readTaskIds():
+ if pending_taskids:
+ return pending_taskids.popleft()
+ else:
+ msg = readMsg()
+ while type(msg) is not list:
+ pending_commands.append(msg)
+ msg = readMsg()
+ return msg
+
+#queue up taskids we read while trying to read commands/tuples
+pending_taskids = deque()
+
+def readCommand():
+ if pending_commands:
+ return pending_commands.popleft()
+ else:
+ msg = readMsg()
+ while type(msg) is list:
+ pending_taskids.append(msg)
+ msg = readMsg()
+ return msg
+
+def readTuple():
+ cmd = readCommand()
+ return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"])
+
+def sendMsgToParent(msg):
+ print json_encode(msg)
+ print "end"
+ sys.stdout.flush()
+
+def sync():
+ sendMsgToParent({'command':'sync'})
+
+def sendpid(heartbeatdir):
+ pid = os.getpid()
+ sendMsgToParent({'pid':pid})
+ open(heartbeatdir + "/" + str(pid), "w").close()
+
+def emit(*args, **kwargs):
+ __emit(*args, **kwargs)
+ return readTaskIds()
+
+def emitDirect(task, *args, **kwargs):
+ kwargs["directTask"] = task
+ __emit(*args, **kwargs)
+
+def __emit(*args, **kwargs):
+ global MODE
+ if MODE == Bolt:
+ emitBolt(*args, **kwargs)
+ elif MODE == Spout:
+ emitSpout(*args, **kwargs)
+
+def emitBolt(tup, stream=None, anchors = [], directTask=None):
+ global ANCHOR_TUPLE
+ if ANCHOR_TUPLE is not None:
+ anchors = [ANCHOR_TUPLE]
+ m = {"command": "emit"}
+ if stream is not None:
+ m["stream"] = stream
+ m["anchors"] = map(lambda a: a.id, anchors)
+ if directTask is not None:
+ m["task"] = directTask
+ m["tuple"] = tup
+ sendMsgToParent(m)
+
+def emitSpout(tup, stream=None, id=None, directTask=None):
+ m = {"command": "emit"}
+ if id is not None:
+ m["id"] = id
+ if stream is not None:
+ m["stream"] = stream
+ if directTask is not None:
+ m["task"] = directTask
+ m["tuple"] = tup
+ sendMsgToParent(m)
+
+def ack(tup):
+ sendMsgToParent({"command": "ack", "id": tup.id})
+
+def fail(tup):
+ sendMsgToParent({"command": "fail", "id": tup.id})
+
+def reportError(msg):
+ sendMsgToParent({"command": "error", "msg": msg})
+
+def log(msg, level=2):
+ sendMsgToParent({"command": "log", "msg": msg, "level":level})
+
+def logTrace(msg):
+ log(msg, 0)
+
+def logDebug(msg):
+ log(msg, 1)
+
+def logInfo(msg):
+ log(msg, 2)
+
+def logWarn(msg):
+ log(msg, 3)
+
+def logError(msg):
+ log(msg, 4)
+
+def rpcMetrics(name, params):
+ sendMsgToParent({"command": "metrics", "name": name, "params": params})
+
+def initComponent():
+ setupInfo = readMsg()
+ sendpid(setupInfo['pidDir'])
+ return [setupInfo['conf'], setupInfo['context']]
+
+class Tuple(object):
+ def __init__(self, id, component, stream, task, values):
+ self.id = id
+ self.component = component
+ self.stream = stream
+ self.task = task
+ self.values = values
+
+ def __repr__(self):
+ return '<%s%s>' % (
+ self.__class__.__name__,
+ ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys())))
+
+ def is_heartbeat_tuple(self):
+ return self.task == -1 and self.stream == "__heartbeat"
+
+class Bolt(object):
+ def initialize(self, stormconf, context):
+ pass
+
+ def process(self, tuple):
+ pass
+
+ def run(self):
+ global MODE
+ MODE = Bolt
+ conf, context = initComponent()
+ try:
+ self.initialize(conf, context)
+ while True:
+ tup = readTuple()
+ if tup.is_heartbeat_tuple():
+ sync()
+ else:
+ self.process(tup)
+ except Exception, e:
+ reportError(traceback.format_exc(e))
+
+class BasicBolt(object):
+ def initialize(self, stormconf, context):
+ pass
+
+ def process(self, tuple):
+ pass
+
+ def run(self):
+ global MODE
+ MODE = Bolt
+ global ANCHOR_TUPLE
+ conf, context = initComponent()
+ try:
+ self.initialize(conf, context)
+ while True:
+ tup = readTuple()
+ if tup.is_heartbeat_tuple():
+ sync()
+ else:
+ ANCHOR_TUPLE = tup
+ try:
+ self.process(tup)
+ ack(tup)
+ except Exception, e:
+ reportError(traceback.format_exc(e))
+ fail(tup)
+ except Exception, e:
+ reportError(traceback.format_exc(e))
+
+class Spout(object):
+ def initialize(self, conf, context):
+ pass
+
+ def ack(self, id):
+ pass
+
+ def fail(self, id):
+ pass
+
+ def nextTuple(self):
+ pass
+
+ def run(self):
+ global MODE
+ MODE = Spout
+ conf, context = initComponent()
+ try:
+ self.initialize(conf, context)
+ while True:
+ msg = readCommand()
+ if msg["command"] == "next":
+ self.nextTuple()
+ if msg["command"] == "ack":
+ self.ack(msg["id"])
+ if msg["command"] == "fail":
+ self.fail(msg["id"])
+ sync()
+ except Exception, e:
+ reportError(traceback.format_exc(e))
http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/storm-multilang/multilang-ruby/pom.xml
----------------------------------------------------------------------
diff --git a/storm-multilang/multilang-ruby/pom.xml b/storm-multilang/multilang-ruby/pom.xml
new file mode 100644
index 0000000..6b5dd0c
--- /dev/null
+++ b/storm-multilang/multilang-ruby/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>0.11.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>multilang-ruby</artifactId>
+ <packaging>jar</packaging>
+ <name>multilang-ruby</name>
+
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/cb5afe32/storm-multilang/multilang-ruby/src/main/resources/resources/storm.rb
----------------------------------------------------------------------
diff --git a/storm-multilang/multilang-ruby/src/main/resources/resources/storm.rb b/storm-multilang/multilang-ruby/src/main/resources/resources/storm.rb
new file mode 100644
index 0000000..816694e
--- /dev/null
+++ b/storm-multilang/multilang-ruby/src/main/resources/resources/storm.rb
@@ -0,0 +1,236 @@
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require "rubygems"
+require "json"
+
+module Storm
+ module Protocol
+ class << self
+ attr_accessor :mode, :pending_taskids, :pending_commands
+ end
+
+ self.pending_taskids = []
+ self.pending_commands = []
+
+ def read_message
+ msg = ""
+ loop do
+ line = STDIN.readline.chomp
+ break if line == "end"
+ msg << line
+ msg << "\n"
+ end
+ JSON.parse msg.chomp
+ end
+
+ def read_task_ids
+ Storm::Protocol.pending_taskids.shift ||
+ begin
+ msg = read_message
+ until msg.is_a? Array
+ Storm::Protocol.pending_commands.push(msg)
+ msg = read_message
+ end
+ msg
+ end
+ end
+
+ def read_command
+ Storm::Protocol.pending_commands.shift ||
+ begin
+ msg = read_message
+ while msg.is_a? Array
+ Storm::Protocol.pending_taskids.push(msg)
+ msg = read_message
+ end
+ msg
+ end
+ end
+
+ def send_msg_to_parent(msg)
+ puts msg.to_json
+ puts "end"
+ STDOUT.flush
+ end
+
+ def sync
+ send_msg_to_parent({'command' => 'sync'})
+ end
+
+ def send_pid(heartbeat_dir)
+ pid = Process.pid
+ send_msg_to_parent({'pid' => pid})
+ File.open("#{heartbeat_dir}/#{pid}", "w").close
+ end
+
+ def emit_bolt(tup, args = {})
+ stream = args[:stream]
+ anchors = args[:anchors] || args[:anchor] || []
+ anchors = [anchors] unless anchors.is_a? Enumerable
+ direct = args[:direct_task]
+ m = {:command => :emit, :anchors => anchors.map(&:id), :tuple => tup}
+ m[:stream] = stream if stream
+ m[:task] = direct if direct
+ send_msg_to_parent m
+ read_task_ids unless direct
+ end
+
+ def emit_spout(tup, args = {})
+ stream = args[:stream]
+ id = args[:id]
+ direct = args[:direct_task]
+ m = {:command => :emit, :tuple => tup}
+ m[:id] = id if id
+ m[:stream] = stream if stream
+ m[:task] = direct if direct
+ send_msg_to_parent m
+ read_task_ids unless direct
+ end
+
+ def emit(*args)
+ case Storm::Protocol.mode
+ when 'spout'
+ emit_spout(*args)
+ when 'bolt'
+ emit_bolt(*args)
+ end
+ end
+
+ def ack(tup)
+ send_msg_to_parent :command => :ack, :id => tup.id
+ end
+
+ def fail(tup)
+ send_msg_to_parent :command => :fail, :id => tup.id
+ end
+
+ def reportError(msg)
+ send_msg_to_parent :command => :error, :msg => msg.to_s
+ end
+
+ def log(msg, level=2)
+ send_msg_to_parent :command => :log, :msg => msg.to_s, :level => level
+ end
+
+ def logTrace(msg)
+ log(msg, 0)
+ end
+
+ def logDebug(msg)
+ log(msg, 1)
+ end
+
+ def logInfo(msg)
+ log(msg, 2)
+ end
+
+ def logWarn(msg)
+ log(msg, 3)
+ end
+
+ def logError(msg)
+ log(msg, 4)
+ end
+
+ def handshake
+ setup_info = read_message
+ send_pid setup_info['pidDir']
+ [setup_info['conf'], setup_info['context']]
+ end
+ end
+
+ class Tuple
+ attr_accessor :id, :component, :stream, :task, :values
+
+ def initialize(id, component, stream, task, values)
+ @id = id
+ @component = component
+ @stream = stream
+ @task = task
+ @values = values
+ end
+
+ def self.from_hash(hash)
+ Tuple.new(*hash.values_at("id", "comp", "stream", "task", "tuple"))
+ end
+
+ def is_heartbeat
+ task == -1 and stream == '__heartbeat'
+ end
+ end
+
+ class Bolt
+ include Storm::Protocol
+
+ def prepare(conf, context); end
+
+ def process(tuple); end
+
+ def run
+ Storm::Protocol.mode = 'bolt'
+ prepare(*handshake)
+ begin
+ while true
+ tuple = Tuple.from_hash(read_command)
+ if tuple.is_heartbeat
+ sync
+ else
+ process tuple
+ end
+ end
+ rescue Exception => e
+ reportError 'Exception in bolt: ' + e.message + ' - ' + e.backtrace.join('\n')
+ end
+ end
+ end
+
+ class Spout
+ include Storm::Protocol
+
+ def open(conf, context); end
+
+ def nextTuple; end
+
+ def ack(id); end
+
+ def fail(id); end
+
+ def run
+ Storm::Protocol.mode = 'spout'
+ open(*handshake)
+
+ begin
+ while true
+ msg = read_command
+ case msg['command']
+ when 'next'
+ nextTuple
+ when 'ack'
+ ack(msg['id'])
+ when 'fail'
+ fail(msg['id'])
+ end
+ sync
+ end
+ rescue Exception => e
+ reportError 'Exception in spout: ' + e.message + ' - ' + e.backtrace.join('\n')
+ end
+ end
+ end
+end
[18/27] storm git commit: Resolve NPE that can occur if there is no
SourceComponent in a Tuple
Posted by pt...@apache.org.
Resolve NPE that can occur if there is no SourceComponent in a Tuple
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/5faa782a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/5faa782a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/5faa782a
Branch: refs/heads/0.10.x-branch
Commit: 5faa782ab2c8732579f1693d74c8d1e1722c3942
Parents: 63f331f
Author: Niels Basjes <ni...@basjes.nl>
Authored: Thu Dec 11 12:35:01 2014 +0100
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:28:27 2015 -0400
----------------------------------------------------------------------
storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/5faa782a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
index 7ff2c8c..40ad11c 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
@@ -215,8 +215,8 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
}
public boolean isTick() {
- return this.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) &&
- this.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
+ return Constants.SYSTEM_COMPONENT_ID.equals(this.getSourceComponent()) &&
+ Constants.SYSTEM_TICK_STREAM_ID.equals(this.getSourceStreamId());
}
public MessageId getMessageId() {
[23/27] storm git commit: Code cleanup in mocking storm-starter tests
Posted by pt...@apache.org.
Code cleanup in mocking storm-starter tests
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/22a29239
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/22a29239
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/22a29239
Branch: refs/heads/0.10.x-branch
Commit: 22a2923979a70b6384d72ab510c5d1bcd8de274f
Parents: 027db67
Author: Niels Basjes <ni...@basjes.nl>
Authored: Tue Dec 9 13:56:11 2014 +0100
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:28:27 2015 -0400
----------------------------------------------------------------------
.../test/jvm/storm/starter/tools/MockTupleHelpers.java | 4 +---
1 file changed, 1 insertion(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/22a29239/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
index 3180fd3..374288e 100644
--- a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
+++ b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
@@ -29,9 +29,7 @@ public final class MockTupleHelpers {
}
public static Tuple mockTickTuple() {
- Tuple tuple = mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID);
- when(tuple.isTick()).thenReturn(true);
- return tuple;
+ return mockTuple(Constants.SYSTEM_COMPONENT_ID, Constants.SYSTEM_TICK_STREAM_ID);
}
public static Tuple mockTuple(String componentId, String streamId) {
[19/27] storm git commit: Refactor to move the isTick to a utility
class
Posted by pt...@apache.org.
Refactor to move the isTick to a utility class
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0e49d91b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0e49d91b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0e49d91b
Branch: refs/heads/0.10.x-branch
Commit: 0e49d91be5b765eccf3e05fe9c44f53762f17ddf
Parents: 5faa782
Author: Niels Basjes <ni...@basjes.nl>
Authored: Mon Dec 15 10:56:05 2014 +0100
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:28:27 2015 -0400
----------------------------------------------------------------------
.../storm/starter/bolt/AbstractRankerBolt.java | 3 +-
.../storm/starter/bolt/RollingCountBolt.java | 3 +-
.../storm/starter/tools/MockTupleHelpers.java | 6 ----
.../src/jvm/storm/kafka/bolt/KafkaBolt.java | 4 +--
.../src/jvm/backtype/storm/tuple/Tuple.java | 5 ---
.../src/jvm/backtype/storm/tuple/TupleImpl.java | 5 ---
.../jvm/backtype/storm/utils/TupleUtils.java | 35 ++++++++++++++++++++
.../trident/topology/TridentBoltExecutor.java | 3 +-
8 files changed, 43 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0e49d91b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
index 83c2cfc..64ceb29 100644
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/AbstractRankerBolt.java
@@ -24,6 +24,7 @@ import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
+import backtype.storm.utils.TupleUtils;
import org.apache.log4j.Logger;
import storm.starter.tools.Rankings;
@@ -77,7 +78,7 @@ public abstract class AbstractRankerBolt extends BaseBasicBolt {
*/
@Override
public final void execute(Tuple tuple, BasicOutputCollector collector) {
- if (tuple.isTick()) {
+ if (TupleUtils.isTick(tuple)) {
getLogger().debug("Received tick tuple, triggering emit of current rankings");
emitRankings(collector);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0e49d91b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
index f023c0b..31f7ee2 100644
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountBolt.java
@@ -25,6 +25,7 @@ import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
+import backtype.storm.utils.TupleUtils;
import org.apache.log4j.Logger;
import storm.starter.tools.NthLastModifiedTimeTracker;
import storm.starter.tools.SlidingWindowCounter;
@@ -94,7 +95,7 @@ public class RollingCountBolt extends BaseRichBolt {
@Override
public void execute(Tuple tuple) {
- if (tuple.isTick()) {
+ if (TupleUtils.isTick(tuple)) {
LOG.debug("Received tick tuple, triggering emit of current window counts");
emitCurrentWindowCounts();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0e49d91b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
index 9e8629c..eeaeeae 100644
--- a/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
+++ b/examples/storm-starter/test/jvm/storm/starter/tools/MockTupleHelpers.java
@@ -35,13 +35,7 @@ public final class MockTupleHelpers {
Tuple tuple = mock(Tuple.class);
when(tuple.getSourceComponent()).thenReturn(componentId);
when(tuple.getSourceStreamId()).thenReturn(streamId);
- when(tuple.isTick()).thenReturn(isTick(componentId, streamId));
return tuple;
}
- private static boolean isTick(String componentId, String streamId) {
- return componentId.equals(Constants.SYSTEM_COMPONENT_ID) &&
- streamId.equals(Constants.SYSTEM_TICK_STREAM_ID);
- }
-
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0e49d91b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index 0a1e5fe..a8c4321 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@ -22,6 +22,7 @@ import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
+import backtype.storm.utils.TupleUtils;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
@@ -89,8 +90,7 @@ public class KafkaBolt<K, V> extends BaseRichBolt {
@Override
public void execute(Tuple input) {
- if (input.isTick()) {
- collector.ack(input);
+ if (TupleUtils.isTick(input)) {
return; // Do not try to send ticks to Kafka
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0e49d91b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
index 7ea93b9..a31b52b 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/Tuple.java
@@ -54,11 +54,6 @@ public interface Tuple extends ITuple{
public String getSourceStreamId();
/**
- * Returns if this tuple is a tick tuple or not.
- */
- public boolean isTick();
-
- /**
* Gets the message id that associated with this tuple.
*/
public MessageId getMessageId();
http://git-wip-us.apache.org/repos/asf/storm/blob/0e49d91b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
index 40ad11c..7829327 100644
--- a/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
+++ b/storm-core/src/jvm/backtype/storm/tuple/TupleImpl.java
@@ -214,11 +214,6 @@ public class TupleImpl extends IndifferentAccessMap implements Seqable, Indexed,
return streamId;
}
- public boolean isTick() {
- return Constants.SYSTEM_COMPONENT_ID.equals(this.getSourceComponent()) &&
- Constants.SYSTEM_TICK_STREAM_ID.equals(this.getSourceStreamId());
- }
-
public MessageId getMessageId() {
return id;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/0e49d91b/storm-core/src/jvm/backtype/storm/utils/TupleUtils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/TupleUtils.java b/storm-core/src/jvm/backtype/storm/utils/TupleUtils.java
new file mode 100644
index 0000000..f9fb2c0
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/utils/TupleUtils.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.utils;
+
+import backtype.storm.Constants;
+import backtype.storm.tuple.Tuple;
+
+public final class TupleUtils {
+
+ private TupleUtils() {
+ // No instantiation
+ }
+
+ public static boolean isTick(Tuple tuple) {
+ return tuple != null
+ && Constants.SYSTEM_COMPONENT_ID .equals(tuple.getSourceComponent())
+ && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/0e49d91b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
index da4c1a5..41741a1 100644
--- a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
+++ b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
@@ -34,6 +34,7 @@ import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.RotatingMap;
+import backtype.storm.utils.TupleUtils;
import backtype.storm.utils.Utils;
import java.io.Serializable;
import java.util.Arrays;
@@ -299,7 +300,7 @@ public class TridentBoltExecutor implements IRichBolt {
@Override
public void execute(Tuple tuple) {
- if(tuple.isTick()) {
+ if (TupleUtils.isTick(tuple)) {
long now = System.currentTimeMillis();
if(now - _lastRotate > _messageTimeoutMs) {
_batches.rotate();
[08/27] storm git commit: add additional information to error message
when ShellBolt dies
Posted by pt...@apache.org.
add additional information to error message when ShellBolt dies
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9e2c8166
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9e2c8166
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9e2c8166
Branch: refs/heads/0.10.x-branch
Commit: 9e2c8166fa8f56c60b366044d23e62a3e206f009
Parents: 17c1368
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Mon Apr 6 17:03:15 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:12:17 2015 -0400
----------------------------------------------------------------------
storm-core/src/jvm/backtype/storm/task/ShellBolt.java | 5 ++++-
1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/9e2c8166/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
index 308ec67..eac8a90 100644
--- a/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
+++ b/storm-core/src/jvm/backtype/storm/task/ShellBolt.java
@@ -279,7 +279,10 @@ public class ShellBolt implements IBolt {
private void die(Throwable exception) {
String processInfo = _process.getProcessInfoString() + _process.getProcessTerminationInfoString();
_exception = new RuntimeException(processInfo, exception);
- LOG.error("Halting process: ShellBolt died.", exception);
+ String message = String.format("Halting process: ShellBolt died. Command: %s, ProcessInfo %s",
+ Arrays.toString(_command),
+ processInfo);
+ LOG.error(message, exception);
_collector.reportError(exception);
if (_running || (exception instanceof Error)) { //don't exit if not running, unless it is an Error
System.exit(11);
[04/27] storm git commit: rename multilang component modules
Posted by pt...@apache.org.
rename multilang component modules
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/927132ce
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/927132ce
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/927132ce
Branch: refs/heads/0.10.x-branch
Commit: 927132ceff2b8354d75909845c3f09be4c41f6d0
Parents: 3c70141
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Mon Apr 6 17:25:10 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:12:17 2015 -0400
----------------------------------------------------------------------
pom.xml | 6 +-
storm-multilang/javascript/pom.xml | 32 ++
.../src/main/resources/resources/storm.js | 373 +++++++++++++++++++
storm-multilang/multilang-javascript/pom.xml | 32 --
.../src/main/resources/resources/storm.js | 373 -------------------
storm-multilang/multilang-python/pom.xml | 32 --
.../src/main/resources/resources/storm.py | 260 -------------
storm-multilang/multilang-ruby/pom.xml | 32 --
.../src/main/resources/resources/storm.rb | 236 ------------
storm-multilang/python/pom.xml | 32 ++
.../src/main/resources/resources/storm.py | 260 +++++++++++++
storm-multilang/ruby/pom.xml | 32 ++
.../ruby/src/main/resources/resources/storm.rb | 236 ++++++++++++
13 files changed, 968 insertions(+), 968 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/927132ce/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 4d7fbd2..c5a0b24 100644
--- a/pom.xml
+++ b/pom.xml
@@ -157,9 +157,9 @@
<modules>
<module>storm-buildtools/maven-shade-clojure-transformer</module>
<module>storm-buildtools/storm-maven-plugins</module>
- <module>storm-multilang/multilang-javascript</module>
- <module>storm-multilang/multilang-python</module>
- <module>storm-multilang/multilang-ruby</module>
+ <module>storm-multilang/javascript</module>
+ <module>storm-multilang/python</module>
+ <module>storm-multilang/ruby</module>
<module>storm-core</module>
<module>examples/storm-starter</module>
<module>external/storm-kafka</module>
http://git-wip-us.apache.org/repos/asf/storm/blob/927132ce/storm-multilang/javascript/pom.xml
----------------------------------------------------------------------
diff --git a/storm-multilang/javascript/pom.xml b/storm-multilang/javascript/pom.xml
new file mode 100644
index 0000000..e1cb993
--- /dev/null
+++ b/storm-multilang/javascript/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>0.11.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>multilang-javascript</artifactId>
+ <packaging>jar</packaging>
+ <name>multilang-javascript</name>
+
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/927132ce/storm-multilang/javascript/src/main/resources/resources/storm.js
----------------------------------------------------------------------
diff --git a/storm-multilang/javascript/src/main/resources/resources/storm.js b/storm-multilang/javascript/src/main/resources/resources/storm.js
new file mode 100755
index 0000000..dc6efc1
--- /dev/null
+++ b/storm-multilang/javascript/src/main/resources/resources/storm.js
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Base classes in node-js for storm Bolt and Spout.
+ * Implements the storm multilang protocol for nodejs.
+ */
+
+
+var fs = require('fs');
+
+function Storm() {
+ this.messagePart = "";
+ this.taskIdsCallbacks = [];
+ this.isFirstMessage = true;
+ this.separator = '\nend\n';
+}
+
+Storm.prototype.sendMsgToParent = function(msg) {
+ var str = JSON.stringify(msg);
+ process.stdout.write(str + this.separator);
+}
+
+Storm.prototype.sync = function() {
+ this.sendMsgToParent({"command":"sync"});
+}
+
+Storm.prototype.sendPid = function(heartbeatdir) {
+ var pid = process.pid;
+ fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w"));
+ this.sendMsgToParent({"pid": pid})
+}
+
+Storm.prototype.log = function(msg) {
+ this.sendMsgToParent({"command": "log", "msg": msg});
+}
+
+Storm.prototype.initSetupInfo = function(setupInfo) {
+ var self = this;
+ var callback = function() {
+ self.sendPid(setupInfo['pidDir']);
+ }
+ this.initialize(setupInfo['conf'], setupInfo['context'], callback);
+}
+
+Storm.prototype.startReadingInput = function() {
+ var self = this;
+ process.stdin.on('readable', function() {
+ var chunk = process.stdin.read();
+ var messages = self.handleNewChunk(chunk);
+ messages.forEach(function(message) {
+ self.handleNewMessage(message);
+ })
+
+ });
+}
+
+/**
+ * receives a new string chunk and returns a list of new messages with the separator removed
+ * stores state in this.messagePart
+ * @param chunk
+ */
+Storm.prototype.handleNewChunk = function(chunk) {
+ //invariant: this.messagePart has no separator otherwise we would have parsed it already
+ var messages = [];
+ if (chunk && chunk.length !== 0) {
+ //"{}".split("\nend\n") ==> ['{}']
+ //"\nend\n".split("\nend\n") ==> ['' , '']
+ //"{}\nend\n".split("\nend\n") ==> ['{}', '']
+ //"\nend\n{}".split("\nend\n") ==> ['' , '{}']
+ // "{}\nend\n{}".split("\nend\n") ==> ['{}', '{}' ]
+ this.messagePart = this.messagePart + chunk;
+ var newMessageParts = this.messagePart.split(this.separator);
+ while (newMessageParts.length > 0) {
+ var potentialMessage = newMessageParts.shift();
+ var anotherMessageAhead = newMessageParts.length > 0;
+ if (!anotherMessageAhead) {
+ this.messagePart = potentialMessage;
+ }
+ else if (potentialMessage.length > 0) {
+ messages.push(potentialMessage);
+ }
+ }
+ }
+ return messages;
+}
+
+Storm.prototype.isTaskIds = function(msg) {
+ return (msg instanceof Array);
+}
+
+Storm.prototype.handleNewMessage = function(msg) {
+ var parsedMsg = JSON.parse(msg);
+
+ if (this.isFirstMessage) {
+ this.initSetupInfo(parsedMsg);
+ this.isFirstMessage = false;
+ } else if (this.isTaskIds(parsedMsg)) {
+ this.handleNewTaskId(parsedMsg);
+ } else {
+ this.handleNewCommand(parsedMsg);
+ }
+}
+
+Storm.prototype.handleNewTaskId = function(taskIds) {
+ //When new list of task ids arrives, the callback that was passed with the corresponding emit should be called.
+ //Storm assures that the task ids will be sent in the same order as their corresponding emits so it we can simply
+ //take the first callback in the list and be sure it is the right one.
+
+ var callback = this.taskIdsCallbacks.shift();
+ if (callback) {
+ callback(taskIds);
+ } else {
+ throw new Error('Something went wrong, we off the split of task id callbacks');
+ }
+}
+
+
+
+/**
+ *
+ * @param messageDetails json with the emit details.
+ *
+ * For bolt, the json must contain the required fields:
+ * - tuple - the value to emit
+ * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
+ * tuple and return ack when all components successfully finished to process it.
+ * and may contain the optional fields:
+ * - stream (if empty - emit to default stream)
+ *
+ * For spout, the json must contain the required fields:
+ * - tuple - the value to emit
+ *
+ * and may contain the optional fields:
+ * - id - pass id for reliable emit (and receive ack/fail later).
+ * - stream - if empty - emit to default stream.
+ *
+ * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received).
+ */
+Storm.prototype.emit = function(messageDetails, onTaskIds) {
+ //Every emit triggers a response - list of task ids to which the tuple was emitted. The task ids are accessible
+ //through the callback (will be called when the response arrives). The callback is stored in a list until the
+ //corresponding task id list arrives.
+ if (messageDetails.task) {
+ throw new Error('Illegal input - task. To emit to specific task use emit direct!');
+ }
+
+ if (!onTaskIds) {
+ throw new Error('You must pass a onTaskIds callback when using emit!')
+ }
+
+ this.taskIdsCallbacks.push(onTaskIds);
+ this.__emit(messageDetails);;
+}
+
+
+/**
+ * Emit message to specific task.
+ * @param messageDetails json with the emit details.
+ *
+ * For bolt, the json must contain the required fields:
+ * - tuple - the value to emit
+ * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
+ * tuple and return ack when all components successfully finished to process it.
+ * - task - indicate the task to send the tuple to.
+ * and may contain the optional fields:
+ * - stream (if empty - emit to default stream)
+ *
+ * For spout, the json must contain the required fields:
+ * - tuple - the value to emit
+ * - task - indicate the task to send the tuple to.
+ * and may contain the optional fields:
+ * - id - pass id for reliable emit (and receive ack/fail later).
+ * - stream - if empty - emit to default stream.
+ *
+ * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received).
+ */
+Storm.prototype.emitDirect = function(commandDetails) {
+ if (!commandDetails.task) {
+ throw new Error("Emit direct must receive task id!")
+ }
+ this.__emit(commandDetails);
+}
+
+/**
+ * Initialize storm component according to the configuration received.
+ * @param conf configuration object accrding to storm protocol.
+ * @param context context object according to storm protocol.
+ * @param done callback. Call this method when finished initializing.
+ */
+Storm.prototype.initialize = function(conf, context, done) {
+ done();
+}
+
+Storm.prototype.run = function() {
+ process.stdout.setEncoding('utf8');
+ process.stdin.setEncoding('utf8');
+ this.startReadingInput();
+}
+
+function Tuple(id, component, stream, task, values) {
+ this.id = id;
+ this.component = component;
+ this.stream = stream;
+ this.task = task;
+ this.values = values;
+}
+
+/**
+ * Base class for storm bolt.
+ * To create a bolt implement 'process' method.
+ * You may also implement initialize method to
+ */
+function BasicBolt() {
+ Storm.call(this);
+ this.anchorTuple = null;
+};
+
+BasicBolt.prototype = Object.create(Storm.prototype);
+BasicBolt.prototype.constructor = BasicBolt;
+
+/**
+ * Emit message.
+ * @param commandDetails json with the required fields:
+ * - tuple - the value to emit
+ * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
+ * tuple and return ack when all components successfully finished to process it.
+ * and the optional fields:
+ * - stream (if empty - emit to default stream)
+ * - task (pass only to emit to specific task)
+ */
+BasicBolt.prototype.__emit = function(commandDetails) {
+ var self = this;
+
+ var message = {
+ command: "emit",
+ tuple: commandDetails.tuple,
+ stream: commandDetails.stream,
+ task: commandDetails.task,
+ anchors: [commandDetails.anchorTupleId]
+ };
+
+ this.sendMsgToParent(message);
+}
+
+BasicBolt.prototype.handleNewCommand = function(command) {
+ var self = this;
+ var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]);
+
+ if (tup.task === -1 && tup.stream === "__heartbeat") {
+ self.sync();
+ return;
+ }
+
+ var callback = function(err) {
+ if (err) {
+ self.fail(tup, err);
+ return;
+ }
+ self.ack(tup);
+ }
+ this.process(tup, callback);
+}
+
+/**
+ * Implement this method when creating a bolt. This is the main method that provides the logic of the bolt (what
+ * should it do?).
+ * @param tuple the input of the bolt - what to process.
+ * @param done call this method when done processing.
+ */
+BasicBolt.prototype.process = function(tuple, done) {};
+
+BasicBolt.prototype.ack = function(tup) {
+ this.sendMsgToParent({"command": "ack", "id": tup.id});
+}
+
+BasicBolt.prototype.fail = function(tup, err) {
+ this.sendMsgToParent({"command": "fail", "id": tup.id});
+}
+
+
+/**
+ * Base class for storm spout.
+ * To create a spout implement the following methods: nextTuple, ack and fail (nextTuple - mandatory, ack and fail
+ * can stay empty).
+ * You may also implement initialize method.
+ *
+ */
+function Spout() {
+ Storm.call(this);
+};
+
+Spout.prototype = Object.create(Storm.prototype);
+
+Spout.prototype.constructor = Spout;
+
+/**
+ * This method will be called when an ack is received for preciously sent tuple. One may implement it.
+ * @param id The id of the tuple.
+ * @param done Call this method when finished and ready to receive more tuples.
+ */
+Spout.prototype.ack = function(id, done) {};
+
+/**
+ * This method will be called when an fail is received for preciously sent tuple. One may implement it (for example -
+ * log the failure or send the tuple again).
+ * @param id The id of the tuple.
+ * @param done Call this method when finished and ready to receive more tuples.
+ */
+Spout.prototype.fail = function(id, done) {};
+
+/**
+ * Method the indicates its time to emit the next tuple.
+ * @param done call this method when done sending the output.
+ */
+Spout.prototype.nextTuple = function(done) {};
+
+Spout.prototype.handleNewCommand = function(command) {
+ var self = this;
+ var callback = function() {
+ self.sync();
+ }
+
+ if (command["command"] === "next") {
+ this.nextTuple(callback);
+ }
+
+ if (command["command"] === "ack") {
+ this.ack(command["id"], callback);
+ }
+
+ if (command["command"] === "fail") {
+ this.fail(command["id"], callback);
+ }
+}
+
+/**
+ * @param commandDetails json with the required fields:
+ * - tuple - the value to emit.
+ * and the optional fields:
+ * - id - pass id for reliable emit (and receive ack/fail later).
+ * - stream - if empty - emit to default stream.
+ * - task - pass only to emit to specific task.
+ */
+Spout.prototype.__emit = function(commandDetails) {
+ var message = {
+ command: "emit",
+ tuple: commandDetails.tuple,
+ id: commandDetails.id,
+ stream: commandDetails.stream,
+ task: commandDetails.task
+ };
+
+ this.sendMsgToParent(message);
+}
+
+module.exports.BasicBolt = BasicBolt;
+module.exports.Spout = Spout;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/927132ce/storm-multilang/multilang-javascript/pom.xml
----------------------------------------------------------------------
diff --git a/storm-multilang/multilang-javascript/pom.xml b/storm-multilang/multilang-javascript/pom.xml
deleted file mode 100644
index e1cb993..0000000
--- a/storm-multilang/multilang-javascript/pom.xml
+++ /dev/null
@@ -1,32 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <artifactId>storm</artifactId>
- <groupId>org.apache.storm</groupId>
- <version>0.11.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
- <groupId>org.apache.storm</groupId>
- <artifactId>multilang-javascript</artifactId>
- <packaging>jar</packaging>
- <name>multilang-javascript</name>
-
-</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/927132ce/storm-multilang/multilang-javascript/src/main/resources/resources/storm.js
----------------------------------------------------------------------
diff --git a/storm-multilang/multilang-javascript/src/main/resources/resources/storm.js b/storm-multilang/multilang-javascript/src/main/resources/resources/storm.js
deleted file mode 100755
index dc6efc1..0000000
--- a/storm-multilang/multilang-javascript/src/main/resources/resources/storm.js
+++ /dev/null
@@ -1,373 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Base classes in node-js for storm Bolt and Spout.
- * Implements the storm multilang protocol for nodejs.
- */
-
-
-var fs = require('fs');
-
-function Storm() {
- this.messagePart = "";
- this.taskIdsCallbacks = [];
- this.isFirstMessage = true;
- this.separator = '\nend\n';
-}
-
-Storm.prototype.sendMsgToParent = function(msg) {
- var str = JSON.stringify(msg);
- process.stdout.write(str + this.separator);
-}
-
-Storm.prototype.sync = function() {
- this.sendMsgToParent({"command":"sync"});
-}
-
-Storm.prototype.sendPid = function(heartbeatdir) {
- var pid = process.pid;
- fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w"));
- this.sendMsgToParent({"pid": pid})
-}
-
-Storm.prototype.log = function(msg) {
- this.sendMsgToParent({"command": "log", "msg": msg});
-}
-
-Storm.prototype.initSetupInfo = function(setupInfo) {
- var self = this;
- var callback = function() {
- self.sendPid(setupInfo['pidDir']);
- }
- this.initialize(setupInfo['conf'], setupInfo['context'], callback);
-}
-
-Storm.prototype.startReadingInput = function() {
- var self = this;
- process.stdin.on('readable', function() {
- var chunk = process.stdin.read();
- var messages = self.handleNewChunk(chunk);
- messages.forEach(function(message) {
- self.handleNewMessage(message);
- })
-
- });
-}
-
-/**
- * receives a new string chunk and returns a list of new messages with the separator removed
- * stores state in this.messagePart
- * @param chunk
- */
-Storm.prototype.handleNewChunk = function(chunk) {
- //invariant: this.messagePart has no separator otherwise we would have parsed it already
- var messages = [];
- if (chunk && chunk.length !== 0) {
- //"{}".split("\nend\n") ==> ['{}']
- //"\nend\n".split("\nend\n") ==> ['' , '']
- //"{}\nend\n".split("\nend\n") ==> ['{}', '']
- //"\nend\n{}".split("\nend\n") ==> ['' , '{}']
- // "{}\nend\n{}".split("\nend\n") ==> ['{}', '{}' ]
- this.messagePart = this.messagePart + chunk;
- var newMessageParts = this.messagePart.split(this.separator);
- while (newMessageParts.length > 0) {
- var potentialMessage = newMessageParts.shift();
- var anotherMessageAhead = newMessageParts.length > 0;
- if (!anotherMessageAhead) {
- this.messagePart = potentialMessage;
- }
- else if (potentialMessage.length > 0) {
- messages.push(potentialMessage);
- }
- }
- }
- return messages;
-}
-
-Storm.prototype.isTaskIds = function(msg) {
- return (msg instanceof Array);
-}
-
-Storm.prototype.handleNewMessage = function(msg) {
- var parsedMsg = JSON.parse(msg);
-
- if (this.isFirstMessage) {
- this.initSetupInfo(parsedMsg);
- this.isFirstMessage = false;
- } else if (this.isTaskIds(parsedMsg)) {
- this.handleNewTaskId(parsedMsg);
- } else {
- this.handleNewCommand(parsedMsg);
- }
-}
-
-Storm.prototype.handleNewTaskId = function(taskIds) {
- //When new list of task ids arrives, the callback that was passed with the corresponding emit should be called.
- //Storm assures that the task ids will be sent in the same order as their corresponding emits so it we can simply
- //take the first callback in the list and be sure it is the right one.
-
- var callback = this.taskIdsCallbacks.shift();
- if (callback) {
- callback(taskIds);
- } else {
- throw new Error('Something went wrong, we off the split of task id callbacks');
- }
-}
-
-
-
-/**
- *
- * @param messageDetails json with the emit details.
- *
- * For bolt, the json must contain the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
- * tuple and return ack when all components successfully finished to process it.
- * and may contain the optional fields:
- * - stream (if empty - emit to default stream)
- *
- * For spout, the json must contain the required fields:
- * - tuple - the value to emit
- *
- * and may contain the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- *
- * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received).
- */
-Storm.prototype.emit = function(messageDetails, onTaskIds) {
- //Every emit triggers a response - list of task ids to which the tuple was emitted. The task ids are accessible
- //through the callback (will be called when the response arrives). The callback is stored in a list until the
- //corresponding task id list arrives.
- if (messageDetails.task) {
- throw new Error('Illegal input - task. To emit to specific task use emit direct!');
- }
-
- if (!onTaskIds) {
- throw new Error('You must pass a onTaskIds callback when using emit!')
- }
-
- this.taskIdsCallbacks.push(onTaskIds);
- this.__emit(messageDetails);;
-}
-
-
-/**
- * Emit message to specific task.
- * @param messageDetails json with the emit details.
- *
- * For bolt, the json must contain the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
- * tuple and return ack when all components successfully finished to process it.
- * - task - indicate the task to send the tuple to.
- * and may contain the optional fields:
- * - stream (if empty - emit to default stream)
- *
- * For spout, the json must contain the required fields:
- * - tuple - the value to emit
- * - task - indicate the task to send the tuple to.
- * and may contain the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- *
- * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received).
- */
-Storm.prototype.emitDirect = function(commandDetails) {
- if (!commandDetails.task) {
- throw new Error("Emit direct must receive task id!")
- }
- this.__emit(commandDetails);
-}
-
-/**
- * Initialize storm component according to the configuration received.
- * @param conf configuration object accrding to storm protocol.
- * @param context context object according to storm protocol.
- * @param done callback. Call this method when finished initializing.
- */
-Storm.prototype.initialize = function(conf, context, done) {
- done();
-}
-
-Storm.prototype.run = function() {
- process.stdout.setEncoding('utf8');
- process.stdin.setEncoding('utf8');
- this.startReadingInput();
-}
-
-function Tuple(id, component, stream, task, values) {
- this.id = id;
- this.component = component;
- this.stream = stream;
- this.task = task;
- this.values = values;
-}
-
-/**
- * Base class for storm bolt.
- * To create a bolt implement 'process' method.
- * You may also implement initialize method to
- */
-function BasicBolt() {
- Storm.call(this);
- this.anchorTuple = null;
-};
-
-BasicBolt.prototype = Object.create(Storm.prototype);
-BasicBolt.prototype.constructor = BasicBolt;
-
-/**
- * Emit message.
- * @param commandDetails json with the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
- * tuple and return ack when all components successfully finished to process it.
- * and the optional fields:
- * - stream (if empty - emit to default stream)
- * - task (pass only to emit to specific task)
- */
-BasicBolt.prototype.__emit = function(commandDetails) {
- var self = this;
-
- var message = {
- command: "emit",
- tuple: commandDetails.tuple,
- stream: commandDetails.stream,
- task: commandDetails.task,
- anchors: [commandDetails.anchorTupleId]
- };
-
- this.sendMsgToParent(message);
-}
-
-BasicBolt.prototype.handleNewCommand = function(command) {
- var self = this;
- var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]);
-
- if (tup.task === -1 && tup.stream === "__heartbeat") {
- self.sync();
- return;
- }
-
- var callback = function(err) {
- if (err) {
- self.fail(tup, err);
- return;
- }
- self.ack(tup);
- }
- this.process(tup, callback);
-}
-
-/**
- * Implement this method when creating a bolt. This is the main method that provides the logic of the bolt (what
- * should it do?).
- * @param tuple the input of the bolt - what to process.
- * @param done call this method when done processing.
- */
-BasicBolt.prototype.process = function(tuple, done) {};
-
-BasicBolt.prototype.ack = function(tup) {
- this.sendMsgToParent({"command": "ack", "id": tup.id});
-}
-
-BasicBolt.prototype.fail = function(tup, err) {
- this.sendMsgToParent({"command": "fail", "id": tup.id});
-}
-
-
-/**
- * Base class for storm spout.
- * To create a spout implement the following methods: nextTuple, ack and fail (nextTuple - mandatory, ack and fail
- * can stay empty).
- * You may also implement initialize method.
- *
- */
-function Spout() {
- Storm.call(this);
-};
-
-Spout.prototype = Object.create(Storm.prototype);
-
-Spout.prototype.constructor = Spout;
-
-/**
- * This method will be called when an ack is received for preciously sent tuple. One may implement it.
- * @param id The id of the tuple.
- * @param done Call this method when finished and ready to receive more tuples.
- */
-Spout.prototype.ack = function(id, done) {};
-
-/**
- * This method will be called when an fail is received for preciously sent tuple. One may implement it (for example -
- * log the failure or send the tuple again).
- * @param id The id of the tuple.
- * @param done Call this method when finished and ready to receive more tuples.
- */
-Spout.prototype.fail = function(id, done) {};
-
-/**
- * Method the indicates its time to emit the next tuple.
- * @param done call this method when done sending the output.
- */
-Spout.prototype.nextTuple = function(done) {};
-
-Spout.prototype.handleNewCommand = function(command) {
- var self = this;
- var callback = function() {
- self.sync();
- }
-
- if (command["command"] === "next") {
- this.nextTuple(callback);
- }
-
- if (command["command"] === "ack") {
- this.ack(command["id"], callback);
- }
-
- if (command["command"] === "fail") {
- this.fail(command["id"], callback);
- }
-}
-
-/**
- * @param commandDetails json with the required fields:
- * - tuple - the value to emit.
- * and the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- * - task - pass only to emit to specific task.
- */
-Spout.prototype.__emit = function(commandDetails) {
- var message = {
- command: "emit",
- tuple: commandDetails.tuple,
- id: commandDetails.id,
- stream: commandDetails.stream,
- task: commandDetails.task
- };
-
- this.sendMsgToParent(message);
-}
-
-module.exports.BasicBolt = BasicBolt;
-module.exports.Spout = Spout;
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/927132ce/storm-multilang/multilang-python/pom.xml
----------------------------------------------------------------------
diff --git a/storm-multilang/multilang-python/pom.xml b/storm-multilang/multilang-python/pom.xml
deleted file mode 100644
index 379c0bc..0000000
--- a/storm-multilang/multilang-python/pom.xml
+++ /dev/null
@@ -1,32 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <artifactId>storm</artifactId>
- <groupId>org.apache.storm</groupId>
- <version>0.11.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
- <groupId>org.apache.storm</groupId>
- <artifactId>multilang-python</artifactId>
- <packaging>jar</packaging>
- <name>multilang-python</name>
-
-</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/927132ce/storm-multilang/multilang-python/src/main/resources/resources/storm.py
----------------------------------------------------------------------
diff --git a/storm-multilang/multilang-python/src/main/resources/resources/storm.py b/storm-multilang/multilang-python/src/main/resources/resources/storm.py
deleted file mode 100755
index 642c393..0000000
--- a/storm-multilang/multilang-python/src/main/resources/resources/storm.py
+++ /dev/null
@@ -1,260 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import sys
-import os
-import traceback
-from collections import deque
-
-try:
- import simplejson as json
-except ImportError:
- import json
-
-json_encode = lambda x: json.dumps(x)
-json_decode = lambda x: json.loads(x)
-
-#reads lines and reconstructs newlines appropriately
-def readMsg():
- msg = ""
- while True:
- line = sys.stdin.readline()
- if not line:
- raise Exception('Read EOF from stdin')
- if line[0:-1] == "end":
- break
- msg = msg + line
- return json_decode(msg[0:-1])
-
-MODE = None
-ANCHOR_TUPLE = None
-
-#queue up commands we read while trying to read taskids
-pending_commands = deque()
-
-def readTaskIds():
- if pending_taskids:
- return pending_taskids.popleft()
- else:
- msg = readMsg()
- while type(msg) is not list:
- pending_commands.append(msg)
- msg = readMsg()
- return msg
-
-#queue up taskids we read while trying to read commands/tuples
-pending_taskids = deque()
-
-def readCommand():
- if pending_commands:
- return pending_commands.popleft()
- else:
- msg = readMsg()
- while type(msg) is list:
- pending_taskids.append(msg)
- msg = readMsg()
- return msg
-
-def readTuple():
- cmd = readCommand()
- return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"])
-
-def sendMsgToParent(msg):
- print json_encode(msg)
- print "end"
- sys.stdout.flush()
-
-def sync():
- sendMsgToParent({'command':'sync'})
-
-def sendpid(heartbeatdir):
- pid = os.getpid()
- sendMsgToParent({'pid':pid})
- open(heartbeatdir + "/" + str(pid), "w").close()
-
-def emit(*args, **kwargs):
- __emit(*args, **kwargs)
- return readTaskIds()
-
-def emitDirect(task, *args, **kwargs):
- kwargs["directTask"] = task
- __emit(*args, **kwargs)
-
-def __emit(*args, **kwargs):
- global MODE
- if MODE == Bolt:
- emitBolt(*args, **kwargs)
- elif MODE == Spout:
- emitSpout(*args, **kwargs)
-
-def emitBolt(tup, stream=None, anchors = [], directTask=None):
- global ANCHOR_TUPLE
- if ANCHOR_TUPLE is not None:
- anchors = [ANCHOR_TUPLE]
- m = {"command": "emit"}
- if stream is not None:
- m["stream"] = stream
- m["anchors"] = map(lambda a: a.id, anchors)
- if directTask is not None:
- m["task"] = directTask
- m["tuple"] = tup
- sendMsgToParent(m)
-
-def emitSpout(tup, stream=None, id=None, directTask=None):
- m = {"command": "emit"}
- if id is not None:
- m["id"] = id
- if stream is not None:
- m["stream"] = stream
- if directTask is not None:
- m["task"] = directTask
- m["tuple"] = tup
- sendMsgToParent(m)
-
-def ack(tup):
- sendMsgToParent({"command": "ack", "id": tup.id})
-
-def fail(tup):
- sendMsgToParent({"command": "fail", "id": tup.id})
-
-def reportError(msg):
- sendMsgToParent({"command": "error", "msg": msg})
-
-def log(msg, level=2):
- sendMsgToParent({"command": "log", "msg": msg, "level":level})
-
-def logTrace(msg):
- log(msg, 0)
-
-def logDebug(msg):
- log(msg, 1)
-
-def logInfo(msg):
- log(msg, 2)
-
-def logWarn(msg):
- log(msg, 3)
-
-def logError(msg):
- log(msg, 4)
-
-def rpcMetrics(name, params):
- sendMsgToParent({"command": "metrics", "name": name, "params": params})
-
-def initComponent():
- setupInfo = readMsg()
- sendpid(setupInfo['pidDir'])
- return [setupInfo['conf'], setupInfo['context']]
-
-class Tuple(object):
- def __init__(self, id, component, stream, task, values):
- self.id = id
- self.component = component
- self.stream = stream
- self.task = task
- self.values = values
-
- def __repr__(self):
- return '<%s%s>' % (
- self.__class__.__name__,
- ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys())))
-
- def is_heartbeat_tuple(self):
- return self.task == -1 and self.stream == "__heartbeat"
-
-class Bolt(object):
- def initialize(self, stormconf, context):
- pass
-
- def process(self, tuple):
- pass
-
- def run(self):
- global MODE
- MODE = Bolt
- conf, context = initComponent()
- try:
- self.initialize(conf, context)
- while True:
- tup = readTuple()
- if tup.is_heartbeat_tuple():
- sync()
- else:
- self.process(tup)
- except Exception, e:
- reportError(traceback.format_exc(e))
-
-class BasicBolt(object):
- def initialize(self, stormconf, context):
- pass
-
- def process(self, tuple):
- pass
-
- def run(self):
- global MODE
- MODE = Bolt
- global ANCHOR_TUPLE
- conf, context = initComponent()
- try:
- self.initialize(conf, context)
- while True:
- tup = readTuple()
- if tup.is_heartbeat_tuple():
- sync()
- else:
- ANCHOR_TUPLE = tup
- try:
- self.process(tup)
- ack(tup)
- except Exception, e:
- reportError(traceback.format_exc(e))
- fail(tup)
- except Exception, e:
- reportError(traceback.format_exc(e))
-
-class Spout(object):
- def initialize(self, conf, context):
- pass
-
- def ack(self, id):
- pass
-
- def fail(self, id):
- pass
-
- def nextTuple(self):
- pass
-
- def run(self):
- global MODE
- MODE = Spout
- conf, context = initComponent()
- try:
- self.initialize(conf, context)
- while True:
- msg = readCommand()
- if msg["command"] == "next":
- self.nextTuple()
- if msg["command"] == "ack":
- self.ack(msg["id"])
- if msg["command"] == "fail":
- self.fail(msg["id"])
- sync()
- except Exception, e:
- reportError(traceback.format_exc(e))
http://git-wip-us.apache.org/repos/asf/storm/blob/927132ce/storm-multilang/multilang-ruby/pom.xml
----------------------------------------------------------------------
diff --git a/storm-multilang/multilang-ruby/pom.xml b/storm-multilang/multilang-ruby/pom.xml
deleted file mode 100644
index 6b5dd0c..0000000
--- a/storm-multilang/multilang-ruby/pom.xml
+++ /dev/null
@@ -1,32 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <artifactId>storm</artifactId>
- <groupId>org.apache.storm</groupId>
- <version>0.11.0-SNAPSHOT</version>
- <relativePath>../../pom.xml</relativePath>
- </parent>
- <groupId>org.apache.storm</groupId>
- <artifactId>multilang-ruby</artifactId>
- <packaging>jar</packaging>
- <name>multilang-ruby</name>
-
-</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/927132ce/storm-multilang/multilang-ruby/src/main/resources/resources/storm.rb
----------------------------------------------------------------------
diff --git a/storm-multilang/multilang-ruby/src/main/resources/resources/storm.rb b/storm-multilang/multilang-ruby/src/main/resources/resources/storm.rb
deleted file mode 100644
index 816694e..0000000
--- a/storm-multilang/multilang-ruby/src/main/resources/resources/storm.rb
+++ /dev/null
@@ -1,236 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-require "rubygems"
-require "json"
-
-module Storm
- module Protocol
- class << self
- attr_accessor :mode, :pending_taskids, :pending_commands
- end
-
- self.pending_taskids = []
- self.pending_commands = []
-
- def read_message
- msg = ""
- loop do
- line = STDIN.readline.chomp
- break if line == "end"
- msg << line
- msg << "\n"
- end
- JSON.parse msg.chomp
- end
-
- def read_task_ids
- Storm::Protocol.pending_taskids.shift ||
- begin
- msg = read_message
- until msg.is_a? Array
- Storm::Protocol.pending_commands.push(msg)
- msg = read_message
- end
- msg
- end
- end
-
- def read_command
- Storm::Protocol.pending_commands.shift ||
- begin
- msg = read_message
- while msg.is_a? Array
- Storm::Protocol.pending_taskids.push(msg)
- msg = read_message
- end
- msg
- end
- end
-
- def send_msg_to_parent(msg)
- puts msg.to_json
- puts "end"
- STDOUT.flush
- end
-
- def sync
- send_msg_to_parent({'command' => 'sync'})
- end
-
- def send_pid(heartbeat_dir)
- pid = Process.pid
- send_msg_to_parent({'pid' => pid})
- File.open("#{heartbeat_dir}/#{pid}", "w").close
- end
-
- def emit_bolt(tup, args = {})
- stream = args[:stream]
- anchors = args[:anchors] || args[:anchor] || []
- anchors = [anchors] unless anchors.is_a? Enumerable
- direct = args[:direct_task]
- m = {:command => :emit, :anchors => anchors.map(&:id), :tuple => tup}
- m[:stream] = stream if stream
- m[:task] = direct if direct
- send_msg_to_parent m
- read_task_ids unless direct
- end
-
- def emit_spout(tup, args = {})
- stream = args[:stream]
- id = args[:id]
- direct = args[:direct_task]
- m = {:command => :emit, :tuple => tup}
- m[:id] = id if id
- m[:stream] = stream if stream
- m[:task] = direct if direct
- send_msg_to_parent m
- read_task_ids unless direct
- end
-
- def emit(*args)
- case Storm::Protocol.mode
- when 'spout'
- emit_spout(*args)
- when 'bolt'
- emit_bolt(*args)
- end
- end
-
- def ack(tup)
- send_msg_to_parent :command => :ack, :id => tup.id
- end
-
- def fail(tup)
- send_msg_to_parent :command => :fail, :id => tup.id
- end
-
- def reportError(msg)
- send_msg_to_parent :command => :error, :msg => msg.to_s
- end
-
- def log(msg, level=2)
- send_msg_to_parent :command => :log, :msg => msg.to_s, :level => level
- end
-
- def logTrace(msg)
- log(msg, 0)
- end
-
- def logDebug(msg)
- log(msg, 1)
- end
-
- def logInfo(msg)
- log(msg, 2)
- end
-
- def logWarn(msg)
- log(msg, 3)
- end
-
- def logError(msg)
- log(msg, 4)
- end
-
- def handshake
- setup_info = read_message
- send_pid setup_info['pidDir']
- [setup_info['conf'], setup_info['context']]
- end
- end
-
- class Tuple
- attr_accessor :id, :component, :stream, :task, :values
-
- def initialize(id, component, stream, task, values)
- @id = id
- @component = component
- @stream = stream
- @task = task
- @values = values
- end
-
- def self.from_hash(hash)
- Tuple.new(*hash.values_at("id", "comp", "stream", "task", "tuple"))
- end
-
- def is_heartbeat
- task == -1 and stream == '__heartbeat'
- end
- end
-
- class Bolt
- include Storm::Protocol
-
- def prepare(conf, context); end
-
- def process(tuple); end
-
- def run
- Storm::Protocol.mode = 'bolt'
- prepare(*handshake)
- begin
- while true
- tuple = Tuple.from_hash(read_command)
- if tuple.is_heartbeat
- sync
- else
- process tuple
- end
- end
- rescue Exception => e
- reportError 'Exception in bolt: ' + e.message + ' - ' + e.backtrace.join('\n')
- end
- end
- end
-
- class Spout
- include Storm::Protocol
-
- def open(conf, context); end
-
- def nextTuple; end
-
- def ack(id); end
-
- def fail(id); end
-
- def run
- Storm::Protocol.mode = 'spout'
- open(*handshake)
-
- begin
- while true
- msg = read_command
- case msg['command']
- when 'next'
- nextTuple
- when 'ack'
- ack(msg['id'])
- when 'fail'
- fail(msg['id'])
- end
- sync
- end
- rescue Exception => e
- reportError 'Exception in spout: ' + e.message + ' - ' + e.backtrace.join('\n')
- end
- end
- end
-end
http://git-wip-us.apache.org/repos/asf/storm/blob/927132ce/storm-multilang/python/pom.xml
----------------------------------------------------------------------
diff --git a/storm-multilang/python/pom.xml b/storm-multilang/python/pom.xml
new file mode 100644
index 0000000..379c0bc
--- /dev/null
+++ b/storm-multilang/python/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>0.11.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>multilang-python</artifactId>
+ <packaging>jar</packaging>
+ <name>multilang-python</name>
+
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/927132ce/storm-multilang/python/src/main/resources/resources/storm.py
----------------------------------------------------------------------
diff --git a/storm-multilang/python/src/main/resources/resources/storm.py b/storm-multilang/python/src/main/resources/resources/storm.py
new file mode 100755
index 0000000..642c393
--- /dev/null
+++ b/storm-multilang/python/src/main/resources/resources/storm.py
@@ -0,0 +1,260 @@
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+import os
+import traceback
+from collections import deque
+
+try:
+ import simplejson as json
+except ImportError:
+ import json
+
+json_encode = lambda x: json.dumps(x)
+json_decode = lambda x: json.loads(x)
+
+#reads lines and reconstructs newlines appropriately
+def readMsg():
+ msg = ""
+ while True:
+ line = sys.stdin.readline()
+ if not line:
+ raise Exception('Read EOF from stdin')
+ if line[0:-1] == "end":
+ break
+ msg = msg + line
+ return json_decode(msg[0:-1])
+
+MODE = None
+ANCHOR_TUPLE = None
+
+#queue up commands we read while trying to read taskids
+pending_commands = deque()
+
+def readTaskIds():
+ if pending_taskids:
+ return pending_taskids.popleft()
+ else:
+ msg = readMsg()
+ while type(msg) is not list:
+ pending_commands.append(msg)
+ msg = readMsg()
+ return msg
+
+#queue up taskids we read while trying to read commands/tuples
+pending_taskids = deque()
+
+def readCommand():
+ if pending_commands:
+ return pending_commands.popleft()
+ else:
+ msg = readMsg()
+ while type(msg) is list:
+ pending_taskids.append(msg)
+ msg = readMsg()
+ return msg
+
+def readTuple():
+ cmd = readCommand()
+ return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"])
+
+def sendMsgToParent(msg):
+ print json_encode(msg)
+ print "end"
+ sys.stdout.flush()
+
+def sync():
+ sendMsgToParent({'command':'sync'})
+
+def sendpid(heartbeatdir):
+ pid = os.getpid()
+ sendMsgToParent({'pid':pid})
+ open(heartbeatdir + "/" + str(pid), "w").close()
+
+def emit(*args, **kwargs):
+ __emit(*args, **kwargs)
+ return readTaskIds()
+
+def emitDirect(task, *args, **kwargs):
+ kwargs["directTask"] = task
+ __emit(*args, **kwargs)
+
+def __emit(*args, **kwargs):
+ global MODE
+ if MODE == Bolt:
+ emitBolt(*args, **kwargs)
+ elif MODE == Spout:
+ emitSpout(*args, **kwargs)
+
+def emitBolt(tup, stream=None, anchors = [], directTask=None):
+ global ANCHOR_TUPLE
+ if ANCHOR_TUPLE is not None:
+ anchors = [ANCHOR_TUPLE]
+ m = {"command": "emit"}
+ if stream is not None:
+ m["stream"] = stream
+ m["anchors"] = map(lambda a: a.id, anchors)
+ if directTask is not None:
+ m["task"] = directTask
+ m["tuple"] = tup
+ sendMsgToParent(m)
+
+def emitSpout(tup, stream=None, id=None, directTask=None):
+ m = {"command": "emit"}
+ if id is not None:
+ m["id"] = id
+ if stream is not None:
+ m["stream"] = stream
+ if directTask is not None:
+ m["task"] = directTask
+ m["tuple"] = tup
+ sendMsgToParent(m)
+
+def ack(tup):
+ sendMsgToParent({"command": "ack", "id": tup.id})
+
+def fail(tup):
+ sendMsgToParent({"command": "fail", "id": tup.id})
+
+def reportError(msg):
+ sendMsgToParent({"command": "error", "msg": msg})
+
+def log(msg, level=2):
+ sendMsgToParent({"command": "log", "msg": msg, "level":level})
+
+def logTrace(msg):
+ log(msg, 0)
+
+def logDebug(msg):
+ log(msg, 1)
+
+def logInfo(msg):
+ log(msg, 2)
+
+def logWarn(msg):
+ log(msg, 3)
+
+def logError(msg):
+ log(msg, 4)
+
+def rpcMetrics(name, params):
+ sendMsgToParent({"command": "metrics", "name": name, "params": params})
+
+def initComponent():
+ setupInfo = readMsg()
+ sendpid(setupInfo['pidDir'])
+ return [setupInfo['conf'], setupInfo['context']]
+
+class Tuple(object):
+ def __init__(self, id, component, stream, task, values):
+ self.id = id
+ self.component = component
+ self.stream = stream
+ self.task = task
+ self.values = values
+
+ def __repr__(self):
+ return '<%s%s>' % (
+ self.__class__.__name__,
+ ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys())))
+
+ def is_heartbeat_tuple(self):
+ return self.task == -1 and self.stream == "__heartbeat"
+
+class Bolt(object):
+ def initialize(self, stormconf, context):
+ pass
+
+ def process(self, tuple):
+ pass
+
+ def run(self):
+ global MODE
+ MODE = Bolt
+ conf, context = initComponent()
+ try:
+ self.initialize(conf, context)
+ while True:
+ tup = readTuple()
+ if tup.is_heartbeat_tuple():
+ sync()
+ else:
+ self.process(tup)
+ except Exception, e:
+ reportError(traceback.format_exc(e))
+
+class BasicBolt(object):
+ def initialize(self, stormconf, context):
+ pass
+
+ def process(self, tuple):
+ pass
+
+ def run(self):
+ global MODE
+ MODE = Bolt
+ global ANCHOR_TUPLE
+ conf, context = initComponent()
+ try:
+ self.initialize(conf, context)
+ while True:
+ tup = readTuple()
+ if tup.is_heartbeat_tuple():
+ sync()
+ else:
+ ANCHOR_TUPLE = tup
+ try:
+ self.process(tup)
+ ack(tup)
+ except Exception, e:
+ reportError(traceback.format_exc(e))
+ fail(tup)
+ except Exception, e:
+ reportError(traceback.format_exc(e))
+
+class Spout(object):
+ def initialize(self, conf, context):
+ pass
+
+ def ack(self, id):
+ pass
+
+ def fail(self, id):
+ pass
+
+ def nextTuple(self):
+ pass
+
+ def run(self):
+ global MODE
+ MODE = Spout
+ conf, context = initComponent()
+ try:
+ self.initialize(conf, context)
+ while True:
+ msg = readCommand()
+ if msg["command"] == "next":
+ self.nextTuple()
+ if msg["command"] == "ack":
+ self.ack(msg["id"])
+ if msg["command"] == "fail":
+ self.fail(msg["id"])
+ sync()
+ except Exception, e:
+ reportError(traceback.format_exc(e))
http://git-wip-us.apache.org/repos/asf/storm/blob/927132ce/storm-multilang/ruby/pom.xml
----------------------------------------------------------------------
diff --git a/storm-multilang/ruby/pom.xml b/storm-multilang/ruby/pom.xml
new file mode 100644
index 0000000..6b5dd0c
--- /dev/null
+++ b/storm-multilang/ruby/pom.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <artifactId>storm</artifactId>
+ <groupId>org.apache.storm</groupId>
+ <version>0.11.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>multilang-ruby</artifactId>
+ <packaging>jar</packaging>
+ <name>multilang-ruby</name>
+
+</project>
http://git-wip-us.apache.org/repos/asf/storm/blob/927132ce/storm-multilang/ruby/src/main/resources/resources/storm.rb
----------------------------------------------------------------------
diff --git a/storm-multilang/ruby/src/main/resources/resources/storm.rb b/storm-multilang/ruby/src/main/resources/resources/storm.rb
new file mode 100644
index 0000000..816694e
--- /dev/null
+++ b/storm-multilang/ruby/src/main/resources/resources/storm.rb
@@ -0,0 +1,236 @@
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require "rubygems"
+require "json"
+
+module Storm
+ module Protocol
+ class << self
+ attr_accessor :mode, :pending_taskids, :pending_commands
+ end
+
+ self.pending_taskids = []
+ self.pending_commands = []
+
+ def read_message
+ msg = ""
+ loop do
+ line = STDIN.readline.chomp
+ break if line == "end"
+ msg << line
+ msg << "\n"
+ end
+ JSON.parse msg.chomp
+ end
+
+ def read_task_ids
+ Storm::Protocol.pending_taskids.shift ||
+ begin
+ msg = read_message
+ until msg.is_a? Array
+ Storm::Protocol.pending_commands.push(msg)
+ msg = read_message
+ end
+ msg
+ end
+ end
+
+ def read_command
+ Storm::Protocol.pending_commands.shift ||
+ begin
+ msg = read_message
+ while msg.is_a? Array
+ Storm::Protocol.pending_taskids.push(msg)
+ msg = read_message
+ end
+ msg
+ end
+ end
+
+ def send_msg_to_parent(msg)
+ puts msg.to_json
+ puts "end"
+ STDOUT.flush
+ end
+
+ def sync
+ send_msg_to_parent({'command' => 'sync'})
+ end
+
+ def send_pid(heartbeat_dir)
+ pid = Process.pid
+ send_msg_to_parent({'pid' => pid})
+ File.open("#{heartbeat_dir}/#{pid}", "w").close
+ end
+
+ def emit_bolt(tup, args = {})
+ stream = args[:stream]
+ anchors = args[:anchors] || args[:anchor] || []
+ anchors = [anchors] unless anchors.is_a? Enumerable
+ direct = args[:direct_task]
+ m = {:command => :emit, :anchors => anchors.map(&:id), :tuple => tup}
+ m[:stream] = stream if stream
+ m[:task] = direct if direct
+ send_msg_to_parent m
+ read_task_ids unless direct
+ end
+
+ def emit_spout(tup, args = {})
+ stream = args[:stream]
+ id = args[:id]
+ direct = args[:direct_task]
+ m = {:command => :emit, :tuple => tup}
+ m[:id] = id if id
+ m[:stream] = stream if stream
+ m[:task] = direct if direct
+ send_msg_to_parent m
+ read_task_ids unless direct
+ end
+
+ def emit(*args)
+ case Storm::Protocol.mode
+ when 'spout'
+ emit_spout(*args)
+ when 'bolt'
+ emit_bolt(*args)
+ end
+ end
+
+ def ack(tup)
+ send_msg_to_parent :command => :ack, :id => tup.id
+ end
+
+ def fail(tup)
+ send_msg_to_parent :command => :fail, :id => tup.id
+ end
+
+ def reportError(msg)
+ send_msg_to_parent :command => :error, :msg => msg.to_s
+ end
+
+ def log(msg, level=2)
+ send_msg_to_parent :command => :log, :msg => msg.to_s, :level => level
+ end
+
+ def logTrace(msg)
+ log(msg, 0)
+ end
+
+ def logDebug(msg)
+ log(msg, 1)
+ end
+
+ def logInfo(msg)
+ log(msg, 2)
+ end
+
+ def logWarn(msg)
+ log(msg, 3)
+ end
+
+ def logError(msg)
+ log(msg, 4)
+ end
+
+ def handshake
+ setup_info = read_message
+ send_pid setup_info['pidDir']
+ [setup_info['conf'], setup_info['context']]
+ end
+ end
+
+ class Tuple
+ attr_accessor :id, :component, :stream, :task, :values
+
+ def initialize(id, component, stream, task, values)
+ @id = id
+ @component = component
+ @stream = stream
+ @task = task
+ @values = values
+ end
+
+ def self.from_hash(hash)
+ Tuple.new(*hash.values_at("id", "comp", "stream", "task", "tuple"))
+ end
+
+ def is_heartbeat
+ task == -1 and stream == '__heartbeat'
+ end
+ end
+
+ class Bolt
+ include Storm::Protocol
+
+ def prepare(conf, context); end
+
+ def process(tuple); end
+
+ def run
+ Storm::Protocol.mode = 'bolt'
+ prepare(*handshake)
+ begin
+ while true
+ tuple = Tuple.from_hash(read_command)
+ if tuple.is_heartbeat
+ sync
+ else
+ process tuple
+ end
+ end
+ rescue Exception => e
+ reportError 'Exception in bolt: ' + e.message + ' - ' + e.backtrace.join('\n')
+ end
+ end
+ end
+
+ class Spout
+ include Storm::Protocol
+
+ def open(conf, context); end
+
+ def nextTuple; end
+
+ def ack(id); end
+
+ def fail(id); end
+
+ def run
+ Storm::Protocol.mode = 'spout'
+ open(*handshake)
+
+ begin
+ while true
+ msg = read_command
+ case msg['command']
+ when 'next'
+ nextTuple
+ when 'ack'
+ ack(msg['id'])
+ when 'fail'
+ fail(msg['id'])
+ end
+ sync
+ end
+ rescue Exception => e
+ reportError 'Exception in spout: ' + e.message + ' - ' + e.backtrace.join('\n')
+ end
+ end
+ end
+end
[25/27] storm git commit: add STORM-512 and STORM-786 to changelog
Posted by pt...@apache.org.
add STORM-512 and STORM-786 to changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e106ccb0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e106ccb0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e106ccb0
Branch: refs/heads/0.10.x-branch
Commit: e106ccb0c7d72b058774a05d8a6854e9c44d4495
Parents: 69c5499
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri May 15 15:54:33 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:54:33 2015 -0400
----------------------------------------------------------------------
CHANGELOG.md | 2 ++
storm-multilang/javascript/pom.xml | 2 +-
storm-multilang/python/pom.xml | 2 +-
storm-multilang/ruby/pom.xml | 2 +-
4 files changed, 5 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e106ccb0/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index da1395c..114dbaa 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,6 @@
## 0.10.0
+ * STORM-786: KafkaBolt should ack tick tuples
+ * STORM-512: KafkaBolt doesn't handle ticks properly
* STORM-788: Fix key for process latencies
* STORM-748: Package Multi-Lang scripts so they don't have to be duplicated
* STORM-563. Kafka Spout doesn't pick up from the beginning of the queue unless forceFromStart specified.
http://git-wip-us.apache.org/repos/asf/storm/blob/e106ccb0/storm-multilang/javascript/pom.xml
----------------------------------------------------------------------
diff --git a/storm-multilang/javascript/pom.xml b/storm-multilang/javascript/pom.xml
index e1cb993..36e6c64 100644
--- a/storm-multilang/javascript/pom.xml
+++ b/storm-multilang/javascript/pom.xml
@@ -21,7 +21,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
- <version>0.11.0-SNAPSHOT</version>
+ <version>0.10.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<groupId>org.apache.storm</groupId>
http://git-wip-us.apache.org/repos/asf/storm/blob/e106ccb0/storm-multilang/python/pom.xml
----------------------------------------------------------------------
diff --git a/storm-multilang/python/pom.xml b/storm-multilang/python/pom.xml
index 379c0bc..3c2f157 100644
--- a/storm-multilang/python/pom.xml
+++ b/storm-multilang/python/pom.xml
@@ -21,7 +21,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
- <version>0.11.0-SNAPSHOT</version>
+ <version>0.10.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<groupId>org.apache.storm</groupId>
http://git-wip-us.apache.org/repos/asf/storm/blob/e106ccb0/storm-multilang/ruby/pom.xml
----------------------------------------------------------------------
diff --git a/storm-multilang/ruby/pom.xml b/storm-multilang/ruby/pom.xml
index 6b5dd0c..54236b4 100644
--- a/storm-multilang/ruby/pom.xml
+++ b/storm-multilang/ruby/pom.xml
@@ -21,7 +21,7 @@
<parent>
<artifactId>storm</artifactId>
<groupId>org.apache.storm</groupId>
- <version>0.11.0-SNAPSHOT</version>
+ <version>0.10.0-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<groupId>org.apache.storm</groupId>
[11/27] storm git commit: Fix key for process latencies
Posted by pt...@apache.org.
Fix key for process latencies
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/73b56d63
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/73b56d63
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/73b56d63
Branch: refs/heads/0.10.x-branch
Commit: 73b56d638224947af1544f3e1040a299408ac887
Parents: ce5447e
Author: Derek Dagit <de...@yahoo-inc.com>
Authored: Thu Apr 16 15:01:22 2015 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:19:07 2015 -0400
----------------------------------------------------------------------
storm-core/src/clj/backtype/storm/ui/core.clj | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/73b56d63/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj
index 8ba92de..0411010 100644
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@ -838,7 +838,7 @@
"encodedComponent" (url-encode (.get_componentId s))
"stream" (.get_streamId s)
"executeLatency" (float-str (:execute-latencies stats))
- "processLatency" (float-str (:execute-latencies stats))
+ "processLatency" (float-str (:process-latencies stats))
"executed" (nil-to-zero (:executed stats))
"acked" (nil-to-zero (:acked stats))
"failed" (nil-to-zero (:failed stats))})))
[03/27] storm git commit: add STORM-563 to changelog
Posted by pt...@apache.org.
add STORM-563 to changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1247f6d8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1247f6d8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1247f6d8
Branch: refs/heads/0.10.x-branch
Commit: 1247f6d8007befff1840f9f824f3e5e4491555f6
Parents: 847fc1c
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri May 15 14:58:16 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 14:58:16 2015 -0400
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/1247f6d8/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6ba3b32..753fc3a 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 0.10.0
+ * STORM-563. Kafka Spout doesn't pick up from the beginning of the queue unless forceFromStart specified.
* STORM-615: Add REST API to upload topology.
* STORM-807: quote args correctly in /bin/storm
* STORM-686: Add worker-launcher to storm-dist.
[27/27] storm git commit: fix invalid import in storm-starter
Posted by pt...@apache.org.
fix invalid import in storm-starter
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1498ed06
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1498ed06
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1498ed06
Branch: refs/heads/0.10.x-branch
Commit: 1498ed062864aa059aeacd0f0438b3759eccd9e2
Parents: 790fcab
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri May 15 17:27:10 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 17:27:10 2015 -0400
----------------------------------------------------------------------
.../src/jvm/storm/starter/bolt/RollingCountAggBolt.java | 4 ----
1 file changed, 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/1498ed06/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java
index e513b09..39b206c 100644
--- a/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java
+++ b/examples/storm-starter/src/jvm/storm/starter/bolt/RollingCountAggBolt.java
@@ -26,13 +26,9 @@ import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import org.apache.log4j.Logger;
-import storm.starter.tools.NthLastModifiedTimeTracker;
-import storm.starter.tools.SlidingWindowCounter;
-import storm.starter.util.TupleHelpers;
import java.util.HashMap;
import java.util.Map;
-import java.util.Map.Entry;
/**
* This bolt aggregates counts from multiple upstream bolts.
[15/27] storm git commit: Use isTick in all relevant places to avoid
code duplication.
Posted by pt...@apache.org.
Use isTick in all relevant places to avoid code duplication.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/6537b369
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/6537b369
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/6537b369
Branch: refs/heads/0.10.x-branch
Commit: 6537b3691378835ad135799fdb5a2cedf7ab4649
Parents: 62385d7
Author: Niels Basjes <nb...@bol.com>
Authored: Wed Oct 1 11:53:17 2014 +0200
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:27:13 2015 -0400
----------------------------------------------------------------------
storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/6537b369/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
index 4dfccc6..da4c1a5 100644
--- a/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
+++ b/storm-core/src/jvm/storm/trident/topology/TridentBoltExecutor.java
@@ -299,7 +299,7 @@ public class TridentBoltExecutor implements IRichBolt {
@Override
public void execute(Tuple tuple) {
- if(tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
+ if(tuple.isTick()) {
long now = System.currentTimeMillis();
if(now - _lastRotate > _messageTimeoutMs) {
_batches.rotate();