You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/03/18 20:39:12 UTC

[05/16] storm git commit: Merge remote-tracking branch 'upstream/master' into ru

Merge remote-tracking branch 'upstream/master' into ru

Conflicts:
	storm-core/src/clj/backtype/storm/cluster.clj
	storm-core/src/py/storm/ttypes.py
	storm-core/src/storm.thrift


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0571e22c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0571e22c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0571e22c

Branch: refs/heads/master
Commit: 0571e22c15a1214c3e08510fddad608571f6a6d8
Parents: 6390064 8036109
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Mon Feb 9 12:47:36 2015 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Mon Feb 9 12:47:36 2015 -0800

----------------------------------------------------------------------
 CHANGELOG.md                                    |   9 +
 README.markdown                                 |   8 +-
 STORM-UI-REST-API.md                            |  21 +
 dev-tools/github/__init__.py                    |  11 +
 dev-tools/jira-github-join.py                   |   4 +-
 dev-tools/storm-merge.py                        |  31 +
 docs/README.md                                  |   9 +
 docs/documentation/Common-patterns.md           |  14 +-
 docs/documentation/Concepts.md                  |  13 +-
 docs/documentation/Home.md                      |   2 +-
 docs/documentation/Multilang-protocol.md        |   4 +-
 docs/documentation/Powered-By.md                |   4 +-
 .../storm/starter/SkewedRollingTopWords.java    | 134 +++
 .../storm/starter/bolt/RollingCountAggBolt.java |  78 ++
 pom.xml                                         |  11 +
 storm-core/src/clj/backtype/storm/cluster.clj   |  43 +-
 .../src/clj/backtype/storm/daemon/logviewer.clj |   5 +-
 .../src/clj/backtype/storm/daemon/nimbus.clj    |  35 +-
 .../src/clj/backtype/storm/daemon/worker.clj    |   6 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   |  34 +-
 .../coordination/BatchSubtopologyBuilder.java   |  11 +
 .../storm/drpc/LinearDRPCInputDeclarer.java     |   5 +-
 .../storm/drpc/LinearDRPCTopologyBuilder.java   |  13 +-
 .../storm/generated/GetInfoOptions.java         | 350 +++++++
 .../jvm/backtype/storm/generated/Nimbus.java    | 974 +++++++++++++++++++
 .../storm/generated/NumErrorsChoice.java        |  64 ++
 .../storm/grouping/PartialKeyGrouping.java      |  31 +-
 .../backtype/storm/topology/InputDeclarer.java  |   3 +
 .../storm/topology/TopologyBuilder.java         |  11 +
 .../TransactionalTopologyBuilder.java           |  13 +-
 .../src/jvm/backtype/storm/utils/Monitor.java   |   8 +-
 .../topology/TridentTopologyBuilder.java        |  13 +-
 storm-core/src/py/storm/DistributedRPC-remote   |   0
 .../py/storm/DistributedRPCInvocations-remote   |   0
 storm-core/src/py/storm/Nimbus-remote           |   7 +
 storm-core/src/py/storm/Nimbus.py               | 226 +++++
 storm-core/src/py/storm/ttypes.py               |  80 ++
 storm-core/src/storm.thrift                     |  10 +
 .../clj/backtype/storm/integration_test.clj     |  10 +-
 .../storm/grouping/PartialKeyGroupingTest.java  |  26 +-
 40 files changed, 2272 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/0571e22c/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/cluster.clj
index 15bf8a3,4b73f2e..1d5026f
--- a/storm-core/src/clj/backtype/storm/cluster.clj
+++ b/storm-core/src/clj/backtype/storm/cluster.clj
@@@ -230,11 -239,11 +241,11 @@@
        (cb id))))
  
  (defn- maybe-deserialize
 -  [ser]
 +  [ser clazz]
    (when ser
 -    (Utils/deserialize ser)))
 +    (Utils/deserialize ser clazz)))
  
- (defstruct TaskError :error :time-secs :host :port)
+ (defrecord TaskError [error time-secs host port])
  
  (defn- parse-error-path
    [^String p]
@@@ -440,9 -441,13 +451,13 @@@
        (report-error
           [this storm-id component-id node port error]
           (let [path (error-path storm-id component-id)
+                last-error-path (last-error-path storm-id component-id)
 -               data {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port}
 +               data (thriftify-error {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port})
                 _ (mkdirs cluster-state path acls)
-                _ (create-sequential cluster-state (str path "/e") (Utils/serialize data) acls)
+                ser-data (Utils/serialize data)
+                _ (mkdirs cluster-state path acls)
+                _ (create-sequential cluster-state (str path "/e") ser-data acls)
+                _ (set-data cluster-state last-error-path ser-data acls)
                 to-kill (->> (get-children cluster-state path false)
                              (sort-by parse-error-path)
                              reverse
@@@ -455,16 -460,22 +470,24 @@@
           (let [path (error-path storm-id component-id)
                 errors (if (exists-node? cluster-state path false)
                          (dofor [c (get-children cluster-state path false)]
-                           (let [data (-> (get-data cluster-state (str path "/" c) false)
-                                        (maybe-deserialize ErrorInfo)
-                                        clojurify-error)]
-                             (when data
-                               (struct TaskError (:error data) (:time-secs data) (:host data) (:port data))
-                               )))
-                         ())
-                ]
+                           (if-let [data (-> (get-data cluster-state
+                                                       (str path "/" c)
+                                                       false)
 -                                         maybe-deserialize)]
++                                          (maybe-deserialize ErrorInfo)
++                                          clojurify-error)]
+                             (map->TaskError data)))
+                         ())]
             (->> (filter not-nil? errors)
                  (sort-by (comp - :time-secs)))))
