You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/10/07 22:23:50 UTC

[2/2] git commit: add non-symlink versions of storm.py and storm.rb

add non-symlink versions of storm.py and storm.rb


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fc641a2d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fc641a2d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fc641a2d

Branch: refs/heads/master
Commit: fc641a2da3079eba7c2980ce7ff5afcee8be407e
Parents: 8079245
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Tue Oct 7 16:20:17 2014 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Tue Oct 7 16:20:17 2014 -0400

----------------------------------------------------------------------
 storm-core/src/dev/resources/storm.py | 247 +++++++++++++++++++++++++++++
 storm-core/src/dev/resources/storm.rb | 227 ++++++++++++++++++++++++++
 2 files changed, 474 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/fc641a2d/storm-core/src/dev/resources/storm.py
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/resources/storm.py b/storm-core/src/dev/resources/storm.py
new file mode 100755
index 0000000..d2a3082
--- /dev/null
+++ b/storm-core/src/dev/resources/storm.py
@@ -0,0 +1,247 @@
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+import os
+import traceback
+from collections import deque
+
+try:
+    import simplejson as json
+except ImportError:
+    import json
+
+# Thin aliases over whichever json module was imported above
+# (simplejson when available, otherwise the standard-library json).
+json_encode = lambda x: json.dumps(x)
+json_decode = lambda x: json.loads(x)
+
+#reads lines and reconstructs newlines appropriately
+def readMsg():
+    msg = ""
+    while True:
+        line = sys.stdin.readline()
+        if not line:
+            raise Exception('Read EOF from stdin')
+        if line[0:-1] == "end":
+            break
+        msg = msg + line
+    return json_decode(msg[0:-1])
+
+# Assigned by the run() methods below to the Bolt or Spout class;
+# __emit compares against it to choose emitBolt or emitSpout.
+MODE = None
+# Assigned by BasicBolt.run() to the tuple being processed; while it is not
+# None, emitBolt uses it as the only anchor.
+ANCHOR_TUPLE = None
+
+#queue up commands we read while trying to read taskids
+pending_commands = deque()
+
+def readTaskIds():
+    if pending_taskids:
+        return pending_taskids.popleft()
+    else:
+        msg = readMsg()
+        while type(msg) is not list:
+            pending_commands.append(msg)
+            msg = readMsg()
+        return msg
+
+#queue up taskids we read while trying to read commands/tuples
+pending_taskids = deque()
+
+def readCommand():
+    if pending_commands:
+        return pending_commands.popleft()
+    else:
+        msg = readMsg()
+        while type(msg) is list:
+            pending_taskids.append(msg)
+            msg = readMsg()
+        return msg
+
+def readTuple():
+    cmd = readCommand()
+    return Tuple(cmd["id"], cmd["comp"], cmd["stream"], cmd["task"], cmd["tuple"])
+
+def sendMsgToParent(msg):
+    print json_encode(msg)
+    print "end"
+    sys.stdout.flush()
+
+def sync():
+    sendMsgToParent({'command':'sync'})
+
+def sendpid(heartbeatdir):
+    pid = os.getpid()
+    sendMsgToParent({'pid':pid})
+    open(heartbeatdir + "/" + str(pid), "w").close()
+
+def emit(*args, **kwargs):
+    __emit(*args, **kwargs)
+    return readTaskIds()
+
+def emitDirect(task, *args, **kwargs):
+    kwargs["directTask"] = task
+    __emit(*args, **kwargs)
+
+def __emit(*args, **kwargs):
+    global MODE
+    if MODE == Bolt:
+        emitBolt(*args, **kwargs)
+    elif MODE == Spout:
+        emitSpout(*args, **kwargs)
+
+def emitBolt(tup, stream=None, anchors = [], directTask=None):
+    global ANCHOR_TUPLE
+    if ANCHOR_TUPLE is not None:
+        anchors = [ANCHOR_TUPLE]
+    m = {"command": "emit"}
+    if stream is not None:
+        m["stream"] = stream
+    m["anchors"] = map(lambda a: a.id, anchors)
+    if directTask is not None:
+        m["task"] = directTask
+    m["tuple"] = tup
+    sendMsgToParent(m)
+
+def emitSpout(tup, stream=None, id=None, directTask=None):
+    m = {"command": "emit"}
+    if id is not None:
+        m["id"] = id
+    if stream is not None:
+        m["stream"] = stream
+    if directTask is not None:
+        m["task"] = directTask
+    m["tuple"] = tup
+    sendMsgToParent(m)
+
+def ack(tup):
+    sendMsgToParent({"command": "ack", "id": tup.id})
+
+def fail(tup):
+    sendMsgToParent({"command": "fail", "id": tup.id})
+
+def reportError(msg):
+    sendMsgToParent({"command": "error", "msg": msg})
+
+def log(msg, level=2):
+    sendMsgToParent({"command": "log", "msg": msg, "level":level})
+
+def logTrace(msg):
+    log(msg, 0)
+
+def logDebug(msg):
+    log(msg, 1)
+
+def logInfo(msg):
+    log(msg, 2)
+
+def logWarn(msg):
+    log(msg, 3)
+
+def logError(msg):
+    log(msg, 4)
+
+def rpcMetrics(name, params):
+    sendMsgToParent({"command": "metrics", "name": name, "params": params})
+
+def initComponent():
+    setupInfo = readMsg()
+    sendpid(setupInfo['pidDir'])
+    return [setupInfo['conf'], setupInfo['context']]
+
+class Tuple(object):
+    def __init__(self, id, component, stream, task, values):
+        self.id = id
+        self.component = component
+        self.stream = stream
+        self.task = task
+        self.values = values
+
+    def __repr__(self):
+        return '<%s%s>' % (
+                self.__class__.__name__,
+                ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys())))
+
+class Bolt(object):
+    def initialize(self, stormconf, context):
+        pass
+
+    def process(self, tuple):
+        pass
+
+    def run(self):
+        global MODE
+        MODE = Bolt
+        conf, context = initComponent()
+        try:
+            self.initialize(conf, context)
+            while True:
+                tup = readTuple()
+                self.process(tup)
+        except Exception, e:
+            reportError(traceback.format_exc(e))
+
+class BasicBolt(object):
+    def initialize(self, stormconf, context):
+        pass
+
+    def process(self, tuple):
+        pass
+
+    def run(self):
+        global MODE
+        MODE = Bolt
+        global ANCHOR_TUPLE
+        conf, context = initComponent()
+        try:
+            self.initialize(conf, context)
+            while True:
+                tup = readTuple()
+                ANCHOR_TUPLE = tup
+                self.process(tup)
+                ack(tup)
+        except Exception, e:
+            reportError(traceback.format_exc(e))
+
+class Spout(object):
+    def initialize(self, conf, context):
+        pass
+
+    def ack(self, id):
+        pass
+
+    def fail(self, id):
+        pass
+
+    def nextTuple(self):
+        pass
+
+    def run(self):
+        global MODE
+        MODE = Spout
+        conf, context = initComponent()
+        try:
+            self.initialize(conf, context)
+            while True:
+                msg = readCommand()
+                if msg["command"] == "next":
+                    self.nextTuple()
+                if msg["command"] == "ack":
+                    self.ack(msg["id"])
+                if msg["command"] == "fail":
+                    self.fail(msg["id"])
+                sync()
+        except Exception, e:
+            reportError(traceback.format_exc(e))

