You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/05/15 23:27:26 UTC
[06/27] storm git commit: update storm-starter to use multilang
components from storm distribution;
switch to maven shade plugin for uber jar creation
update storm-starter to use multilang components from storm distribution; switch to maven shade plugin for uber jar creation
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3c701415
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3c701415
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3c701415
Branch: refs/heads/0.10.x-branch
Commit: 3c701415b3891cad6d93ff8147e635cad465300e
Parents: db49216
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Mon Apr 6 17:19:47 2015 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri May 15 15:12:17 2015 -0400
----------------------------------------------------------------------
examples/storm-starter/README.markdown | 8 +-
.../storm-starter/multilang/resources/storm.js | 373 -------------------
.../storm-starter/multilang/resources/storm.py | 260 -------------
.../storm-starter/multilang/resources/storm.rb | 236 ------------
examples/storm-starter/pom.xml | 15 +
5 files changed, 19 insertions(+), 873 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/3c701415/examples/storm-starter/README.markdown
----------------------------------------------------------------------
diff --git a/examples/storm-starter/README.markdown b/examples/storm-starter/README.markdown
index 8161a06..1cb6636 100644
--- a/examples/storm-starter/README.markdown
+++ b/examples/storm-starter/README.markdown
@@ -96,20 +96,20 @@ You can package a jar suitable for submitting to a Storm cluster with the comman
$ mvn package
This will package your code and all the non-Storm dependencies into a single "uberjar" (or "fat jar") at the path
-`target/storm-starter-{version}-jar-with-dependencies.jar`.
+`target/storm-starter-{version}.jar`.
Example filename of the uberjar:
- >>> target/storm-starter-0.9.3-incubating-SNAPSHOT-jar-with-dependencies.jar
+ >>> target/storm-starter-0.9.3-incubating-SNAPSHOT.jar
You can submit (run) a topology contained in this uberjar to Storm via the `storm` CLI tool:
# Example 1: Run the RollingTopWords in local mode (LocalCluster)
- $ storm jar storm-starter-*-jar-with-dependencies.jar storm.starter.RollingTopWords
+ $ storm jar storm-starter-*.jar storm.starter.RollingTopWords
# Example 2: Run the RollingTopWords in remote/cluster mode,
# under the name "production-topology"
- $ storm jar storm-starter-*-jar-with-dependencies.jar storm.starter.RollingTopWords production-topology remote
+ $ storm jar storm-starter-*.jar storm.starter.RollingTopWords production-topology remote
_Submitting a topology in local vs. remote mode:_
It depends on the actual code of a topology how you can or even must tell Storm whether to run the topology locally (in
http://git-wip-us.apache.org/repos/asf/storm/blob/3c701415/examples/storm-starter/multilang/resources/storm.js
----------------------------------------------------------------------
diff --git a/examples/storm-starter/multilang/resources/storm.js b/examples/storm-starter/multilang/resources/storm.js
deleted file mode 100755
index 355c2d2..0000000
--- a/examples/storm-starter/multilang/resources/storm.js
+++ /dev/null
@@ -1,373 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-/**
- * Base classes in node-js for storm Bolt and Spout.
- * Implements the storm multilang protocol for nodejs.
- */
-
-
-var fs = require('fs');
-
-function Storm() {
- this.messagePart = "";
- this.taskIdsCallbacks = [];
- this.isFirstMessage = true;
- this.separator = '\nend\n';
-}
-
-Storm.prototype.sendMsgToParent = function(msg) {
- var str = JSON.stringify(msg);
- process.stdout.write(str + this.separator);
-}
-
-Storm.prototype.sync = function() {
- this.sendMsgToParent({"command":"sync"});
-}
-
-Storm.prototype.sendPid = function(heartbeatdir) {
- var pid = process.pid;
- fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w"));
- this.sendMsgToParent({"pid": pid})
-}
-
-Storm.prototype.log = function(msg) {
- this.sendMsgToParent({"command": "log", "msg": msg});
-}
-
-Storm.prototype.initSetupInfo = function(setupInfo) {
- var self = this;
- var callback = function() {
- self.sendPid(setupInfo['pidDir']);
- }
- this.initialize(setupInfo['conf'], setupInfo['context'], callback);
-}
-
-Storm.prototype.startReadingInput = function() {
- var self = this;
- process.stdin.on('readable', function() {
- var chunk = process.stdin.read();
- var messages = self.handleNewChunk(chunk);
- messages.forEach(function(message) {
- self.handleNewMessage(message);
- })
-
- });
-}
-
-/**
- * receives a new string chunk and returns a list of new messages with the separator removed
- * stores state in this.messagePart
- * @param chunk
- */
-Storm.prototype.handleNewChunk = function(chunk) {
- //invariant: this.messagePart has no separator otherwise we would have parsed it already
- var messages = [];
- if (chunk && chunk.length !== 0) {
- //"{}".split("\nend\n") ==> ['{}']
- //"\nend\n".split("\nend\n") ==> ['' , '']
- //"{}\nend\n".split("\nend\n") ==> ['{}', '']
- //"\nend\n{}".split("\nend\n") ==> ['' , '{}']
- // "{}\nend\n{}".split("\nend\n") ==> ['{}', '{}' ]
- this.messagePart = this.messagePart + chunk;
- var newMessageParts = this.messagePart.split(this.separator);
- while (newMessageParts.length > 0) {
- var potentialMessage = newMessageParts.shift();
- var anotherMessageAhead = newMessageParts.length > 0;
- if (!anotherMessageAhead) {
- this.messagePart = potentialMessage;
- }
- else if (potentialMessage.length > 0) {
- messages.push(potentialMessage);
- }
- }
- }
- return messages;
-}
-
-Storm.prototype.isTaskIds = function(msg) {
- return (msg instanceof Array);
-}
-
-Storm.prototype.handleNewMessage = function(msg) {
- var parsedMsg = JSON.parse(msg);
-
- if (this.isFirstMessage) {
- this.initSetupInfo(parsedMsg);
- this.isFirstMessage = false;
- } else if (this.isTaskIds(parsedMsg)) {
- this.handleNewTaskId(parsedMsg);
- } else {
- this.handleNewCommand(parsedMsg);
- }
-}
-
-Storm.prototype.handleNewTaskId = function(taskIds) {
- //When new list of task ids arrives, the callback that was passed with the corresponding emit should be called.
- //Storm assures that the task ids will be sent in the same order as their corresponding emits so it we can simply
- //take the first callback in the list and be sure it is the right one.
-
- var callback = this.taskIdsCallbacks.shift();
- if (callback) {
- callback(taskIds);
- } else {
- throw new Error('Something went wrong, we off the split of task id callbacks');
- }
-}
-
-
-
-/**
- *
- * @param messageDetails json with the emit details.
- *
- * For bolt, the json must contain the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
- * tuple and return ack when all components successfully finished to process it.
- * and may contain the optional fields:
- * - stream (if empty - emit to default stream)
- *
- * For spout, the json must contain the required fields:
- * - tuple - the value to emit
- *
- * and may contain the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- *
- * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received).
- */
-Storm.prototype.emit = function(messageDetails, onTaskIds) {
- //Every emit triggers a response - list of task ids to which the tuple was emitted. The task ids are accessible
- //through the callback (will be called when the response arrives). The callback is stored in a list until the
- //corresponding task id list arrives.
- if (messageDetails.task) {
- throw new Error('Illegal input - task. To emit to specific task use emit direct!');
- }
-
- if (!onTaskIds) {
- throw new Error('You must pass a onTaskIds callback when using emit!')
- }
-
- this.taskIdsCallbacks.push(onTaskIds);
- this.__emit(messageDetails);;
-}
-
-
-/**
- * Emit message to specific task.
- * @param messageDetails json with the emit details.
- *
- * For bolt, the json must contain the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
- * tuple and return ack when all components successfully finished to process it.
- * - task - indicate the task to send the tuple to.
- * and may contain the optional fields:
- * - stream (if empty - emit to default stream)
- *
- * For spout, the json must contain the required fields:
- * - tuple - the value to emit
- * - task - indicate the task to send the tuple to.
- * and may contain the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- *
- * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received).
- */
-Storm.prototype.emitDirect = function(commandDetails) {
- if (!commandDetails.task) {
- throw new Error("Emit direct must receive task id!")
- }
- this.__emit(commandDetails);
-}
-
-/**
- * Initialize storm component according to the configuration received.
- * @param conf configuration object accrding to storm protocol.
- * @param context context object according to storm protocol.
- * @param done callback. Call this method when finished initializing.
- */
-Storm.prototype.initialize = function(conf, context, done) {
- done();
-}
-
-Storm.prototype.run = function() {
- process.stdout.setEncoding('utf8');
- process.stdin.setEncoding('utf8');
- this.startReadingInput();
-}
-
-function Tuple(id, component, stream, task, values) {
- this.id = id;
- this.component = component;
- this.stream = stream;
- this.task = task;
- this.values = values;
-}
-
-/**
- * Base class for storm bolt.
- * To create a bolt implement 'process' method.
- * You may also implement initialize method to
- */
-function BasicBolt() {
- Storm.call(this);
- this.anchorTuple = null;
-};
-
-BasicBolt.prototype = Object.create(Storm.prototype);
-BasicBolt.prototype.constructor = BasicBolt;
-
-/**
- * Emit message.
- * @param commandDetails json with the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
- * tuple and return ack when all components successfully finished to process it.
- * and the optional fields:
- * - stream (if empty - emit to default stream)
- * - task (pass only to emit to specific task)
- */
-BasicBolt.prototype.__emit = function(commandDetails) {
- var self = this;
-
- var message = {
- command: "emit",
- tuple: commandDetails.tuple,
- stream: commandDetails.stream,
- task: commandDetails.task,
- anchors: [commandDetails.anchorTupleId]
- };
-
- this.sendMsgToParent(message);
-}
-
-BasicBolt.prototype.handleNewCommand = function(command) {
- var self = this;
- var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]);
-
- if (tup.task === -1 && tup.stream === "__heartbeat") {
- self.sync();
- return;
- }
-
- var callback = function(err) {
- if (err) {
- self.fail(tup, err);
- return;
- }
- self.ack(tup);
- }
- this.process(tup, callback);
-}
-
-/**
- * Implement this method when creating a bolt. This is the main method that provides the logic of the bolt (what
- * should it do?).
- * @param tuple the input of the bolt - what to process.
- * @param done call this method when done processing.
- */
-BasicBolt.prototype.process = function(tuple, done) {};
-
-BasicBolt.prototype.ack = function(tup) {
- this.sendMsgToParent({"command": "ack", "id": tup.id});
-}
-
-BasicBolt.prototype.fail = function(tup, err) {
- this.sendMsgToParent({"command": "fail", "id": tup.id});
-}
-
-
-/**
- * Base class for storm spout.
- * To create a spout implement the following methods: nextTuple, ack and fail (nextTuple - mandatory, ack and fail
- * can stay empty).
- * You may also implement initialize method.
- *
- */
-function Spout() {
- Storm.call(this);
-};
-
-Spout.prototype = Object.create(Storm.prototype);
-
-Spout.prototype.constructor = Spout;
-
-/**
- * This method will be called when an ack is received for preciously sent tuple. One may implement it.
- * @param id The id of the tuple.
- * @param done Call this method when finished and ready to receive more tuples.
- */
-Spout.prototype.ack = function(id, done) {};
-
-/**
- * This method will be called when an fail is received for preciously sent tuple. One may implement it (for example -
- * log the failure or send the tuple again).
- * @param id The id of the tuple.
- * @param done Call this method when finished and ready to receive more tuples.
- */
-Spout.prototype.fail = function(id, done) {};
-
-/**
- * Method the indicates its time to emit the next tuple.
- * @param done call this method when done sending the output.
- */
-Spout.prototype.nextTuple = function(done) {};
-
-Spout.prototype.handleNewCommand = function(command) {
- var self = this;
- var callback = function() {
- self.sync();
- }
-
- if (command["command"] === "next") {
- this.nextTuple(callback);
- }
-
- if (command["command"] === "ack") {
- this.ack(command["id"], callback);
- }
-
- if (command["command"] === "fail") {
- this.fail(command["id"], callback);
- }
-}
-
-/**
- * @param commandDetails json with the required fields:
- * - tuple - the value to emit.
- * and the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- * - task - pass only to emit to specific task.
- */
-Spout.prototype.__emit = function(commandDetails) {
- var message = {
- command: "emit",
- tuple: commandDetails.tuple,
- id: commandDetails.id,
- stream: commandDetails.stream,
- task: commandDetails.task
- };
-
- this.sendMsgToParent(message);
-}
-
-module.exports.BasicBolt = BasicBolt;
-module.exports.Spout = Spout;
http://git-wip-us.apache.org/repos/asf/storm/blob/3c701415/examples/storm-starter/multilang/resources/storm.py
----------------------------------------------------------------------
diff --git a/examples/storm-starter/multilang/resources/storm.py b/examples/storm-starter/multilang/resources/storm.py
deleted file mode 100644
index 642c393..0000000
--- a/examples/storm-starter/multilang/resources/storm.py
+++ /dev/null
@@ -1,260 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import sys
-import os
-import traceback
-from collections import deque
-
-try:
- import simplejson as json
-except ImportError:
- import json
-
-json_encode = lambda x: json.dumps(x)
-json_decode = lambda x: json.loads(x)
-
-#reads lines and reconstructs newlines appropriately
-def readMsg():
- msg = ""
- while True:
- line = sys.stdin.readline()
- if not line:
- raise Exception('Read EOF from stdin')
- if line[0:-1] == "end":
- break
- msg = msg + line
- return json_decode(msg[0:-1])
-
-MODE = None
-ANCHOR_TUPLE = None
-
-#queue up commands we read while trying to read taskids
-pending_commands = deque()
-
-def readTaskIds():
- if pending_taskids:
- return pending_taskids.popleft()
- else:
- msg = readMsg()
- while type(msg) is not list:
- pending_commands.append(msg)
- msg = readMsg()
- return msg
-
-#queue up taskids we read while trying to read commands/tuples
-pending_taskids = deque()
-
-def readCommand():
- if pending_commands:
- return pending_commands.popleft()
- else:
- msg = readMsg()
- while type(msg) is list:
- pending_taskids.append(msg)
- msg = readMsg()
- return msg
-
-def readTuple():
- cmd = readCommand()
- return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"])
-
-def sendMsgToParent(msg):
- print json_encode(msg)
- print "end"
- sys.stdout.flush()
-
-def sync():
- sendMsgToParent({'command':'sync'})
-
-def sendpid(heartbeatdir):
- pid = os.getpid()
- sendMsgToParent({'pid':pid})
- open(heartbeatdir + "/" + str(pid), "w").close()
-
-def emit(*args, **kwargs):
- __emit(*args, **kwargs)
- return readTaskIds()
-
-def emitDirect(task, *args, **kwargs):
- kwargs["directTask"] = task
- __emit(*args, **kwargs)
-
-def __emit(*args, **kwargs):
- global MODE
- if MODE == Bolt:
- emitBolt(*args, **kwargs)
- elif MODE == Spout:
- emitSpout(*args, **kwargs)
-
-def emitBolt(tup, stream=None, anchors = [], directTask=None):
- global ANCHOR_TUPLE
- if ANCHOR_TUPLE is not None:
- anchors = [ANCHOR_TUPLE]
- m = {"command": "emit"}
- if stream is not None:
- m["stream"] = stream
- m["anchors"] = map(lambda a: a.id, anchors)
- if directTask is not None:
- m["task"] = directTask
- m["tuple"] = tup
- sendMsgToParent(m)
-
-def emitSpout(tup, stream=None, id=None, directTask=None):
- m = {"command": "emit"}
- if id is not None:
- m["id"] = id
- if stream is not None:
- m["stream"] = stream
- if directTask is not None:
- m["task"] = directTask
- m["tuple"] = tup
- sendMsgToParent(m)
-
-def ack(tup):
- sendMsgToParent({"command": "ack", "id": tup.id})
-
-def fail(tup):
- sendMsgToParent({"command": "fail", "id": tup.id})
-
-def reportError(msg):
- sendMsgToParent({"command": "error", "msg": msg})
-
-def log(msg, level=2):
- sendMsgToParent({"command": "log", "msg": msg, "level":level})
-
-def logTrace(msg):
- log(msg, 0)
-
-def logDebug(msg):
- log(msg, 1)
-
-def logInfo(msg):
- log(msg, 2)
-
-def logWarn(msg):
- log(msg, 3)
-
-def logError(msg):
- log(msg, 4)
-
-def rpcMetrics(name, params):
- sendMsgToParent({"command": "metrics", "name": name, "params": params})
-
-def initComponent():
- setupInfo = readMsg()
- sendpid(setupInfo['pidDir'])
- return [setupInfo['conf'], setupInfo['context']]
-
-class Tuple(object):
- def __init__(self, id, component, stream, task, values):
- self.id = id
- self.component = component
- self.stream = stream
- self.task = task
- self.values = values
-
- def __repr__(self):
- return '<%s%s>' % (
- self.__class__.__name__,
- ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys())))
-
- def is_heartbeat_tuple(self):
- return self.task == -1 and self.stream == "__heartbeat"
-
-class Bolt(object):
- def initialize(self, stormconf, context):
- pass
-
- def process(self, tuple):
- pass
-
- def run(self):
- global MODE
- MODE = Bolt
- conf, context = initComponent()
- try:
- self.initialize(conf, context)
- while True:
- tup = readTuple()
- if tup.is_heartbeat_tuple():
- sync()
- else:
- self.process(tup)
- except Exception, e:
- reportError(traceback.format_exc(e))
-
-class BasicBolt(object):
- def initialize(self, stormconf, context):
- pass
-
- def process(self, tuple):
- pass
-
- def run(self):
- global MODE
- MODE = Bolt
- global ANCHOR_TUPLE
- conf, context = initComponent()
- try:
- self.initialize(conf, context)
- while True:
- tup = readTuple()
- if tup.is_heartbeat_tuple():
- sync()
- else:
- ANCHOR_TUPLE = tup
- try:
- self.process(tup)
- ack(tup)
- except Exception, e:
- reportError(traceback.format_exc(e))
- fail(tup)
- except Exception, e:
- reportError(traceback.format_exc(e))
-
-class Spout(object):
- def initialize(self, conf, context):
- pass
-
- def ack(self, id):
- pass
-
- def fail(self, id):
- pass
-
- def nextTuple(self):
- pass
-
- def run(self):
- global MODE
- MODE = Spout
- conf, context = initComponent()
- try:
- self.initialize(conf, context)
- while True:
- msg = readCommand()
- if msg["command"] == "next":
- self.nextTuple()
- if msg["command"] == "ack":
- self.ack(msg["id"])
- if msg["command"] == "fail":
- self.fail(msg["id"])
- sync()
- except Exception, e:
- reportError(traceback.format_exc(e))
http://git-wip-us.apache.org/repos/asf/storm/blob/3c701415/examples/storm-starter/multilang/resources/storm.rb
----------------------------------------------------------------------
diff --git a/examples/storm-starter/multilang/resources/storm.rb b/examples/storm-starter/multilang/resources/storm.rb
deleted file mode 100644
index 816694e..0000000
--- a/examples/storm-starter/multilang/resources/storm.rb
+++ /dev/null
@@ -1,236 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-require "rubygems"
-require "json"
-
-module Storm
- module Protocol
- class << self
- attr_accessor :mode, :pending_taskids, :pending_commands
- end
-
- self.pending_taskids = []
- self.pending_commands = []
-
- def read_message
- msg = ""
- loop do
- line = STDIN.readline.chomp
- break if line == "end"
- msg << line
- msg << "\n"
- end
- JSON.parse msg.chomp
- end
-
- def read_task_ids
- Storm::Protocol.pending_taskids.shift ||
- begin
- msg = read_message
- until msg.is_a? Array
- Storm::Protocol.pending_commands.push(msg)
- msg = read_message
- end
- msg
- end
- end
-
- def read_command
- Storm::Protocol.pending_commands.shift ||
- begin
- msg = read_message
- while msg.is_a? Array
- Storm::Protocol.pending_taskids.push(msg)
- msg = read_message
- end
- msg
- end
- end
-
- def send_msg_to_parent(msg)
- puts msg.to_json
- puts "end"
- STDOUT.flush
- end
-
- def sync
- send_msg_to_parent({'command' => 'sync'})
- end
-
- def send_pid(heartbeat_dir)
- pid = Process.pid
- send_msg_to_parent({'pid' => pid})
- File.open("#{heartbeat_dir}/#{pid}", "w").close
- end
-
- def emit_bolt(tup, args = {})
- stream = args[:stream]
- anchors = args[:anchors] || args[:anchor] || []
- anchors = [anchors] unless anchors.is_a? Enumerable
- direct = args[:direct_task]
- m = {:command => :emit, :anchors => anchors.map(&:id), :tuple => tup}
- m[:stream] = stream if stream
- m[:task] = direct if direct
- send_msg_to_parent m
- read_task_ids unless direct
- end
-
- def emit_spout(tup, args = {})
- stream = args[:stream]
- id = args[:id]
- direct = args[:direct_task]
- m = {:command => :emit, :tuple => tup}
- m[:id] = id if id
- m[:stream] = stream if stream
- m[:task] = direct if direct
- send_msg_to_parent m
- read_task_ids unless direct
- end
-
- def emit(*args)
- case Storm::Protocol.mode
- when 'spout'
- emit_spout(*args)
- when 'bolt'
- emit_bolt(*args)
- end
- end
-
- def ack(tup)
- send_msg_to_parent :command => :ack, :id => tup.id
- end
-
- def fail(tup)
- send_msg_to_parent :command => :fail, :id => tup.id
- end
-
- def reportError(msg)
- send_msg_to_parent :command => :error, :msg => msg.to_s
- end
-
- def log(msg, level=2)
- send_msg_to_parent :command => :log, :msg => msg.to_s, :level => level
- end
-
- def logTrace(msg)
- log(msg, 0)
- end
-
- def logDebug(msg)
- log(msg, 1)
- end
-
- def logInfo(msg)
- log(msg, 2)
- end
-
- def logWarn(msg)
- log(msg, 3)
- end
-
- def logError(msg)
- log(msg, 4)
- end
-
- def handshake
- setup_info = read_message
- send_pid setup_info['pidDir']
- [setup_info['conf'], setup_info['context']]
- end
- end
-
- class Tuple
- attr_accessor :id, :component, :stream, :task, :values
-
- def initialize(id, component, stream, task, values)
- @id = id
- @component = component
- @stream = stream
- @task = task
- @values = values
- end
-
- def self.from_hash(hash)
- Tuple.new(*hash.values_at("id", "comp", "stream", "task", "tuple"))
- end
-
- def is_heartbeat
- task == -1 and stream == '__heartbeat'
- end
- end
-
- class Bolt
- include Storm::Protocol
-
- def prepare(conf, context); end
-
- def process(tuple); end
-
- def run
- Storm::Protocol.mode = 'bolt'
- prepare(*handshake)
- begin
- while true
- tuple = Tuple.from_hash(read_command)
- if tuple.is_heartbeat
- sync
- else
- process tuple
- end
- end
- rescue Exception => e
- reportError 'Exception in bolt: ' + e.message + ' - ' + e.backtrace.join('\n')
- end
- end
- end
-
- class Spout
- include Storm::Protocol
-
- def open(conf, context); end
-
- def nextTuple; end
-
- def ack(id); end
-
- def fail(id); end
-
- def run
- Storm::Protocol.mode = 'spout'
- open(*handshake)
-
- begin
- while true
- msg = read_command
- case msg['command']
- when 'next'
- nextTuple
- when 'ack'
- ack(msg['id'])
- when 'fail'
- fail(msg['id'])
- end
- sync
- end
- rescue Exception => e
- reportError 'Exception in spout: ' + e.message + ' - ' + e.backtrace.join('\n')
- end
- end
- end
-end
http://git-wip-us.apache.org/repos/asf/storm/blob/3c701415/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index 5afe26e..8682a4b 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -72,6 +72,21 @@
<!-- keep storm out of the jar-with-dependencies -->
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>multilang-javascript</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>multilang-ruby</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.storm</groupId>
+ <artifactId>multilang-python</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>