You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2014/10/15 18:15:49 UTC
[36/50] git commit: add non-symlink versions of storm.py and storm.rb
add non-symlink versions of storm.py and storm.rb
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fc641a2d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fc641a2d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fc641a2d
Branch: refs/heads/security
Commit: fc641a2da3079eba7c2980ce7ff5afcee8be407e
Parents: 8079245
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Tue Oct 7 16:20:17 2014 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue Oct 7 16:20:17 2014 -0400
----------------------------------------------------------------------
storm-core/src/dev/resources/storm.py | 247 +++++++++++++++++++++++++++++
storm-core/src/dev/resources/storm.rb | 227 ++++++++++++++++++++++++++
2 files changed, 474 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/fc641a2d/storm-core/src/dev/resources/storm.py
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/resources/storm.py b/storm-core/src/dev/resources/storm.py
new file mode 100755
index 0000000..d2a3082
--- /dev/null
+++ b/storm-core/src/dev/resources/storm.py
@@ -0,0 +1,247 @@
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+import os
+import traceback
+from collections import deque
+
+try:
+ import simplejson as json
+except ImportError:
+ import json
+
+json_encode = lambda x: json.dumps(x)
+json_decode = lambda x: json.loads(x)
+
+#reads lines and reconstructs newlines appropriately
+def readMsg():
+ msg = ""
+ while True:
+ line = sys.stdin.readline()
+ if not line:
+ raise Exception('Read EOF from stdin')
+ if line[0:-1] == "end":
+ break
+ msg = msg + line
+ return json_decode(msg[0:-1])
+
+# Component class (Bolt or Spout) assigned by the run() method of the running
+# component; __emit() compares against it to pick emitBolt or emitSpout.
+MODE = None
+# Tuple currently being processed by BasicBolt.run(); while it is not None,
+# emitBolt() uses it as the only anchor.
+ANCHOR_TUPLE = None
+
+#queue up commands we read while trying to read taskids
+pending_commands = deque()
+
+def readTaskIds():
+ if pending_taskids:
+ return pending_taskids.popleft()
+ else:
+ msg = readMsg()
+ while type(msg) is not list:
+ pending_commands.append(msg)
+ msg = readMsg()
+ return msg
+
+#queue up taskids we read while trying to read commands/tuples
+pending_taskids = deque()
+
+def readCommand():
+ if pending_commands:
+ return pending_commands.popleft()
+ else:
+ msg = readMsg()
+ while type(msg) is list:
+ pending_taskids.append(msg)
+ msg = readMsg()
+ return msg
+
+def readTuple():
+ cmd = readCommand()
+ return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"])
+
+def sendMsgToParent(msg):
+ print json_encode(msg)
+ print "end"
+ sys.stdout.flush()
+
+def sync():
+ sendMsgToParent({'command':'sync'})
+
+def sendpid(heartbeatdir):
+ pid = os.getpid()
+ sendMsgToParent({'pid':pid})
+ open(heartbeatdir + "/" + str(pid), "w").close()
+
+def emit(*args, **kwargs):
+ __emit(*args, **kwargs)
+ return readTaskIds()
+
+def emitDirect(task, *args, **kwargs):
+ kwargs["directTask"] = task
+ __emit(*args, **kwargs)
+
+def __emit(*args, **kwargs):
+ global MODE
+ if MODE == Bolt:
+ emitBolt(*args, **kwargs)
+ elif MODE == Spout:
+ emitSpout(*args, **kwargs)
+
+def emitBolt(tup, stream=None, anchors = [], directTask=None):
+ global ANCHOR_TUPLE
+ if ANCHOR_TUPLE is not None:
+ anchors = [ANCHOR_TUPLE]
+ m = {"command": "emit"}
+ if stream is not None:
+ m["stream"] = stream
+ m["anchors"] = map(lambda a: a.id, anchors)
+ if directTask is not None:
+ m["task"] = directTask
+ m["tuple"] = tup
+ sendMsgToParent(m)
+
+def emitSpout(tup, stream=None, id=None, directTask=None):
+ m = {"command": "emit"}
+ if id is not None:
+ m["id"] = id
+ if stream is not None:
+ m["stream"] = stream
+ if directTask is not None:
+ m["task"] = directTask
+ m["tuple"] = tup
+ sendMsgToParent(m)
+
+def ack(tup):
+ sendMsgToParent({"command": "ack", "id": tup.id})
+
+def fail(tup):
+ sendMsgToParent({"command": "fail", "id": tup.id})
+
+def reportError(msg):
+ sendMsgToParent({"command": "error", "msg": msg})
+
+def log(msg, level=2):
+ sendMsgToParent({"command": "log", "msg": msg, "level":level})
+
+def logTrace(msg):
+ log(msg, 0)
+
+def logDebug(msg):
+ log(msg, 1)
+
+def logInfo(msg):
+ log(msg, 2)
+
+def logWarn(msg):
+ log(msg, 3)
+
+def logError(msg):
+ log(msg, 4)
+
+def rpcMetrics(name, params):
+ sendMsgToParent({"command": "metrics", "name": name, "params": params})
+
+def initComponent():
+ setupInfo = readMsg()
+ sendpid(setupInfo['pidDir'])
+ return [setupInfo['conf'], setupInfo['context']]
+
+class Tuple(object):
+ def __init__(self, id, component, stream, task, values):
+ self.id = id
+ self.component = component
+ self.stream = stream
+ self.task = task
+ self.values = values
+
+ def __repr__(self):
+ return '<%s%s>' % (
+ self.__class__.__name__,
+ ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys())))
+
+class Bolt(object):
+ def initialize(self, stormconf, context):
+ pass
+
+ def process(self, tuple):
+ pass
+
+ def run(self):
+ global MODE
+ MODE = Bolt
+ conf, context = initComponent()
+ try:
+ self.initialize(conf, context)
+ while True:
+ tup = readTuple()
+ self.process(tup)
+ except Exception, e:
+ reportError(traceback.format_exc(e))
+
+class BasicBolt(object):
+ def initialize(self, stormconf, context):
+ pass
+
+ def process(self, tuple):
+ pass
+
+ def run(self):
+ global MODE
+ MODE = Bolt
+ global ANCHOR_TUPLE
+ conf, context = initComponent()
+ try:
+ self.initialize(conf, context)
+ while True:
+ tup = readTuple()
+ ANCHOR_TUPLE = tup
+ self.process(tup)
+ ack(tup)
+ except Exception, e:
+ reportError(traceback.format_exc(e))
+
+class Spout(object):
+ def initialize(self, conf, context):
+ pass
+
+ def ack(self, id):
+ pass
+
+ def fail(self, id):
+ pass
+
+ def nextTuple(self):
+ pass
+
+ def run(self):
+ global MODE
+ MODE = Spout
+ conf, context = initComponent()
+ try:
+ self.initialize(conf, context)
+ while True:
+ msg = readCommand()
+ if msg["command"] == "next":
+ self.nextTuple()
+ if msg["command"] == "ack":
+ self.ack(msg["id"])
+ if msg["command"] == "fail":
+ self.fail(msg["id"])
+ sync()
+ except Exception, e:
+ reportError(traceback.format_exc(e))
http://git-wip-us.apache.org/repos/asf/storm/blob/fc641a2d/storm-core/src/dev/resources/storm.rb
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/resources/storm.rb b/storm-core/src/dev/resources/storm.rb
new file mode 100644
index 0000000..17232d1
--- /dev/null
+++ b/storm-core/src/dev/resources/storm.rb
@@ -0,0 +1,227 @@
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require "rubygems"
+require "json"
+
+module Storm
+ module Protocol
+ class << self
+ attr_accessor :mode, :pending_taskids, :pending_commands
+ end
+
+ self.pending_taskids = []
+ self.pending_commands = []
+
+ def read_message
+ msg = ""
+ loop do
+ line = STDIN.readline.chomp
+ break if line == "end"
+ msg << line
+ msg << "\n"
+ end
+ JSON.parse msg.chomp
+ end
+
+ def read_task_ids
+ Storm::Protocol.pending_taskids.shift ||
+ begin
+ msg = read_message
+ until msg.is_a? Array
+ Storm::Protocol.pending_commands.push(msg)
+ msg = read_message
+ end
+ msg
+ end
+ end
+
+ def read_command
+ Storm::Protocol.pending_commands.shift ||
+ begin
+ msg = read_message
+ while msg.is_a? Array
+ Storm::Protocol.pending_taskids.push(msg)
+ msg = read_message
+ end
+ msg
+ end
+ end
+
+ def send_msg_to_parent(msg)
+ puts msg.to_json
+ puts "end"
+ STDOUT.flush
+ end
+
+ def sync
+ send_msg_to_parent({'command' => 'sync'})
+ end
+
+ def send_pid(heartbeat_dir)
+ pid = Process.pid
+ send_msg_to_parent({'pid' => pid})
+ File.open("#{heartbeat_dir}/#{pid}", "w").close
+ end
+
+ def emit_bolt(tup, args = {})
+ stream = args[:stream]
+ anchors = args[:anchors] || args[:anchor] || []
+ anchors = [anchors] unless anchors.is_a? Enumerable
+ direct = args[:direct_task]
+ m = {:command => :emit, :anchors => anchors.map(&:id), :tuple => tup}
+ m[:stream] = stream if stream
+ m[:task] = direct if direct
+ send_msg_to_parent m
+ read_task_ids unless direct
+ end
+
+ def emit_spout(tup, args = {})
+ stream = args[:stream]
+ id = args[:id]
+ direct = args[:direct_task]
+ m = {:command => :emit, :tuple => tup}
+ m[:id] = id if id
+ m[:stream] = stream if stream
+ m[:task] = direct if direct
+ send_msg_to_parent m
+ read_task_ids unless direct
+ end
+
+ def emit(*args)
+ case Storm::Protocol.mode
+ when 'spout'
+ emit_spout(*args)
+ when 'bolt'
+ emit_bolt(*args)
+ end
+ end
+
+ def ack(tup)
+ send_msg_to_parent :command => :ack, :id => tup.id
+ end
+
+ def fail(tup)
+ send_msg_to_parent :command => :fail, :id => tup.id
+ end
+
+ def reportError(msg)
+ send_msg_to_parent :command => :error, :msg => msg.to_s
+ end
+
+ def log(msg, level=2)
+ send_msg_to_parent :command => :log, :msg => msg.to_s, :level => level
+ end
+
+ def logTrace(msg)
+ log(msg, 0)
+ end
+
+ def logDebug(msg)
+ log(msg, 1)
+ end
+
+ def logInfo(msg)
+ log(msg, 2)
+ end
+
+ def logWarn(msg)
+ log(msg, 3)
+ end
+
+ def logError(msg)
+ log(msg, 4)
+ end
+
+ def handshake
+ setup_info = read_message
+ send_pid setup_info['pidDir']
+ [setup_info['conf'], setup_info['context']]
+ end
+ end
+
+ class Tuple
+ attr_accessor :id, :component, :stream, :task, :values
+
+ def initialize(id, component, stream, task, values)
+ @id = id
+ @component = component
+ @stream = stream
+ @task = task
+ @values = values
+ end
+
+ def self.from_hash(hash)
+ Tuple.new(*hash.values_at("id", "comp", "stream", "task", "tuple"))
+ end
+ end
+
+ class Bolt
+ include Storm::Protocol
+
+ def prepare(conf, context); end
+
+ def process(tuple); end
+
+ def run
+ Storm::Protocol.mode = 'bolt'
+ prepare(*handshake)
+ begin
+ while true
+ process Tuple.from_hash(read_command)
+ end
+ rescue Exception => e
+ reportError 'Exception in bolt: ' + e.message + ' - ' + e.backtrace.join('\n')
+ end
+ end
+ end
+
+ class Spout
+ include Storm::Protocol
+
+ def open(conf, context); end
+
+ def nextTuple; end
+
+ def ack(id); end
+
+ def fail(id); end
+
+ def run
+ Storm::Protocol.mode = 'spout'
+ open(*handshake)
+
+ begin
+ while true
+ msg = read_command
+ case msg['command']
+ when 'next'
+ nextTuple
+ when 'ack'
+ ack(msg['id'])
+ when 'fail'
+ fail(msg['id'])
+ end
+ sync
+ end
+ rescue Exception => e
+ reportError 'Exception in spout: ' + e.message + ' - ' + e.backtrace.join('\n')
+ end
+ end
+ end
+end