You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/10/07 23:16:24 UTC
[1/5] git commit: Moved storm.js to multilang folder. Added link from
dev/resources folder
Repository: storm
Updated Branches:
refs/heads/master 42a6dbd0f -> 7ab4f2418
Moved storm.js to multilang folder. Added link from dev/resources folder
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3b609c21
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3b609c21
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3b609c21
Branch: refs/heads/master
Commit: 3b609c21fee42951b6260d5344b7fc88fa2d96ab
Parents: a9fb9c5
Author: Itai Frenkel <it...@ryzyco.com>
Authored: Tue Sep 23 17:30:53 2014 +0300
Committer: Itai Frenkel <it...@ryzyco.com>
Committed: Tue Sep 23 18:33:31 2014 +0300
----------------------------------------------------------------------
storm-core/src/dev/resources/storm.js | 350 +----------------------------
storm-core/src/multilang/js/storm.js | 349 ++++++++++++++++++++++++++++
2 files changed, 350 insertions(+), 349 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/3b609c21/storm-core/src/dev/resources/storm.js
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/resources/storm.js b/storm-core/src/dev/resources/storm.js
deleted file mode 100755
index 5c78072..0000000
--- a/storm-core/src/dev/resources/storm.js
+++ /dev/null
@@ -1,349 +0,0 @@
-/**
- * Base classes in node-js for storm Bolt and Spout.
- * Implements the storm multilang protocol for nodejs.
- */
-
-
-var fs = require('fs');
-
-function Storm() {
- this.messagePart = "";
- this.taskIdsCallbacks = [];
- this.isFirstMessage = true;
- this.separator = '\nend\n';
-}
-
-Storm.prototype.sendMsgToParent = function(msg) {
- var str = JSON.stringify(msg);
- process.stdout.write(str + this.separator);
-}
-
-Storm.prototype.sync = function() {
- this.sendMsgToParent({"command":"sync"});
-}
-
-Storm.prototype.sendPid = function(heartbeatdir) {
- var pid = process.pid;
- fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w"));
- this.sendMsgToParent({"pid": pid})
-}
-
-Storm.prototype.log = function(msg) {
- this.sendMsgToParent({"command": "log", "msg": msg});
-}
-
-Storm.prototype.initSetupInfo = function(setupInfo) {
- var self = this;
- var callback = function() {
- self.sendPid(setupInfo['pidDir']);
- }
- this.initialize(setupInfo['conf'], setupInfo['context'], callback);
-}
-
-Storm.prototype.startReadingInput = function() {
- var self = this;
- process.stdin.on('readable', function() {
- var chunk = process.stdin.read();
- var messages = self.handleNewChunk(chunk);
- messages.forEach(function(message) {
- self.handleNewMessage(message);
- })
-
- });
-}
-
-/**
- * receives a new string chunk and returns a list of new messages with the separator removed
- * stores state in this.messagePart
- * @param chunk
- */
-Storm.prototype.handleNewChunk = function(chunk) {
- //invariant: this.messagePart has no separator otherwise we would have parsed it already
- var messages = [];
- if (chunk && chunk.length !== 0) {
- //"{}".split("\nend\n") ==> ['{}']
- //"\nend\n".split("\nend\n") ==> ['' , '']
- //"{}\nend\n".split("\nend\n") ==> ['{}', '']
- //"\nend\n{}".split("\nend\n") ==> ['' , '{}']
- // "{}\nend\n{}".split("\nend\n") ==> ['{}', '{}' ]
- this.messagePart = this.messagePart + chunk;
- var newMessageParts = this.messagePart.split(this.separator);
- while (newMessageParts.length > 0) {
- var potentialMessage = newMessageParts.shift();
- var anotherMessageAhead = newMessageParts.length > 0;
- if (!anotherMessageAhead) {
- this.messagePart = potentialMessage;
- }
- else if (potentialMessage.length > 0) {
- messages.push(potentialMessage);
- }
- }
- }
- return messages;
- }
-
-Storm.prototype.isTaskIds = function(msg) {
- return (msg instanceof Array);
-}
-
-Storm.prototype.handleNewMessage = function(msg) {
- var parsedMsg = JSON.parse(msg);
-
- if (this.isFirstMessage) {
- this.initSetupInfo(parsedMsg);
- this.isFirstMessage = false;
- } else if (this.isTaskIds(parsedMsg)) {
- this.handleNewTaskId(parsedMsg);
- } else {
- this.handleNewCommand(parsedMsg);
- }
-}
-
-Storm.prototype.handleNewTaskId = function(taskIds) {
- //When new list of task ids arrives, the callback that was passed with the corresponding emit should be called.
- //Storm assures that the task ids will be sent in the same order as their corresponding emits so it we can simply
- //take the first callback in the list and be sure it is the right one.
-
- var callback = this.taskIdsCallbacks.shift();
- if (callback) {
- callback(taskIds);
- } else {
- throw new Error('Something went wrong, we off the split of task id callbacks');
- }
-}
-
-
-
-/**
- *
- * @param messageDetails json with the emit details.
- *
- * For bolt, the json must contain the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
- * tuple and return ack when all components successfully finished to process it.
- * and may contain the optional fields:
- * - stream (if empty - emit to default stream)
- *
- * For spout, the json must contain the required fields:
- * - tuple - the value to emit
- *
- * and may contain the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- *
- * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received).
- */
-Storm.prototype.emit = function(messageDetails, onTaskIds) {
- //Every emit triggers a response - list of task ids to which the tuple was emitted. The task ids are accessible
- //through the callback (will be called when the response arrives). The callback is stored in a list until the
- //corresponding task id list arrives.
- if (messageDetails.task) {
- throw new Error('Illegal input - task. To emit to specific task use emit direct!');
- }
-
- if (!onTaskIds) {
- throw new Error('You must pass a onTaskIds callback when using emit!')
- }
-
- this.taskIdsCallbacks.push(onTaskIds);
- this.__emit(messageDetails);;
-}
-
-
-/**
- * Emit message to specific task.
- * @param messageDetails json with the emit details.
- *
- * For bolt, the json must contain the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
- * tuple and return ack when all components successfully finished to process it.
- * - task - indicate the task to send the tuple to.
- * and may contain the optional fields:
- * - stream (if empty - emit to default stream)
- *
- * For spout, the json must contain the required fields:
- * - tuple - the value to emit
- * - task - indicate the task to send the tuple to.
- * and may contain the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- *
- * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received).
- */
-Storm.prototype.emitDirect = function(commandDetails) {
- if (!commandDetails.task) {
- throw new Error("Emit direct must receive task id!")
- }
- this.__emit(commandDetails);
-}
-
-/**
- * Initialize storm component according to the configuration received.
- * @param conf configuration object accrding to storm protocol.
- * @param context context object according to storm protocol.
- * @param done callback. Call this method when finished initializing.
- */
-Storm.prototype.initialize = function(conf, context, done) {
- done();
-}
-
-Storm.prototype.run = function() {
- process.stdout.setEncoding('utf8');
- process.stdin.setEncoding('utf8');
- this.startReadingInput();
-}
-
-function Tuple(id, component, stream, task, values) {
- this.id = id;
- this.component = component;
- this.stream = stream;
- this.task = task;
- this.values = values;
-}
-
-/**
- * Base class for storm bolt.
- * To create a bolt implement 'process' method.
- * You may also implement initialize method to
- */
-function BasicBolt() {
- Storm.call(this);
- this.anchorTuple = null;
-};
-
-BasicBolt.prototype = Object.create(Storm.prototype);
-BasicBolt.prototype.constructor = BasicBolt;
-
-/**
- * Emit message.
- * @param commandDetails json with the required fields:
- * - tuple - the value to emit
- * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
- * tuple and return ack when all components successfully finished to process it.
- * and the optional fields:
- * - stream (if empty - emit to default stream)
- * - task (pass only to emit to specific task)
- */
-BasicBolt.prototype.__emit = function(commandDetails) {
- var self = this;
-
- var message = {
- command: "emit",
- tuple: commandDetails.tuple,
- stream: commandDetails.stream,
- task: commandDetails.task,
- anchors: [commandDetails.anchorTupleId]
- };
-
- this.sendMsgToParent(message);
-}
-
-BasicBolt.prototype.handleNewCommand = function(command) {
- var self = this;
- var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]);
- var callback = function(err) {
- if (err) {
- self.fail(tup, err);
- return;
- }
- self.ack(tup);
- }
- this.process(tup, callback);
-}
-
-/**
- * Implement this method when creating a bolt. This is the main method that provides the logic of the bolt (what
- * should it do?).
- * @param tuple the input of the bolt - what to process.
- * @param done call this method when done processing.
- */
-BasicBolt.prototype.process = function(tuple, done) {};
-
-BasicBolt.prototype.ack = function(tup) {
- this.sendMsgToParent({"command": "ack", "id": tup.id});
-}
-
-BasicBolt.prototype.fail = function(tup, err) {
- this.sendMsgToParent({"command": "fail", "id": tup.id});
-}
-
-
-/**
- * Base class for storm spout.
- * To create a spout implement the following methods: nextTuple, ack and fail (nextTuple - mandatory, ack and fail
- * can stay empty).
- * You may also implement initialize method.
- *
- */
-function Spout() {
- Storm.call(this);
-};
-
-Spout.prototype = Object.create(Storm.prototype);
-
-Spout.prototype.constructor = Spout;
-
-/**
- * This method will be called when an ack is received for preciously sent tuple. One may implement it.
- * @param id The id of the tuple.
- * @param done Call this method when finished and ready to receive more tuples.
- */
-Spout.prototype.ack = function(id, done) {};
-
-/**
- * This method will be called when an fail is received for preciously sent tuple. One may implement it (for example -
- * log the failure or send the tuple again).
- * @param id The id of the tuple.
- * @param done Call this method when finished and ready to receive more tuples.
- */
-Spout.prototype.fail = function(id, done) {};
-
-/**
- * Method the indicates its time to emit the next tuple.
- * @param done call this method when done sending the output.
- */
-Spout.prototype.nextTuple = function(done) {};
-
-Spout.prototype.handleNewCommand = function(command) {
- var self = this;
- var callback = function() {
- self.sync();
- }
-
- if (command["command"] === "next") {
- this.nextTuple(callback);
- }
-
- if (command["command"] === "ack") {
- this.ack(command["id"], callback);
- }
-
- if (command["command"] === "fail") {
- this.fail(command["id"], callback);
- }
-}
-
-/**
- * @param commandDetails json with the required fields:
- * - tuple - the value to emit.
- * and the optional fields:
- * - id - pass id for reliable emit (and receive ack/fail later).
- * - stream - if empty - emit to default stream.
- * - task - pass only to emit to specific task.
- */
-Spout.prototype.__emit = function(commandDetails) {
- var message = {
- command: "emit",
- tuple: commandDetails.tuple,
- id: commandDetails.id,
- stream: commandDetails.stream,
- task: commandDetails.task
- };
-
- this.sendMsgToParent(message);
-}
-
-module.exports.BasicBolt = BasicBolt;
-module.exports.Spout = Spout;
diff --git a/storm-core/src/dev/resources/storm.js b/storm-core/src/dev/resources/storm.js
new file mode 120000
index 0000000..a5fc98b
--- /dev/null
+++ b/storm-core/src/dev/resources/storm.js
@@ -0,0 +1 @@
+../../multilang/js/storm.js
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/3b609c21/storm-core/src/multilang/js/storm.js
----------------------------------------------------------------------
diff --git a/storm-core/src/multilang/js/storm.js b/storm-core/src/multilang/js/storm.js
new file mode 100755
index 0000000..5c78072
--- /dev/null
+++ b/storm-core/src/multilang/js/storm.js
@@ -0,0 +1,349 @@
+/**
+ * Base classes in node-js for storm Bolt and Spout.
+ * Implements the storm multilang protocol for nodejs.
+ */
+
+
+var fs = require('fs');
+
+function Storm() {
+ this.messagePart = "";
+ this.taskIdsCallbacks = [];
+ this.isFirstMessage = true;
+ this.separator = '\nend\n';
+}
+
+Storm.prototype.sendMsgToParent = function(msg) {
+ var str = JSON.stringify(msg);
+ process.stdout.write(str + this.separator);
+}
+
+Storm.prototype.sync = function() {
+ this.sendMsgToParent({"command":"sync"});
+}
+
+Storm.prototype.sendPid = function(heartbeatdir) {
+ var pid = process.pid;
+ fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w"));
+ this.sendMsgToParent({"pid": pid})
+}
+
+Storm.prototype.log = function(msg) {
+ this.sendMsgToParent({"command": "log", "msg": msg});
+}
+
+Storm.prototype.initSetupInfo = function(setupInfo) {
+ var self = this;
+ var callback = function() {
+ self.sendPid(setupInfo['pidDir']);
+ }
+ this.initialize(setupInfo['conf'], setupInfo['context'], callback);
+}
+
+Storm.prototype.startReadingInput = function() {
+ var self = this;
+ process.stdin.on('readable', function() {
+ var chunk = process.stdin.read();
+ var messages = self.handleNewChunk(chunk);
+ messages.forEach(function(message) {
+ self.handleNewMessage(message);
+ })
+
+ });
+}
+
+/**
+ * receives a new string chunk and returns a list of new messages with the separator removed
+ * stores state in this.messagePart
+ * @param chunk
+ */
+Storm.prototype.handleNewChunk = function(chunk) {
+ //invariant: this.messagePart has no separator otherwise we would have parsed it already
+ var messages = [];
+ if (chunk && chunk.length !== 0) {
+ //"{}".split("\nend\n") ==> ['{}']
+ //"\nend\n".split("\nend\n") ==> ['' , '']
+ //"{}\nend\n".split("\nend\n") ==> ['{}', '']
+ //"\nend\n{}".split("\nend\n") ==> ['' , '{}']
+ // "{}\nend\n{}".split("\nend\n") ==> ['{}', '{}' ]
+ this.messagePart = this.messagePart + chunk;
+ var newMessageParts = this.messagePart.split(this.separator);
+ while (newMessageParts.length > 0) {
+ var potentialMessage = newMessageParts.shift();
+ var anotherMessageAhead = newMessageParts.length > 0;
+ if (!anotherMessageAhead) {
+ this.messagePart = potentialMessage;
+ }
+ else if (potentialMessage.length > 0) {
+ messages.push(potentialMessage);
+ }
+ }
+ }
+ return messages;
+ }
+
+Storm.prototype.isTaskIds = function(msg) {
+ return (msg instanceof Array);
+}
+
+Storm.prototype.handleNewMessage = function(msg) {
+ var parsedMsg = JSON.parse(msg);
+
+ if (this.isFirstMessage) {
+ this.initSetupInfo(parsedMsg);
+ this.isFirstMessage = false;
+ } else if (this.isTaskIds(parsedMsg)) {
+ this.handleNewTaskId(parsedMsg);
+ } else {
+ this.handleNewCommand(parsedMsg);
+ }
+}
+
+Storm.prototype.handleNewTaskId = function(taskIds) {
+ //When new list of task ids arrives, the callback that was passed with the corresponding emit should be called.
+ //Storm assures that the task ids will be sent in the same order as their corresponding emits so it we can simply
+ //take the first callback in the list and be sure it is the right one.
+
+ var callback = this.taskIdsCallbacks.shift();
+ if (callback) {
+ callback(taskIds);
+ } else {
+ throw new Error('Something went wrong, we off the split of task id callbacks');
+ }
+}
+
+
+
+/**
+ *
+ * @param messageDetails json with the emit details.
+ *
+ * For bolt, the json must contain the required fields:
+ * - tuple - the value to emit
+ * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
+ * tuple and return ack when all components successfully finished to process it.
+ * and may contain the optional fields:
+ * - stream (if empty - emit to default stream)
+ *
+ * For spout, the json must contain the required fields:
+ * - tuple - the value to emit
+ *
+ * and may contain the optional fields:
+ * - id - pass id for reliable emit (and receive ack/fail later).
+ * - stream - if empty - emit to default stream.
+ *
+ * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received).
+ */
+Storm.prototype.emit = function(messageDetails, onTaskIds) {
+ //Every emit triggers a response - list of task ids to which the tuple was emitted. The task ids are accessible
+ //through the callback (will be called when the response arrives). The callback is stored in a list until the
+ //corresponding task id list arrives.
+ if (messageDetails.task) {
+ throw new Error('Illegal input - task. To emit to specific task use emit direct!');
+ }
+
+ if (!onTaskIds) {
+ throw new Error('You must pass a onTaskIds callback when using emit!')
+ }
+
+ this.taskIdsCallbacks.push(onTaskIds);
+ this.__emit(messageDetails);;
+}
+
+
+/**
+ * Emit message to specific task.
+ * @param messageDetails json with the emit details.
+ *
+ * For bolt, the json must contain the required fields:
+ * - tuple - the value to emit
+ * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
+ * tuple and return ack when all components successfully finished to process it.
+ * - task - indicate the task to send the tuple to.
+ * and may contain the optional fields:
+ * - stream (if empty - emit to default stream)
+ *
+ * For spout, the json must contain the required fields:
+ * - tuple - the value to emit
+ * - task - indicate the task to send the tuple to.
+ * and may contain the optional fields:
+ * - id - pass id for reliable emit (and receive ack/fail later).
+ * - stream - if empty - emit to default stream.
+ *
+ * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received).
+ */
+Storm.prototype.emitDirect = function(commandDetails) {
+ if (!commandDetails.task) {
+ throw new Error("Emit direct must receive task id!")
+ }
+ this.__emit(commandDetails);
+}
+
+/**
+ * Initialize storm component according to the configuration received.
+ * @param conf configuration object accrding to storm protocol.
+ * @param context context object according to storm protocol.
+ * @param done callback. Call this method when finished initializing.
+ */
+Storm.prototype.initialize = function(conf, context, done) {
+ done();
+}
+
+Storm.prototype.run = function() {
+ process.stdout.setEncoding('utf8');
+ process.stdin.setEncoding('utf8');
+ this.startReadingInput();
+}
+
+function Tuple(id, component, stream, task, values) {
+ this.id = id;
+ this.component = component;
+ this.stream = stream;
+ this.task = task;
+ this.values = values;
+}
+
+/**
+ * Base class for storm bolt.
+ * To create a bolt implement 'process' method.
+ * You may also implement initialize method to
+ */
+function BasicBolt() {
+ Storm.call(this);
+ this.anchorTuple = null;
+};
+
+BasicBolt.prototype = Object.create(Storm.prototype);
+BasicBolt.prototype.constructor = BasicBolt;
+
+/**
+ * Emit message.
+ * @param commandDetails json with the required fields:
+ * - tuple - the value to emit
+ * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
+ * tuple and return ack when all components successfully finished to process it.
+ * and the optional fields:
+ * - stream (if empty - emit to default stream)
+ * - task (pass only to emit to specific task)
+ */
+BasicBolt.prototype.__emit = function(commandDetails) {
+ var self = this;
+
+ var message = {
+ command: "emit",
+ tuple: commandDetails.tuple,
+ stream: commandDetails.stream,
+ task: commandDetails.task,
+ anchors: [commandDetails.anchorTupleId]
+ };
+
+ this.sendMsgToParent(message);
+}
+
+BasicBolt.prototype.handleNewCommand = function(command) {
+ var self = this;
+ var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]);
+ var callback = function(err) {
+ if (err) {
+ self.fail(tup, err);
+ return;
+ }
+ self.ack(tup);
+ }
+ this.process(tup, callback);
+}
+
+/**
+ * Implement this method when creating a bolt. This is the main method that provides the logic of the bolt (what
+ * should it do?).
+ * @param tuple the input of the bolt - what to process.
+ * @param done call this method when done processing.
+ */
+BasicBolt.prototype.process = function(tuple, done) {};
+
+BasicBolt.prototype.ack = function(tup) {
+ this.sendMsgToParent({"command": "ack", "id": tup.id});
+}
+
+BasicBolt.prototype.fail = function(tup, err) {
+ this.sendMsgToParent({"command": "fail", "id": tup.id});
+}
+
+
+/**
+ * Base class for storm spout.
+ * To create a spout implement the following methods: nextTuple, ack and fail (nextTuple - mandatory, ack and fail
+ * can stay empty).
+ * You may also implement initialize method.
+ *
+ */
+function Spout() {
+ Storm.call(this);
+};
+
+Spout.prototype = Object.create(Storm.prototype);
+
+Spout.prototype.constructor = Spout;
+
+/**
+ * This method will be called when an ack is received for preciously sent tuple. One may implement it.
+ * @param id The id of the tuple.
+ * @param done Call this method when finished and ready to receive more tuples.
+ */
+Spout.prototype.ack = function(id, done) {};
+
+/**
+ * This method will be called when an fail is received for preciously sent tuple. One may implement it (for example -
+ * log the failure or send the tuple again).
+ * @param id The id of the tuple.
+ * @param done Call this method when finished and ready to receive more tuples.
+ */
+Spout.prototype.fail = function(id, done) {};
+
+/**
+ * Method the indicates its time to emit the next tuple.
+ * @param done call this method when done sending the output.
+ */
+Spout.prototype.nextTuple = function(done) {};
+
+Spout.prototype.handleNewCommand = function(command) {
+ var self = this;
+ var callback = function() {
+ self.sync();
+ }
+
+ if (command["command"] === "next") {
+ this.nextTuple(callback);
+ }
+
+ if (command["command"] === "ack") {
+ this.ack(command["id"], callback);
+ }
+
+ if (command["command"] === "fail") {
+ this.fail(command["id"], callback);
+ }
+}
+
+/**
+ * @param commandDetails json with the required fields:
+ * - tuple - the value to emit.
+ * and the optional fields:
+ * - id - pass id for reliable emit (and receive ack/fail later).
+ * - stream - if empty - emit to default stream.
+ * - task - pass only to emit to specific task.
+ */
+Spout.prototype.__emit = function(commandDetails) {
+ var message = {
+ command: "emit",
+ tuple: commandDetails.tuple,
+ id: commandDetails.id,
+ stream: commandDetails.stream,
+ task: commandDetails.task
+ };
+
+ this.sendMsgToParent(message);
+}
+
+module.exports.BasicBolt = BasicBolt;
+module.exports.Spout = Spout;
[2/5] git commit: Merge branch 'nodejs-multilang' of
github.com:forter/incubator-storm
Posted by pt...@apache.org.
Merge branch 'nodejs-multilang' of github.com:forter/incubator-storm
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/533f7bae
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/533f7bae
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/533f7bae
Branch: refs/heads/master
Commit: 533f7baebff24331c983ae9caecfa085968c3d11
Parents: 42a6dbd 3b609c2
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Tue Oct 7 16:49:33 2014 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue Oct 7 16:49:33 2014 -0400
----------------------------------------------------------------------
storm-core/src/dev/resources/storm.js | 350 +----------------------------
storm-core/src/multilang/js/storm.js | 349 ++++++++++++++++++++++++++++
2 files changed, 350 insertions(+), 349 deletions(-)
----------------------------------------------------------------------
[4/5] git commit: add non-symlink version of storm.js
Posted by pt...@apache.org.
add non-symlink version of storm.js
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d2707d3c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d2707d3c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d2707d3c
Branch: refs/heads/master
Commit: d2707d3c37d9191ca0754243c64099ddd4a07422
Parents: e9953af
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Tue Oct 7 16:51:55 2014 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue Oct 7 16:51:55 2014 -0400
----------------------------------------------------------------------
storm-core/src/dev/resources/storm.js | 349 +++++++++++++++++++++++++++++
1 file changed, 349 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d2707d3c/storm-core/src/dev/resources/storm.js
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/resources/storm.js b/storm-core/src/dev/resources/storm.js
new file mode 100755
index 0000000..5c78072
--- /dev/null
+++ b/storm-core/src/dev/resources/storm.js
@@ -0,0 +1,349 @@
+/**
+ * Base classes in node-js for storm Bolt and Spout.
+ * Implements the storm multilang protocol for nodejs.
+ */
+
+
+var fs = require('fs');
+
+function Storm() {
+ this.messagePart = "";
+ this.taskIdsCallbacks = [];
+ this.isFirstMessage = true;
+ this.separator = '\nend\n';
+}
+
+Storm.prototype.sendMsgToParent = function(msg) {
+ var str = JSON.stringify(msg);
+ process.stdout.write(str + this.separator);
+}
+
+Storm.prototype.sync = function() {
+ this.sendMsgToParent({"command":"sync"});
+}
+
+Storm.prototype.sendPid = function(heartbeatdir) {
+ var pid = process.pid;
+ fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w"));
+ this.sendMsgToParent({"pid": pid})
+}
+
+Storm.prototype.log = function(msg) {
+ this.sendMsgToParent({"command": "log", "msg": msg});
+}
+
+Storm.prototype.initSetupInfo = function(setupInfo) {
+ var self = this;
+ var callback = function() {
+ self.sendPid(setupInfo['pidDir']);
+ }
+ this.initialize(setupInfo['conf'], setupInfo['context'], callback);
+}
+
+Storm.prototype.startReadingInput = function() {
+ var self = this;
+ process.stdin.on('readable', function() {
+ var chunk = process.stdin.read();
+ var messages = self.handleNewChunk(chunk);
+ messages.forEach(function(message) {
+ self.handleNewMessage(message);
+ })
+
+ });
+}
+
+/**
+ * receives a new string chunk and returns a list of new messages with the separator removed
+ * stores state in this.messagePart
+ * @param chunk
+ */
+Storm.prototype.handleNewChunk = function(chunk) {
+ //invariant: this.messagePart has no separator otherwise we would have parsed it already
+ var messages = [];
+ if (chunk && chunk.length !== 0) {
+ //"{}".split("\nend\n") ==> ['{}']
+ //"\nend\n".split("\nend\n") ==> ['' , '']
+ //"{}\nend\n".split("\nend\n") ==> ['{}', '']
+ //"\nend\n{}".split("\nend\n") ==> ['' , '{}']
+ // "{}\nend\n{}".split("\nend\n") ==> ['{}', '{}' ]
+ this.messagePart = this.messagePart + chunk;
+ var newMessageParts = this.messagePart.split(this.separator);
+ while (newMessageParts.length > 0) {
+ var potentialMessage = newMessageParts.shift();
+ var anotherMessageAhead = newMessageParts.length > 0;
+ if (!anotherMessageAhead) {
+ this.messagePart = potentialMessage;
+ }
+ else if (potentialMessage.length > 0) {
+ messages.push(potentialMessage);
+ }
+ }
+ }
+ return messages;
+ }
+
+Storm.prototype.isTaskIds = function(msg) {
+ return (msg instanceof Array);
+}
+
+Storm.prototype.handleNewMessage = function(msg) {
+ var parsedMsg = JSON.parse(msg);
+
+ if (this.isFirstMessage) {
+ this.initSetupInfo(parsedMsg);
+ this.isFirstMessage = false;
+ } else if (this.isTaskIds(parsedMsg)) {
+ this.handleNewTaskId(parsedMsg);
+ } else {
+ this.handleNewCommand(parsedMsg);
+ }
+}
+
+Storm.prototype.handleNewTaskId = function(taskIds) {
+ //When new list of task ids arrives, the callback that was passed with the corresponding emit should be called.
+ //Storm assures that the task ids will be sent in the same order as their corresponding emits so it we can simply
+ //take the first callback in the list and be sure it is the right one.
+
+ var callback = this.taskIdsCallbacks.shift();
+ if (callback) {
+ callback(taskIds);
+ } else {
+ throw new Error('Something went wrong, we off the split of task id callbacks');
+ }
+}
+
+
+
+/**
+ *
+ * @param messageDetails json with the emit details.
+ *
+ * For bolt, the json must contain the required fields:
+ * - tuple - the value to emit
+ * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
+ * tuple and return ack when all components successfully finished to process it.
+ * and may contain the optional fields:
+ * - stream (if empty - emit to default stream)
+ *
+ * For spout, the json must contain the required fields:
+ * - tuple - the value to emit
+ *
+ * and may contain the optional fields:
+ * - id - pass id for reliable emit (and receive ack/fail later).
+ * - stream - if empty - emit to default stream.
+ *
+ * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received).
+ */
+Storm.prototype.emit = function(messageDetails, onTaskIds) {
+ //Every emit triggers a response - list of task ids to which the tuple was emitted. The task ids are accessible
+ //through the callback (will be called when the response arrives). The callback is stored in a list until the
+ //corresponding task id list arrives.
+ if (messageDetails.task) {
+ throw new Error('Illegal input - task. To emit to specific task use emit direct!');
+ }
+
+ if (!onTaskIds) {
+ throw new Error('You must pass a onTaskIds callback when using emit!')
+ }
+
+ this.taskIdsCallbacks.push(onTaskIds);
+ this.__emit(messageDetails);;
+}
+
+
+/**
+ * Emit message to specific task.
+ * @param messageDetails json with the emit details.
+ *
+ * For bolt, the json must contain the required fields:
+ * - tuple - the value to emit
+ * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
+ * tuple and return ack when all components successfully finished to process it.
+ * - task - indicate the task to send the tuple to.
+ * and may contain the optional fields:
+ * - stream (if empty - emit to default stream)
+ *
+ * For spout, the json must contain the required fields:
+ * - tuple - the value to emit
+ * - task - indicate the task to send the tuple to.
+ * and may contain the optional fields:
+ * - id - pass id for reliable emit (and receive ack/fail later).
+ * - stream - if empty - emit to default stream.
+ *
+ * @param onTaskIds function than will be called with list of task ids the message was emitted to (when received).
+ */
+Storm.prototype.emitDirect = function(commandDetails) {
+ if (!commandDetails.task) {
+ throw new Error("Emit direct must receive task id!")
+ }
+ this.__emit(commandDetails);
+}
+
+/**
+ * Initialize storm component according to the configuration received.
+ * @param conf configuration object accrding to storm protocol.
+ * @param context context object according to storm protocol.
+ * @param done callback. Call this method when finished initializing.
+ */
+Storm.prototype.initialize = function(conf, context, done) {
+ done();
+}
+
+Storm.prototype.run = function() {
+ process.stdout.setEncoding('utf8');
+ process.stdin.setEncoding('utf8');
+ this.startReadingInput();
+}
+
+function Tuple(id, component, stream, task, values) {
+ this.id = id;
+ this.component = component;
+ this.stream = stream;
+ this.task = task;
+ this.values = values;
+}
+
+/**
+ * Base class for storm bolt.
+ * To create a bolt implement 'process' method.
+ * You may also implement initialize method to
+ */
+function BasicBolt() {
+ Storm.call(this);
+ this.anchorTuple = null;
+};
+
+BasicBolt.prototype = Object.create(Storm.prototype);
+BasicBolt.prototype.constructor = BasicBolt;
+
+/**
+ * Emit message.
+ * @param commandDetails json with the required fields:
+ * - tuple - the value to emit
+ * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source
+ * tuple and return ack when all components successfully finished to process it.
+ * and the optional fields:
+ * - stream (if empty - emit to default stream)
+ * - task (pass only to emit to specific task)
+ */
+BasicBolt.prototype.__emit = function(commandDetails) {
+ var self = this;
+
+ var message = {
+ command: "emit",
+ tuple: commandDetails.tuple,
+ stream: commandDetails.stream,
+ task: commandDetails.task,
+ anchors: [commandDetails.anchorTupleId]
+ };
+
+ this.sendMsgToParent(message);
+}
+
+BasicBolt.prototype.handleNewCommand = function(command) {
+ var self = this;
+ var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]);
+ var callback = function(err) {
+ if (err) {
+ self.fail(tup, err);
+ return;
+ }
+ self.ack(tup);
+ }
+ this.process(tup, callback);
+}
+
+/**
+ * Implement this method when creating a bolt. This is the main method that provides the logic of the bolt (what
+ * should it do?).
+ * @param tuple the input of the bolt - what to process.
+ * @param done call this method when done processing.
+ */
+BasicBolt.prototype.process = function(tuple, done) {};
+
+BasicBolt.prototype.ack = function(tup) {
+ this.sendMsgToParent({"command": "ack", "id": tup.id});
+}
+
+BasicBolt.prototype.fail = function(tup, err) {
+ this.sendMsgToParent({"command": "fail", "id": tup.id});
+}
+
+
+/**
+ * Base class for storm spout.
+ * To create a spout implement the following methods: nextTuple, ack and fail (nextTuple - mandatory, ack and fail
+ * can stay empty).
+ * You may also implement initialize method.
+ *
+ */
+function Spout() {
+ Storm.call(this);
+};
+
+Spout.prototype = Object.create(Storm.prototype);
+
+Spout.prototype.constructor = Spout;
+
+/**
+ * This method will be called when an ack is received for preciously sent tuple. One may implement it.
+ * @param id The id of the tuple.
+ * @param done Call this method when finished and ready to receive more tuples.
+ */
+Spout.prototype.ack = function(id, done) {};
+
+/**
+ * This method will be called when an fail is received for preciously sent tuple. One may implement it (for example -
+ * log the failure or send the tuple again).
+ * @param id The id of the tuple.
+ * @param done Call this method when finished and ready to receive more tuples.
+ */
+Spout.prototype.fail = function(id, done) {};
+
+/**
+ * Method the indicates its time to emit the next tuple.
+ * @param done call this method when done sending the output.
+ */
+Spout.prototype.nextTuple = function(done) {};
+
+Spout.prototype.handleNewCommand = function(command) {
+ var self = this;
+ var callback = function() {
+ self.sync();
+ }
+
+ if (command["command"] === "next") {
+ this.nextTuple(callback);
+ }
+
+ if (command["command"] === "ack") {
+ this.ack(command["id"], callback);
+ }
+
+ if (command["command"] === "fail") {
+ this.fail(command["id"], callback);
+ }
+}
+
+/**
+ * @param commandDetails json with the required fields:
+ * - tuple - the value to emit.
+ * and the optional fields:
+ * - id - pass id for reliable emit (and receive ack/fail later).
+ * - stream - if empty - emit to default stream.
+ * - task - pass only to emit to specific task.
+ */
+Spout.prototype.__emit = function(commandDetails) {
+ var message = {
+ command: "emit",
+ tuple: commandDetails.tuple,
+ id: commandDetails.id,
+ stream: commandDetails.stream,
+ task: commandDetails.task
+ };
+
+ this.sendMsgToParent(message);
+}
+
+module.exports.BasicBolt = BasicBolt;
+module.exports.Spout = Spout;
[3/5] git commit: remove storm.js symlink
Posted by pt...@apache.org.
remove storm.js symlink
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e9953aff
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e9953aff
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e9953aff
Branch: refs/heads/master
Commit: e9953aff4a8c42ec363cd1d81719c970446dc5b6
Parents: 533f7ba
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Tue Oct 7 16:50:51 2014 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue Oct 7 16:50:51 2014 -0400
----------------------------------------------------------------------
storm-core/src/dev/resources/storm.js | 1 -
1 file changed, 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e9953aff/storm-core/src/dev/resources/storm.js
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/resources/storm.js b/storm-core/src/dev/resources/storm.js
deleted file mode 120000
index a5fc98b..0000000
--- a/storm-core/src/dev/resources/storm.js
+++ /dev/null
@@ -1 +0,0 @@
-../../multilang/js/storm.js
\ No newline at end of file
[5/5] git commit: fix platform-specific path separators (missed in a
previous merge)
Posted by pt...@apache.org.
fix platform-specific path separators (missed in a previous merge)
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7ab4f241
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7ab4f241
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7ab4f241
Branch: refs/heads/master
Commit: 7ab4f2418b99ebec0c27c4042942a26e4f4972be
Parents: d2707d3
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Tue Oct 7 17:16:17 2014 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue Oct 7 17:16:17 2014 -0400
----------------------------------------------------------------------
.../test/clj/backtype/storm/supervisor_test.clj | 16 +++++++---------
1 file changed, 7 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/7ab4f241/storm-core/test/clj/backtype/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/supervisor_test.clj b/storm-core/test/clj/backtype/storm/supervisor_test.clj
index a61a2ae..ba74d88 100644
--- a/storm-core/test/clj/backtype/storm/supervisor_test.clj
+++ b/storm-core/test/clj/backtype/storm/supervisor_test.clj
@@ -247,7 +247,7 @@
(let [mock-port "42"
mock-storm-id "fake-storm-id"
mock-worker-id "fake-worker-id"
- mock-cp "/base:/stormjar.jar"
+ mock-cp (str file-path-separator "base" class-path-separator file-path-separator "stormjar.jar")
exp-args-fn (fn [opts topo-opts classpath]
(concat [(supervisor/java-cmd) "-server"]
opts
@@ -255,10 +255,8 @@
["-Djava.library.path="
(str "-Dlogfile.name=worker-" mock-port ".log")
"-Dstorm.home="
- "-Dstorm.conf.file="
- "-Dstorm.options="
- "-Dstorm.log.dir=/logs"
- "-Dlogback.configurationFile=/logback/cluster.xml"
+ (str "-Dstorm.log.dir=" file-path-separator "logs")
+ (str "-Dlogback.configurationFile=" file-path-separator "logback" file-path-separator "cluster.xml")
(str "-Dstorm.id=" mock-storm-id)
(str "-Dworker.id=" mock-worker-id)
(str "-Dworker.port=" mock-port)
@@ -308,14 +306,14 @@
[0]
exp-args))))
(testing "testing topology.classpath is added to classpath"
- (let [topo-cp "/any/path"
+ (let [topo-cp (str file-path-separator "any" file-path-separator "path")
exp-args (exp-args-fn [] [] (add-to-classpath mock-cp [topo-cp]))
mock-supervisor {:conf {STORM-CLUSTER-MODE :distributed}}]
(stubbing [read-supervisor-storm-conf {TOPOLOGY-CLASSPATH topo-cp}
supervisor-stormdist-root nil
supervisor/jlp nil
launch-process nil
- current-classpath "/base"]
+ current-classpath (str file-path-separator "base")]
(supervisor/launch-worker mock-supervisor
mock-storm-id
mock-port
@@ -331,7 +329,7 @@
supervisor-stormdist-root nil
supervisor/jlp nil
launch-process nil
- current-classpath "/base"]
+ current-classpath (str file-path-separator "base")]
(supervisor/launch-worker mock-supervisor
mock-storm-id
mock-port
@@ -471,4 +469,4 @@
(validate-launched-once (:launched changed)
{"sup1" [3 4]}
(get-storm-id (:storm-cluster-state cluster) "topology2"))
- )))
+ )))
\ No newline at end of file