http://git-wip-us.apache.org/repos/asf/storm/blob/fc641a2d/storm-core/src/dev/resources/storm.rb
----------------------------------------------------------------------
diff --git a/storm-core/src/dev/resources/storm.rb b/storm-core/src/dev/resources/storm.rb
new file mode 100644
index 0000000..17232d1
--- /dev/null
+++ b/storm-core/src/dev/resources/storm.rb
@@ -0,0 +1,227 @@
+# -*- coding: utf-8 -*-
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+require "rubygems"
+require "json"
+
+module Storm
+  # Message-level helpers for talking to the parent process over
+  # STDIN/STDOUT. Included by the Bolt and Spout classes below.
+  module Protocol
+    # Module-level state: the current mode ('bolt' or 'spout', set by the
+    # run methods) and two queues used to hold messages that arrive while
+    # a different kind of message is being waited for.
+    class << self
+      attr_accessor :mode, :pending_taskids, :pending_commands
+    end
+
+    self.pending_taskids = []
+    self.pending_commands = []
+
+    # Reads lines from STDIN until a line equal to "end", joins them with
+    # newlines, and returns the result parsed as JSON.
+    def read_message
+      msg = ""
+      loop do
+        line = STDIN.readline.chomp
+        break if line == "end"
+        msg << line
+        msg << "\n"
+      end
+      JSON.parse msg.chomp
+    end
+
+    # Returns the next Array message. A queued entry from pending_taskids
+    # is used first; otherwise messages are read until one is an Array and
+    # every non-Array message seen on the way is pushed to pending_commands.
+    def read_task_ids
+      Storm::Protocol.pending_taskids.shift ||
+        begin
+          msg = read_message
+          until msg.is_a? Array
+            Storm::Protocol.pending_commands.push(msg)
+            msg = read_message
+          end
+          msg
+        end
+    end
+
+    # Returns the next non-Array message. A queued entry from
+    # pending_commands is used first; otherwise messages are read until one
+    # is not an Array and every Array seen on the way is pushed to
+    # pending_taskids.
+    def read_command
+      Storm::Protocol.pending_commands.shift ||
+        begin
+          msg = read_message
+          while msg.is_a? Array
+            Storm::Protocol.pending_taskids.push(msg)
+            msg = read_message
+          end
+          msg
+        end
+    end
+
+    # Writes msg as JSON to STDOUT, followed by an "end" line, then flushes.
+    def send_msg_to_parent(msg)
+      puts msg.to_json
+      puts "end"
+      STDOUT.flush
+    end
+
+    # Sends a 'sync' command.
+    def sync
+      send_msg_to_parent({'command' => 'sync'})
+    end
+
+    # Sends this process's pid to the parent and creates an empty file
+    # named after the pid inside heartbeat_dir.
+    def send_pid(heartbeat_dir)
+      pid = Process.pid
+      send_msg_to_parent({'pid' => pid})
+      File.open("#{heartbeat_dir}/#{pid}", "w").close
+    end
+
+    # Sends an emit command for a bolt. Recognized args: :stream,
+    # :anchors (or :anchor; a single non-Enumerable value is wrapped in an
+    # Array), and :direct_task. Returns the task ids read back from the
+    # parent, or nil when :direct_task was given.
+    def emit_bolt(tup, args = {})
+      stream = args[:stream]
+      anchors = args[:anchors] || args[:anchor] || []
+      anchors = [anchors] unless anchors.is_a? Enumerable
+      direct = args[:direct_task]
+      m = {:command => :emit, :anchors => anchors.map(&:id), :tuple => tup}
+      m[:stream] = stream if stream
+      m[:task] = direct if direct
+      send_msg_to_parent m
+      read_task_ids unless direct
+    end
+
+    # Sends an emit command for a spout. Recognized args: :stream, :id and
+    # :direct_task. Returns the task ids read back from the parent, or nil
+    # when :direct_task was given.
+    def emit_spout(tup, args = {})
+      stream = args[:stream]
+      id = args[:id]
+      direct = args[:direct_task]
+      m = {:command => :emit, :tuple => tup}
+      m[:id] = id if id
+      m[:stream] = stream if stream
+      m[:task] = direct if direct
+      send_msg_to_parent m
+      read_task_ids unless direct
+    end
+
+    # Dispatches to emit_spout or emit_bolt according to
+    # Storm::Protocol.mode; does nothing for any other mode.
+    def emit(*args)
+      case Storm::Protocol.mode
+      when 'spout'
+        emit_spout(*args)
+      when 'bolt'
+        emit_bolt(*args)
+      end
+    end
+
+    # Sends an ack command carrying tup.id.
+    def ack(tup)
+      send_msg_to_parent :command => :ack, :id => tup.id
+    end
+
+    # Sends a fail command carrying tup.id.
+    def fail(tup)
+      send_msg_to_parent :command => :fail, :id => tup.id
+    end
+
+    # Sends an error command with msg converted to a String.
+    def reportError(msg)
+      send_msg_to_parent :command => :error, :msg => msg.to_s
+    end
+
+    # Sends a log command with msg converted to a String and a numeric
+    # level (default 2). The helpers below use 0=trace, 1=debug, 2=info,
+    # 3=warn, 4=error.
+    def log(msg, level=2)
+      send_msg_to_parent :command => :log, :msg => msg.to_s, :level => level
+    end
+
+    def logTrace(msg)
+      log(msg, 0)
+    end
+
+    def logDebug(msg)
+      log(msg, 1)
+    end
+
+    def logInfo(msg)
+      log(msg, 2)
+    end
+
+    def logWarn(msg)
+      log(msg, 3)
+    end
+
+    def logError(msg)
+      log(msg, 4)
+    end
+
+    # Reads the setup message, sends our pid using its 'pidDir' entry, and
+    # returns [conf, context] taken from the same message.
+    def handshake
+      setup_info = read_message
+      send_pid setup_info['pidDir']
+      [setup_info['conf'], setup_info['context']]
+    end
+  end
+
+  class Tuple
+    attr_accessor :id, :component, :stream, :task, :values
+
+    def initialize(id, component, stream, task, values)
+      @id = id
+      @component = component
+      @stream = stream
+      @task = task
+      @values = values
+    end
+
+    def self.from_hash(hash)
+      Tuple.new(*hash.values_at("id", "comp", "stream", "task", "tuple"))
+    end
+  end
+
+  class Bolt
+    include Storm::Protocol
+
+    def prepare(conf, context); end
+
+    def process(tuple); end
+
+    def run
+      Storm::Protocol.mode = 'bolt'
+      prepare(*handshake)
+      begin
+        while true
+          process Tuple.from_hash(read_command)
+        end
+      rescue Exception => e
+        reportError 'Exception in bolt: ' + e.message + ' - ' + e.backtrace.join('\n')
+      end
+    end
+  end
+
+  class Spout
+    include Storm::Protocol
+
+    def open(conf, context); end
+
+    def nextTuple; end
+
+    def ack(id); end
+
+    def fail(id); end
+
+    def run
+      Storm::Protocol.mode = 'spout'
+      open(*handshake)
+
+      begin
+        while true
+          msg = read_command
+          case msg['command']
+          when 'next'
+            nextTuple
+          when 'ack'
+            ack(msg['id'])
+          when 'fail'
+            fail(msg['id'])
+          end
+          sync
+        end
+      rescue Exception => e
+        reportError 'Exception in spout: ' + e.message + ' - ' + e.backtrace.join('\n')
+      end
+    end
+  end
+end