+ 
+       (last-error
+         [this storm-id component-id]
+         (let [path (last-error-path storm-id component-id)]
+           (if (exists-node? cluster-state path false)
 -            (if-let [data (->> (get-data cluster-state path false)
 -                               maybe-deserialize)]
++            (if-let [data (-> (get-data cluster-state path false)
++                              (maybe-deserialize ErrorInfo)
++                              clojurify-error)]
+               (map->TaskError data)))))
        
        (disconnect
           [this]

http://git-wip-us.apache.org/repos/asf/storm/blob/0571e22c/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/0571e22c/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/0571e22c/storm-core/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --cc storm-core/src/py/storm/ttypes.py
index e15cf1d,46e7a92..112daaa
--- a/storm-core/src/py/storm/ttypes.py
+++ b/storm-core/src/py/storm/ttypes.py
@@@ -44,26 -44,23 +44,43 @@@ class TopologyInitialStatus
      "INACTIVE": 2,
    }
  
 +class TopologyStatus:
 +  ACTIVE = 1
 +  INACTIVE = 2
 +  REBALANCING = 3
 +  KILLED = 4
 +
 +  _VALUES_TO_NAMES = {
 +    1: "ACTIVE",
 +    2: "INACTIVE",
 +    3: "REBALANCING",
 +    4: "KILLED",
 +  }
 +
 +  _NAMES_TO_VALUES = {
 +    "ACTIVE": 1,
 +    "INACTIVE": 2,
 +    "REBALANCING": 3,
 +    "KILLED": 4,
 +  }
 +
+ class NumErrorsChoice:
+   ALL = 0
+   NONE = 1
+   ONE = 2
+ 
+   _VALUES_TO_NAMES = {
+     0: "ALL",
+     1: "NONE",
+     2: "ONE",
+   }
+ 
+   _NAMES_TO_VALUES = {
+     "ALL": 0,
+     "NONE": 1,
+     "ONE": 2,
+   }
+ 
  
  class JavaObjectArg:
    """
@@@ -4417,764 -4400,69 +4434,827 @@@ class SubmitOptions
    def __ne__(self, other):
      return not (self == other)
  
 +class SupervisorInfo:
 +  """
 +  Attributes:
 +   - time_secs
 +   - hostname
 +   - assignment_id
 +   - used_ports
 +   - meta
 +   - scheduler_meta
 +   - uptime_secs
 +  """
 +
 +  thrift_spec = (
 +    None, # 0
 +    (1, TType.I64, 'time_secs', None, None, ), # 1
 +    (2, TType.STRING, 'hostname', None, None, ), # 2
 +    (3, TType.STRING, 'assignment_id', None, None, ), # 3
 +    (4, TType.LIST, 'used_ports', (TType.I64,None), None, ), # 4
 +    (5, TType.LIST, 'meta', (TType.I64,None), None, ), # 5
 +    (6, TType.MAP, 'scheduler_meta', (TType.STRING,None,TType.STRING,None), None, ), # 6
 +    (7, TType.I64, 'uptime_secs', None, None, ), # 7
 +  )
 +
 +  def __hash__(self):
 +    return 0 + hash(self.time_secs) + hash(self.hostname) + hash(self.assignment_id) + hash(self.used_ports) + hash(self.meta) + hash(self.scheduler_meta) + hash(self.uptime_secs)
 +
 +  def __init__(self, time_secs=None, hostname=None, assignment_id=None, used_ports=None, meta=None, scheduler_meta=None, uptime_secs=None,):
 +    self.time_secs = time_secs
 +    self.hostname = hostname
 +    self.assignment_id = assignment_id
 +    self.used_ports = used_ports
 +    self.meta = meta
 +    self.scheduler_meta = scheduler_meta
 +    self.uptime_secs = uptime_secs
 +
 +  def read(self, iprot):
 +    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
 +      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
 +      return
 +    iprot.readStructBegin()
 +    while True:
 +      (fname, ftype, fid) = iprot.readFieldBegin()
 +      if ftype == TType.STOP:
 +        break
 +      if fid == 1:
 +        if ftype == TType.I64:
 +          self.time_secs = iprot.readI64();
 +        else:
 +          iprot.skip(ftype)
 +      elif fid == 2:
 +        if ftype == TType.STRING:
 +          self.hostname = iprot.readString().decode('utf-8')
 +        else:
 +          iprot.skip(ftype)
 +      elif fid == 3:
 +        if ftype == TType.STRING:
 +          self.assignment_id = iprot.readString().decode('utf-8')
 +        else:
 +          iprot.skip(ftype)
 +      elif fid == 4:
 +        if ftype == TType.LIST:
 +          self.used_ports = []
 +          (_etype304, _size301) = iprot.readListBegin()
 +          for _i305 in xrange(_size301):
 +            _elem306 = iprot.readI64();
 +            self.used_ports.append(_elem306)
 +          iprot.readListEnd()
 +        else:
 +          iprot.skip(ftype)
 +      elif fid == 5:
 +        if ftype == TType.LIST:
 +          self.meta = []
 +          (_etype310, _size307) = iprot.readListBegin()
 +          for _i311 in xrange(_size307):
 +            _elem312 = iprot.readI64();
 +            self.meta.append(_elem312)
 +          iprot.readListEnd()
 +        else:
 +          iprot.skip(ftype)
 +      elif fid == 6:
 +        if ftype == TType.MAP:
 +          self.scheduler_meta = {}
 +          (_ktype314, _vtype315, _size313 ) = iprot.readMapBegin() 
 +          for _i317 in xrange(_size313):
 +            _key318 = iprot.readString().decode('utf-8')
 +            _val319 = iprot.readString().decode('utf-8')
 +            self.scheduler_meta[_key318] = _val319
 +          iprot.readMapEnd()
 +        else:
 +          iprot.skip(ftype)
 +      elif fid == 7:
 +        if ftype == TType.I64:
 +          self.uptime_secs = iprot.readI64();
 +        else:
 +          iprot.skip(ftype)
 +      else:
 +        iprot.skip(ftype)
 +      iprot.readFieldEnd()
 +    iprot.readStructEnd()
 +
 +  def write(self, oprot):
 +    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
 +      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
 +      return
 +    oprot.writeStructBegin('SupervisorInfo')
 +    if self.time_secs is not None:
 +      oprot.writeFieldBegin('time_secs', TType.I64, 1)
 +      oprot.writeI64(self.time_secs)
 +      oprot.writeFieldEnd()
 +    if self.hostname is not None:
 +      oprot.writeFieldBegin('hostname', TType.STRING, 2)
 +      oprot.writeString(self.hostname.encode('utf-8'))
 +      oprot.writeFieldEnd()
 +    if self.assignment_id is not None:
 +      oprot.writeFieldBegin('assignment_id', TType.STRING, 3)
 +      oprot.writeString(self.assignment_id.encode('utf-8'))
 +      oprot.writeFieldEnd()
 +    if self.used_ports is not None:
 +      oprot.writeFieldBegin('used_ports', TType.LIST, 4)
 +      oprot.writeListBegin(TType.I64, len(self.used_ports))
 +      for iter320 in self.used_ports:
 +        oprot.writeI64(iter320)
 +      oprot.writeListEnd()
 +      oprot.writeFieldEnd()
 +    if self.meta is not None:
 +      oprot.writeFieldBegin('meta', TType.LIST, 5)
 +      oprot.writeListBegin(TType.I64, len(self.meta))
 +      for iter321 in self.meta:
 +        oprot.writeI64(iter321)
 +      oprot.writeListEnd()
 +      oprot.writeFieldEnd()
 +    if self.scheduler_meta is not None:
 +      oprot.writeFieldBegin('scheduler_meta', TType.MAP, 6)
 +      oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.scheduler_meta))
 +      for kiter322,viter323 in self.scheduler_meta.items():
 +        oprot.writeString(kiter322.encode('utf-8'))
 +        oprot.writeString(viter323.encode('utf-8'))
 +      oprot.writeMapEnd()
 +      oprot.writeFieldEnd()
 +    if self.uptime_secs is not None:
 +      oprot.writeFieldBegin('uptime_secs', TType.I64, 7)
 +      oprot.writeI64(self.uptime_secs)
 +      oprot.writeFieldEnd()
 +    oprot.writeFieldStop()
 +    oprot.writeStructEnd()
 +
 +  def validate(self):
 +    if self.time_secs is None:
 +      raise TProtocol.TProtocolException(message='Required field time_secs is unset!')
 +    if self.hostname is None:
 +      raise TProtocol.TProtocolException(message='Required field hostname is unset!')
 +    return
 +
 +
 +  def __repr__(self):
 +    L = ['%s=%r' % (key, value)
 +      for key, value in self.__dict__.iteritems()]
 +    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
 +
 +  def __eq__(self, other):
 +    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
 +
 +  def __ne__(self, other):
 +    return not (self == other)
 +
 +class NodeInfo:
 +  """
 +  Attributes:
 +   - node
 +   - port
 +  """
 +
 +  thrift_spec = (
 +    None, # 0
 +    (1, TType.STRING, 'node', None, None, ), # 1
 +    (2, TType.SET, 'port', (TType.I64,None), None, ), # 2
 +  )
 +
 +  def __hash__(self):
 +    return 0 + hash(self.node) + hash(self.port)
 +
 +  def __init__(self, node=None, port=None,):
 +    self.node = node
 +    self.port = port
 +
 +  def read(self, iprot):
 +    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
 +      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
 +      return
 +    iprot.readStructBegin()
 +    while True:
 +      (fname, ftype, fid) = iprot.readFieldBegin()
 +      if ftype == TType.STOP:
 +        break
 +      if fid == 1:
 +        if ftype == TType.STRING:
 +          self.node = iprot.readString().decode('utf-8')
 +        else:
 +          iprot.skip(ftype)
 +      elif fid == 2:
 +        if ftype == TType.SET:
 +          self.port = set()
 +          (_etype327, _size324) = iprot.readSetBegin()
 +          for _i328 in xrange(_size324):
 +            _elem329 = iprot.readI64();
 +            self.port.add(_elem329)
 +          iprot.readSetEnd()
 +        else:
 +          iprot.skip(ftype)
 +      else:
 +        iprot.skip(ftype)
 +      iprot.readFieldEnd()
 +    iprot.readStructEnd()
 +
 +  def write(self, oprot):
 +    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
 +      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
 +      return
 +    oprot.writeStructBegin('NodeInfo')
 +    if self.node is not None:
 +      oprot.writeFieldBegin('node', TType.STRING, 1)
 +      oprot.writeString(self.node.encode('utf-8'))
 +      oprot.writeFieldEnd()
 +    if self.port is not None:
 +      oprot.writeFieldBegin('port', TType.SET, 2)
 +      oprot.writeSetBegin(TType.I64, len(self.port))
 +      for iter330 in self.port:
 +        oprot.writeI64(iter330)
 +      oprot.writeSetEnd()
 +      oprot.writeFieldEnd()
 +    oprot.writeFieldStop()
 +    oprot.writeStructEnd()
 +
 +  def validate(self):
 +    if self.node is None:
 +      raise TProtocol.TProtocolException(message='Required field node is unset!')
 +    if self.port is None:
 +      raise TProtocol.TProtocolException(message='Required field port is unset!')
 +    return
 +
 +
 +  def __repr__(self):
 +    L = ['%s=%r' % (key, value)
 +      for key, value in self.__dict__.iteritems()]
 +    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
 +
 +  def __eq__(self, other):
 +    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
 +
 +  def __ne__(self, other):
 +    return not (self == other)
 +
 +class Assignment:
 +  """
 +  Attributes:
 +   - master_code_dir
 +   - node_host
 +   - executor_node_port
 +   - executor_start_time_secs
 +  """
 +
 +  thrift_spec = (
 +    None, # 0
 +    (1, TType.STRING, 'master_code_dir', None, None, ), # 1
 +    (2, TType.MAP, 'node_host', (TType.STRING,None,TType.STRING,None), {
 +    }, ), # 2
 +    (3, TType.MAP, 'executor_node_port', (TType.LIST,(TType.I64,None),TType.STRUCT,(NodeInfo, NodeInfo.thrift_spec)), {
 +    }, ), # 3
 +    (4, TType.MAP, 'executor_start_time_secs', (TType.LIST,(TType.I64,None),TType.I64,None), {
 +    }, ), # 4
 +  )
 +
 +  def __hash__(self):
 +    return 0 + hash(self.master_code_dir) + hash(self.node_host) + hash(self.executor_node_port) + hash(self.executor_start_time_secs)
 +
 +  def __init__(self, master_code_dir=None, node_host=thrift_spec[2][4], executor_node_port=thrift_spec[3][4], executor_start_time_secs=thrift_spec[4][4],):
 +    self.master_code_dir = master_code_dir
 +    if node_host is self.thrift_spec[2][4]:
 +      node_host = {
 +    }
 +    self.node_host = node_host
 +    if executor_node_port is self.thrift_spec[3][4]:
 +      executor_node_port = {
 +    }
 +    self.executor_node_port = executor_node_port
 +    if executor_start_time_secs is self.thrift_spec[4][4]:
 +      executor_start_time_secs = {
 +    }
 +    self.executor_start_time_secs = executor_start_time_secs
 +
 +  def read(self, iprot):
 +    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
 +      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
 +      return
 +    iprot.readStructBegin()
 +    while True:
 +      (fname, ftype, fid) = iprot.readFieldBegin()
 +      if ftype == TType.STOP:
 +        break
 +      if fid == 1:
 +        if ftype == TType.STRING:
 +          self.master_code_dir = iprot.readString().decode('utf-8')
 +        else:
 +          iprot.skip(ftype)
 +      elif fid == 2:
 +        if ftype == TType.MAP:
 +          self.node_host = {}
 +          (_ktype332, _vtype333, _size331 ) = iprot.readMapBegin() 
 +          for _i335 in xrange(_size331):
 +            _key336 = iprot.readString().decode('utf-8')
 +            _val337 = iprot.readString().decode('utf-8')
 +            self.node_host[_key336] = _val337
 +          iprot.readMapEnd()
 +        else:
 +          iprot.skip(ftype)
 +      elif fid == 3:
 +        if ftype == TType.MAP:
 +          self.executor_node_port = {}
 +          (_ktype339, _vtype340, _size338 ) = iprot.readMapBegin() 
 +          for _i342 in xrange(_size338):
 +            _key343 = []
 +            (_etype348, _size345) = iprot.readListBegin()
 +            for _i349 in xrange(_size345):
 +              _elem350 = iprot.readI64();
 +              _key343.append(_elem350)
 +            iprot.readListEnd()
 +            _val344 = NodeInfo()
 +            _val344.read(iprot)
 +            self.executor_node_port[_key343] = _val344
 +          iprot.readMapEnd()
 +        else:
 +          iprot.skip(ftype)
 +      elif fid == 4:
 +        if ftype == TType.MAP:
 +          self.executor_start_time_secs = {}
 +          (_ktype352, _vtype353, _size351 ) = iprot.readMapBegin() 
 +          for _i355 in xrange(_size351):
 +            _key356 = []
 +            (_etype361, _size358) = iprot.readListBegin()
 +            for _i362 in xrange(_size358):
 +              _elem363 = iprot.readI64();
 +              _key356.append(_elem363)
 +            iprot.readListEnd()
 +            _val357 = iprot.readI64();
 +            self.executor_start_time_secs[_key356] = _val357
 +          iprot.readMapEnd()
 +        else:
 +          iprot.skip(ftype)
 +      else:
 +        iprot.skip(ftype)
 +      iprot.readFieldEnd()
 +    iprot.readStructEnd()
 +
 +  def write(self, oprot):
 +    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
 +      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
 +      return
 +    oprot.writeStructBegin('Assignment')
 +    if self.master_code_dir is not None:
 +      oprot.writeFieldBegin('master_code_dir', TType.STRING, 1)
 +      oprot.writeString(self.master_code_dir.encode('utf-8'))
 +      oprot.writeFieldEnd()
 +    if self.node_host is not None:
 +      oprot.writeFieldBegin('node_host', TType.MAP, 2)
 +      oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.node_host))
 +      for kiter364,viter365 in self.node_host.items():
 +        oprot.writeString(kiter364.encode('utf-8'))
 +        oprot.writeString(viter365.encode('utf-8'))
 +      oprot.writeMapEnd()
 +      oprot.writeFieldEnd()
 +    if self.executor_node_port is not None:
 +      oprot.writeFieldBegin('executor_node_port', TType.MAP, 3)
 +      oprot.writeMapBegin(TType.LIST, TType.STRUCT, len(self.executor_node_port))
 +      for kiter366,viter367 in self.executor_node_port.items():
 +        oprot.writeListBegin(TType.I64, len(kiter366))
 +        for iter368 in kiter366:
 +          oprot.writeI64(iter368)
 +        oprot.writeListEnd()
 +        viter367.write(oprot)
 +      oprot.writeMapEnd()
 +      oprot.writeFieldEnd()
 +    if self.executor_start_time_secs is not None:
 +      oprot.writeFieldBegin('executor_start_time_secs', TType.MAP, 4)
 +      oprot.writeMapBegin(TType.LIST, TType.I64, len(self.executor_start_time_secs))
 +      for kiter369,viter370 in self.executor_start_time_secs.items():
 +        oprot.writeListBegin(TType.I64, len(kiter369))
 +        for iter371 in kiter369:
 +          oprot.writeI64(iter371)
 +        oprot.writeListEnd()
 +        oprot.writeI64(viter370)
 +      oprot.writeMapEnd()
 +      oprot.writeFieldEnd()
 +    oprot.writeFieldStop()
 +    oprot.writeStructEnd()
 +
 +  def validate(self):
 +    if self.master_code_dir is None:
 +      raise TProtocol.TProtocolException(message='Required field master_code_dir is unset!')
 +    return
 +
 +
 +  def __repr__(self):
 +    L = ['%s=%r' % (key, value)
 +      for key, value in self.__dict__.iteritems()]
 +    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
 +
 +  def __eq__(self, other):
 +    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
 +
 +  def __ne__(self, other):
 +    return not (self == other)
 +
 +class TopologyActionOptions:
 +  """
 +  Attributes:
 +   - kill_options
 +   - rebalance_options
 +  """
 +
 +  thrift_spec = (
 +    None, # 0
 +    (1, TType.STRUCT, 'kill_options', (KillOptions, KillOptions.thrift_spec), None, ), # 1
 +    (2, TType.STRUCT, 'rebalance_options', (RebalanceOptions, RebalanceOptions.thrift_spec), None, ), # 2
 +  )
 +
 +  def __hash__(self):
 +    return 0 + hash(self.kill_options) + hash(self.rebalance_options)
 +
 +  def __init__(self, kill_options=None, rebalance_options=None,):
 +    self.kill_options = kill_options
 +    self.rebalance_options = rebalance_options
 +
 +  def read(self, iprot):
 +    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
 +      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
 +      return
 +    iprot.readStructBegin()
 +    while True:
 +      (fname, ftype, fid) = iprot.readFieldBegin()
 +      if ftype == TType.STOP:
 +        break
 +      if fid == 1:
 +        if ftype == TType.STRUCT:
 +          self.kill_options = KillOptions()
 +          self.kill_options.read(iprot)
 +        else:
 +          iprot.skip(ftype)
 +      elif fid == 2:
 +        if ftype == TType.STRUCT:
 +          self.rebalance_options = RebalanceOptions()
 +          self.rebalance_options.read(iprot)
 +        else:
 +          iprot.skip(ftype)
 +      else:
 +        iprot.skip(ftype)
 +      iprot.readFieldEnd()
 +    iprot.readStructEnd()
 +
 +  def write(self, oprot):
 +    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
 +      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
 +      return
 +    oprot.writeStructBegin('TopologyActionOptions')
 +    if self.kill_options is not None:
 +      oprot.writeFieldBegin('kill_options', TType.STRUCT, 1)
 +      self.kill_options.write(oprot)
 +      oprot.writeFieldEnd()
 +    if self.rebalance_options is not None:
 +      oprot.writeFieldBegin('rebalance_options', TType.STRUCT, 2)
 +      self.rebalance_options.write(oprot)
 +      oprot.writeFieldEnd()
 +    oprot.writeFieldStop()
 +    oprot.writeStructEnd()
 +
 +  def validate(self):
 +    return
 +
 +
 +  def __repr__(self):
 +    L = ['%s=%r' % (key, value)
 +      for key, value in self.__dict__.iteritems()]
 +    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
 +
 +  def __eq__(self, other):
 +    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
 +
 +  def __ne__(self, other):
 +    return not (self == other)
 +
 +class StormBase:
 +  """
 +  Attributes:
 +   - name
 +   - status
 +   - num_workers
 +   - component_executors
 +   - launch_time_secs
 +   - owner
 +   - topology_action_options
 +   - prev_status
 +  """
 +
 +  thrift_spec = (
 +    None, # 0
 +    (1, TType.STRING, 'name', None, None, ), # 1
 +    (2, TType.I32, 'status', None, None, ), # 2
 +    (3, TType.I32, 'num_workers', None, None, ), # 3
 +    (4, TType.MAP, 'component_executors', (TType.STRING,None,TType.I32,None), None, ), # 4
 +    (5, TType.I32, 'launch_time_secs', None, None, ), # 5
 +    (6, TType.STRING, 'owner', None, None, ), # 6
 +    (7, TType.STRUCT, 'topology_action_options', (TopologyActionOptions, TopologyActionOptions.thrift_spec), None, ), # 7
 +    (8, TType.I32, 'prev_status', None, None, ), # 8
 +  )
 +
 +  def __hash__(self):
 +    return 0 + hash(self.name) + hash(self.status) + hash(self.num_workers) + hash(self.component_executors) + hash(self.launch_time_secs) + hash(self.owner) + hash(self.topology_action_options) + hash(self.prev_status)
 +
 +  def __init__(self, name=None, status=None, num_workers=None, component_executors=None, launch_time_secs=None, owner=None, topology_action_options=None, prev_status=None,):
 +    self.name = name
 +    self.status = status
 +    self.num_workers = num_workers
 +    self.component_executors = component_executors
 +    self.launch_time_secs = launch_time_secs
 +    self.owner = owner
 +    self.topology_action_options = topology_action_options
 +    self.prev_status = prev_status
 +
 +  def read(self, iprot):
 +    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
 +      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
 +      return
 +    iprot.readStructBegin()
 +    while True:
 +      (fname, ftype, fid) = iprot.readFieldBegin()
 +      if ftype == TType.STOP:
 +        break
 +      if fid == 1:
 +        if ftype == TType.STRING:
 +          self.name = iprot.readString().decode('utf-8')
 +        else:
 +          iprot.skip(ftype)
 +      elif fid == 2:
 +        if ftype == TType.I32:
 +          self.status = iprot.readI32();
 +        else:
 +          iprot.skip(ftype)
 +      elif fid == 3:
 +        if ftype == TType.I32:
 +          self.num_workers = iprot.readI32();
 +        else:
 +          iprot.skip(ftype)
 +      elif fid == 4:
 +        if ftype == TType.MAP:
 +          self.component_executors = {}
 +          (_ktype373, _vtype374, _size372 ) = iprot.readMapBegin() 
 +          for _i376 in xrange(_size372):
 +            _key377 = iprot.readString().decode('utf-8')
 +            _val378 = iprot.readI32();
 +            self.component_executors[_key377] = _val378
 +          iprot.readMapEnd()
 +        else:
 +          iprot.skip(ftype)
 +      elif fid == 5:
 +        if ftype == TType.I32:
 +          self.launch_time_secs = iprot.readI32();
 +        else:
 +          iprot.skip(ftype)
 +      elif fid == 6:
 +        if ftype == TType.STRING:
 +          self.owner = iprot.readString().decode('utf-8')
 +        else:
 +          iprot.skip(ftype)
 +      elif fid == 7:
 +        if ftype == TType.STRUCT:
 +          self.topology_action_options = TopologyActionOptions()
 +          self.topology_action_options.read(iprot)
 +        else:
 +          iprot.skip(ftype)
 +      elif fid == 8:
 +        if ftype == TType.I32:
 +          self.prev_status = iprot.readI32();
 +        else:
 +          iprot.skip(ftype)
 +      else:
 +        iprot.skip(ftype)
 +      iprot.readFieldEnd()
 +    iprot.readStructEnd()
 +
 +  def write(self, oprot):
 +    """
 +    Serialize this StormBase to the Thrift output protocol ``oprot``.
 +
 +    Fields whose value is None are omitted. The remaining fields are written
 +    in ascending field-id order (1 through 8), followed by the field-stop
 +    marker.
 +    """
 +    # Fast path: encode the whole struct through fastbinary when the protocol
 +    # is exactly TBinaryProtocolAccelerated, thrift_spec is set and the
 +    # fastbinary module is not None.
 +    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
 +      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
 +      return
 +    oprot.writeStructBegin('StormBase')
 +    # Field 1: name, encoded to UTF-8 before being written as a string.
 +    if self.name is not None:
 +      oprot.writeFieldBegin('name', TType.STRING, 1)
 +      oprot.writeString(self.name.encode('utf-8'))
 +      oprot.writeFieldEnd()
 +    # Field 2: status, written as an i32.
 +    if self.status is not None:
 +      oprot.writeFieldBegin('status', TType.I32, 2)
 +      oprot.writeI32(self.status)
 +      oprot.writeFieldEnd()
 +    # Field 3: num_workers.
 +    if self.num_workers is not None:
 +      oprot.writeFieldBegin('num_workers', TType.I32, 3)
 +      oprot.writeI32(self.num_workers)
 +      oprot.writeFieldEnd()
 +    # Field 4: component_executors, a map from string keys (UTF-8 encoded)
 +    # to i32 values.
 +    if self.component_executors is not None:
 +      oprot.writeFieldBegin('component_executors', TType.MAP, 4)
 +      oprot.writeMapBegin(TType.STRING, TType.I32, len(self.component_executors))
 +      for kiter379,viter380 in self.component_executors.items():
 +        oprot.writeString(kiter379.encode('utf-8'))
 +        oprot.writeI32(viter380)
 +      oprot.writeMapEnd()
 +      oprot.writeFieldEnd()
 +    # Field 5: launch_time_secs.
 +    if self.launch_time_secs is not None:
 +      oprot.writeFieldBegin('launch_time_secs', TType.I32, 5)
 +      oprot.writeI32(self.launch_time_secs)
 +      oprot.writeFieldEnd()
 +    # Field 6: owner, encoded to UTF-8.
 +    if self.owner is not None:
 +      oprot.writeFieldBegin('owner', TType.STRING, 6)
 +      oprot.writeString(self.owner.encode('utf-8'))
 +      oprot.writeFieldEnd()
 +    # Field 7: topology_action_options, a nested struct that serializes itself.
 +    if self.topology_action_options is not None:
 +      oprot.writeFieldBegin('topology_action_options', TType.STRUCT, 7)
 +      self.topology_action_options.write(oprot)
 +      oprot.writeFieldEnd()
 +    # Field 8: prev_status, written as an i32.
 +    if self.prev_status is not None:
 +      oprot.writeFieldBegin('prev_status', TType.I32, 8)
 +      oprot.writeI32(self.prev_status)
 +      oprot.writeFieldEnd()
 +    oprot.writeFieldStop()
 +    oprot.writeStructEnd()
 +
 +  def validate(self):
 +    """
 +    Check that the required fields are set.
 +
 +    Raises TProtocol.TProtocolException if name, status or num_workers is
 +    None (checked in that order); otherwise returns None.
 +    """
 +    if self.name is None:
 +      raise TProtocol.TProtocolException(message='Required field name is unset!')
 +    if self.status is None:
 +      raise TProtocol.TProtocolException(message='Required field status is unset!')
 +    if self.num_workers is None:
 +      raise TProtocol.TProtocolException(message='Required field num_workers is unset!')
 +    return
 +
 +
 +  def __repr__(self):
 +    """Return 'ClassName(attr=value, ...)' built from the instance __dict__."""
 +    # dict.iteritems() only exists on Python 2 dictionaries.
 +    L = ['%s=%r' % (key, value)
 +      for key, value in self.__dict__.iteritems()]
 +    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
 +
 +  def __eq__(self, other):
 +    """Equal when ``other`` is an instance of this class and both __dict__s match."""
 +    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
 +
 +  def __ne__(self, other):
 +    """Return the negation of ``self == other``."""
 +    return not (self == other)
 +
 +class ZKWorkerHeartbeat:
 +  """
 +  Thrift struct with three fields, all of which validate() requires.
 +
 +  Attributes:
 +   - storm_id: string (field 1)
 +   - executor_stats: map of ExecutorInfo -> ExecutorStats (field 2)
 +   - time_secs: i32 (field 3)
 +  """
 +
 +  # Indexed by field id; slot 0 is unused because Thrift field ids start at 1.
 +  thrift_spec = (
 +    None, # 0
 +    (1, TType.STRING, 'storm_id', None, None, ), # 1
 +    (2, TType.MAP, 'executor_stats', (TType.STRUCT,(ExecutorInfo, ExecutorInfo.thrift_spec),TType.STRUCT,(ExecutorStats, ExecutorStats.thrift_spec)), None, ), # 2
 +    (3, TType.I32, 'time_secs', None, None, ), # 3
 +  )
 +
 +  def __hash__(self):
 +    """Hash over the scalar fields only (storm_id and time_secs)."""
 +    # executor_stats is a dict (Thrift map) and hash() of a dict raises
 +    # TypeError, so it is deliberately left out. Instances that compare equal
 +    # through __eq__ still share storm_id and time_secs, so they hash equal.
 +    return 0 + hash(self.storm_id) + hash(self.time_secs)
 +
 +  def __init__(self, storm_id=None, executor_stats=None, time_secs=None,):
 +    """Store the three fields as given; every field defaults to None."""
 +    self.storm_id = storm_id
 +    self.executor_stats = executor_stats
 +    self.time_secs = time_secs
 +
 +  def read(self, iprot):
 +    """
 +    Populate this struct from the Thrift input protocol ``iprot``.
 +
 +    Fields with an unknown id, or a known id carrying an unexpected wire
 +    type, are skipped.
 +    """
 +    # Fast path: decode through fastbinary when the protocol is exactly
 +    # TBinaryProtocolAccelerated over a CReadableTransport and the
 +    # fastbinary module is not None.
 +    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
 +      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
 +      return
 +    iprot.readStructBegin()
 +    while True:
 +      (fname, ftype, fid) = iprot.readFieldBegin()
 +      if ftype == TType.STOP:
 +        break
 +      if fid == 1:
 +        if ftype == TType.STRING:
 +          self.storm_id = iprot.readString().decode('utf-8')
 +        else:
 +          iprot.skip(ftype)
 +      elif fid == 2:
 +        if ftype == TType.MAP:
 +          self.executor_stats = {}
 +          # The key/value type tags from the map header are read but not used.
 +          (_ktype382, _vtype383, _size381 ) = iprot.readMapBegin() 
 +          for _i385 in xrange(_size381):
 +            _key386 = ExecutorInfo()
 +            _key386.read(iprot)
 +            _val387 = ExecutorStats()
 +            _val387.read(iprot)
 +            self.executor_stats[_key386] = _val387
 +          iprot.readMapEnd()
 +        else:
 +          iprot.skip(ftype)
 +      elif fid == 3:
 +        if ftype == TType.I32:
 +          self.time_secs = iprot.readI32();
 +        else:
 +          iprot.skip(ftype)
 +      else:
 +        iprot.skip(ftype)
 +      iprot.readFieldEnd()
 +    iprot.readStructEnd()
 +
 +  def write(self, oprot):
 +    """
 +    Serialize this struct to the Thrift output protocol ``oprot``.
 +
 +    Fields whose value is None are omitted; the rest are written in field-id
 +    order, followed by the field-stop marker.
 +    """
 +    # Fast path: encode through fastbinary when the protocol is exactly
 +    # TBinaryProtocolAccelerated and the fastbinary module is not None.
 +    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
 +      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
 +      return
 +    oprot.writeStructBegin('ZKWorkerHeartbeat')
 +    if self.storm_id is not None:
 +      oprot.writeFieldBegin('storm_id', TType.STRING, 1)
 +      oprot.writeString(self.storm_id.encode('utf-8'))
 +      oprot.writeFieldEnd()
 +    if self.executor_stats is not None:
 +      oprot.writeFieldBegin('executor_stats', TType.MAP, 2)
 +      oprot.writeMapBegin(TType.STRUCT, TType.STRUCT, len(self.executor_stats))
 +      # Keys and values are structs and serialize themselves.
 +      for kiter388,viter389 in self.executor_stats.items():
 +        kiter388.write(oprot)
 +        viter389.write(oprot)
 +      oprot.writeMapEnd()
 +      oprot.writeFieldEnd()
 +    if self.time_secs is not None:
 +      oprot.writeFieldBegin('time_secs', TType.I32, 3)
 +      oprot.writeI32(self.time_secs)
 +      oprot.writeFieldEnd()
 +    oprot.writeFieldStop()
 +    oprot.writeStructEnd()
 +
 +  def validate(self):
 +    """
 +    Check that the required fields are set.
 +
 +    Raises TProtocol.TProtocolException if storm_id, executor_stats or
 +    time_secs is None (checked in that order); otherwise returns None.
 +    """
 +    if self.storm_id is None:
 +      raise TProtocol.TProtocolException(message='Required field storm_id is unset!')
 +    if self.executor_stats is None:
 +      raise TProtocol.TProtocolException(message='Required field executor_stats is unset!')
 +    if self.time_secs is None:
 +      raise TProtocol.TProtocolException(message='Required field time_secs is unset!')
 +    return
 +
 +
 +  def __repr__(self):
 +    """Return 'ClassName(attr=value, ...)' built from the instance __dict__."""
 +    # dict.iteritems() only exists on Python 2 dictionaries.
 +    L = ['%s=%r' % (key, value)
 +      for key, value in self.__dict__.iteritems()]
 +    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
 +
 +  def __eq__(self, other):
 +    """Equal when ``other`` is an instance of this class and both __dict__s match."""
 +    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
 +
 +  def __ne__(self, other):
 +    """Return the negation of ``self == other``."""
 +    return not (self == other)
 +
+ class GetInfoOptions:
+   """
+   Thrift struct with a single field; validate() requires nothing.
+ 
+   Attributes:
+    - num_err_choice: i32 (field 1)
+   """
+ 
+   # Indexed by field id; slot 0 is unused because Thrift field ids start at 1.
+   thrift_spec = (
+     None, # 0
+     (1, TType.I32, 'num_err_choice', None, None, ), # 1
+   )
+ 
+   def __hash__(self):
+     """Hash derived from num_err_choice alone."""
+     return 0 + hash(self.num_err_choice)
+ 
+   def __init__(self, num_err_choice=None,):
+     """Store num_err_choice as given; it defaults to None."""
+     self.num_err_choice = num_err_choice
+ 
+   def read(self, iprot):
+     """
+     Populate this struct from the Thrift input protocol ``iprot``.
+ 
+     Fields with an unknown id, or field 1 carrying a wire type other than
+     I32, are skipped.
+     """
+     # Fast path: decode through fastbinary when the protocol is exactly
+     # TBinaryProtocolAccelerated over a CReadableTransport and the
+     # fastbinary module is not None.
+     if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+       fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+       return
+     iprot.readStructBegin()
+     while True:
+       (fname, ftype, fid) = iprot.readFieldBegin()
+       if ftype == TType.STOP:
+         break
+       if fid == 1:
+         if ftype == TType.I32:
+           self.num_err_choice = iprot.readI32();
+         else:
+           iprot.skip(ftype)
+       else:
+         iprot.skip(ftype)
+       iprot.readFieldEnd()
+     iprot.readStructEnd()
+ 
+   def write(self, oprot):
+     """
+     Serialize this struct to the Thrift output protocol ``oprot``.
+ 
+     num_err_choice is omitted when it is None; the field-stop marker is
+     always written.
+     """
+     # Fast path: encode through fastbinary when the protocol is exactly
+     # TBinaryProtocolAccelerated and the fastbinary module is not None.
+     if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+       oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+       return
+     oprot.writeStructBegin('GetInfoOptions')
+     if self.num_err_choice is not None:
+       oprot.writeFieldBegin('num_err_choice', TType.I32, 1)
+       oprot.writeI32(self.num_err_choice)
+       oprot.writeFieldEnd()
+     oprot.writeFieldStop()
+     oprot.writeStructEnd()
+ 
+   def validate(self):
+     """No checks are performed; always returns None."""
+     return
+ 
+ 
+   def __repr__(self):
+     """Return 'ClassName(attr=value, ...)' built from the instance __dict__."""
+     # dict.iteritems() only exists on Python 2 dictionaries.
+     L = ['%s=%r' % (key, value)
+       for key, value in self.__dict__.iteritems()]
+     return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+ 
+   def __eq__(self, other):
+     """Equal when ``other`` is an instance of this class and both __dict__s match."""
+     return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+ 
+   def __ne__(self, other):
+     """Return the negation of ``self == other``."""
+     return not (self == other)
+ 
  class DRPCRequest:
    """
    Attributes:

http://git-wip-us.apache.org/repos/asf/storm/blob/0571e22c/storm-core/src/storm.thrift
----------------------------------------------------------------------
diff --cc storm-core/src/storm.thrift
index 3cc0eb9,066cb4f..04b6a1b
--- a/storm-core/src/storm.thrift
+++ b/storm-core/src/storm.thrift
@@@ -244,55 -243,15 +244,64 @@@ struct SubmitOptions 
    2: optional Credentials creds;
  }
  
 +// Supervisor record. time_secs and hostname are required; every other
 +// field is optional.
 +struct SupervisorInfo {
 +    1: required i64 time_secs;
 +    2: required string hostname;
 +    3: optional string assignment_id;
 +    4: optional list<i64> used_ports;
 +    5: optional list<i64> meta;
 +    6: optional map<string, string> scheduler_meta;
 +    7: optional i64 uptime_secs;
 +}
 +// A node name plus a set of i64 ports. Note that `port` is a set despite
 +// its singular name. Both fields are required.
 +struct NodeInfo {
 +    1: required string node;
 +    2: required set<i64> port;
 +}
 +
 +// Only master_code_dir is required. The three optional maps default to
 +// empty maps. The list<i64> map keys presumably identify an executor
 +// (TODO confirm against the code that builds them).
 +struct Assignment {
 +    1: required string master_code_dir;
 +    2: optional map<string, string> node_host = {};
 +    3: optional map<list<i64>, NodeInfo> executor_node_port = {};
 +    4: optional map<list<i64>, i64> executor_start_time_secs = {};
 +}
 +
 +// Topology status values; numeric values are assigned explicitly, starting
 +// at 1.
 +enum TopologyStatus {
 +    ACTIVE = 1,
 +    INACTIVE = 2,
 +    REBALANCING = 3,
 +    KILLED = 4
 +}
 +
 +// Union carrying either kill options or rebalance options.
 +union TopologyActionOptions {
 +    1: optional KillOptions kill_options;
 +    2: optional RebalanceOptions rebalance_options;
 +}
 +
 +// Topology record. name, status and num_workers are required; every other
 +// field is optional.
 +struct StormBase {
 +    1: required string name;
 +    2: required TopologyStatus status;
 +    3: required i32 num_workers;
 +    4: optional map<string, i32> component_executors;
 +    5: optional i32 launch_time_secs;
 +    6: optional string owner;
 +    7: optional TopologyActionOptions topology_action_options;
 +    8: optional TopologyStatus prev_status;//currently only used during rebalance action.
 +}
 +
 +// Worker heartbeat record: a storm id, a map from ExecutorInfo to
 +// ExecutorStats, and a time in seconds. All three fields are required.
 +struct ZKWorkerHeartbeat {
 +    1: required string storm_id;
 +    2: required map<ExecutorInfo,ExecutorStats> executor_stats;
 +    3: required i32 time_secs;
 +}
+ // Choice of how many errors to report. No explicit values are given, so
+ // Thrift numbers the constants from 0 in declaration order (ALL = 0,
+ // NONE = 1, ONE = 2).
+ enum NumErrorsChoice {
+   ALL,
+   NONE,
+   ONE
+ }
+ 
+ // Options struct whose only field, num_err_choice, is optional.
+ struct GetInfoOptions {
+   1: optional NumErrorsChoice num_err_choice;
+ }
  
  service Nimbus {
    void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite, 3: AuthorizationException aze);