You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/03/18 20:39:12 UTC
[05/16] storm git commit: Merge remote-tracking branch
'upstream/master' into ru
Merge remote-tracking branch 'upstream/master' into ru
Conflicts:
storm-core/src/clj/backtype/storm/cluster.clj
storm-core/src/py/storm/ttypes.py
storm-core/src/storm.thrift
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/0571e22c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/0571e22c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/0571e22c
Branch: refs/heads/master
Commit: 0571e22c15a1214c3e08510fddad608571f6a6d8
Parents: 6390064 8036109
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Mon Feb 9 12:47:36 2015 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Mon Feb 9 12:47:36 2015 -0800
----------------------------------------------------------------------
CHANGELOG.md | 9 +
README.markdown | 8 +-
STORM-UI-REST-API.md | 21 +
dev-tools/github/__init__.py | 11 +
dev-tools/jira-github-join.py | 4 +-
dev-tools/storm-merge.py | 31 +
docs/README.md | 9 +
docs/documentation/Common-patterns.md | 14 +-
docs/documentation/Concepts.md | 13 +-
docs/documentation/Home.md | 2 +-
docs/documentation/Multilang-protocol.md | 4 +-
docs/documentation/Powered-By.md | 4 +-
.../storm/starter/SkewedRollingTopWords.java | 134 +++
.../storm/starter/bolt/RollingCountAggBolt.java | 78 ++
pom.xml | 11 +
storm-core/src/clj/backtype/storm/cluster.clj | 43 +-
.../src/clj/backtype/storm/daemon/logviewer.clj | 5 +-
.../src/clj/backtype/storm/daemon/nimbus.clj | 35 +-
.../src/clj/backtype/storm/daemon/worker.clj | 6 +-
storm-core/src/clj/backtype/storm/ui/core.clj | 34 +-
.../coordination/BatchSubtopologyBuilder.java | 11 +
.../storm/drpc/LinearDRPCInputDeclarer.java | 5 +-
.../storm/drpc/LinearDRPCTopologyBuilder.java | 13 +-
.../storm/generated/GetInfoOptions.java | 350 +++++++
.../jvm/backtype/storm/generated/Nimbus.java | 974 +++++++++++++++++++
.../storm/generated/NumErrorsChoice.java | 64 ++
.../storm/grouping/PartialKeyGrouping.java | 31 +-
.../backtype/storm/topology/InputDeclarer.java | 3 +
.../storm/topology/TopologyBuilder.java | 11 +
.../TransactionalTopologyBuilder.java | 13 +-
.../src/jvm/backtype/storm/utils/Monitor.java | 8 +-
.../topology/TridentTopologyBuilder.java | 13 +-
storm-core/src/py/storm/DistributedRPC-remote | 0
.../py/storm/DistributedRPCInvocations-remote | 0
storm-core/src/py/storm/Nimbus-remote | 7 +
storm-core/src/py/storm/Nimbus.py | 226 +++++
storm-core/src/py/storm/ttypes.py | 80 ++
storm-core/src/storm.thrift | 10 +
.../clj/backtype/storm/integration_test.clj | 10 +-
.../storm/grouping/PartialKeyGroupingTest.java | 26 +-
40 files changed, 2272 insertions(+), 59 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0571e22c/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/backtype/storm/cluster.clj
index 15bf8a3,4b73f2e..1d5026f
--- a/storm-core/src/clj/backtype/storm/cluster.clj
+++ b/storm-core/src/clj/backtype/storm/cluster.clj
@@@ -230,11 -239,11 +241,11 @@@
(cb id))))
(defn- maybe-deserialize
- [ser]
+ [ser clazz]
(when ser
- (Utils/deserialize ser)))
+ (Utils/deserialize ser clazz)))
- (defstruct TaskError :error :time-secs :host :port)
+ (defrecord TaskError [error time-secs host port])
(defn- parse-error-path
[^String p]
@@@ -440,9 -441,13 +451,13 @@@
(report-error
[this storm-id component-id node port error]
(let [path (error-path storm-id component-id)
+ last-error-path (last-error-path storm-id component-id)
- data {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port}
+ data (thriftify-error {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port})
_ (mkdirs cluster-state path acls)
- _ (create-sequential cluster-state (str path "/e") (Utils/serialize data) acls)
+ ser-data (Utils/serialize data)
+ _ (mkdirs cluster-state path acls)
+ _ (create-sequential cluster-state (str path "/e") ser-data acls)
+ _ (set-data cluster-state last-error-path ser-data acls)
to-kill (->> (get-children cluster-state path false)
(sort-by parse-error-path)
reverse
@@@ -455,16 -460,22 +470,24 @@@
(let [path (error-path storm-id component-id)
errors (if (exists-node? cluster-state path false)
(dofor [c (get-children cluster-state path false)]
- (let [data (-> (get-data cluster-state (str path "/" c) false)
- (maybe-deserialize ErrorInfo)
- clojurify-error)]
- (when data
- (struct TaskError (:error data) (:time-secs data) (:host data) (:port data))
- )))
- ())
- ]
+ (if-let [data (-> (get-data cluster-state
+ (str path "/" c)
+ false)
- maybe-deserialize)]
++ (maybe-deserialize ErrorInfo)
++ clojurify-error)]
+ (map->TaskError data)))
+ ())]
(->> (filter not-nil? errors)
(sort-by (comp - :time-secs)))))
+
+ (last-error
+ [this storm-id component-id]
+ (let [path (last-error-path storm-id component-id)]
+ (if (exists-node? cluster-state path false)
- (if-let [data (->> (get-data cluster-state path false)
- maybe-deserialize)]
++ (if-let [data (-> (get-data cluster-state path false)
++ (maybe-deserialize ErrorInfo)
++ clojurify-error)]
+ (map->TaskError data)))))
(disconnect
[this]
http://git-wip-us.apache.org/repos/asf/storm/blob/0571e22c/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0571e22c/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/0571e22c/storm-core/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --cc storm-core/src/py/storm/ttypes.py
index e15cf1d,46e7a92..112daaa
--- a/storm-core/src/py/storm/ttypes.py
+++ b/storm-core/src/py/storm/ttypes.py
@@@ -44,26 -44,23 +44,43 @@@ class TopologyInitialStatus
"INACTIVE": 2,
}
+class TopologyStatus:
+ ACTIVE = 1
+ INACTIVE = 2
+ REBALANCING = 3
+ KILLED = 4
+
+ _VALUES_TO_NAMES = {
+ 1: "ACTIVE",
+ 2: "INACTIVE",
+ 3: "REBALANCING",
+ 4: "KILLED",
+ }
+
+ _NAMES_TO_VALUES = {
+ "ACTIVE": 1,
+ "INACTIVE": 2,
+ "REBALANCING": 3,
+ "KILLED": 4,
+ }
+
+ class NumErrorsChoice:
+ ALL = 0
+ NONE = 1
+ ONE = 2
+
+ _VALUES_TO_NAMES = {
+ 0: "ALL",
+ 1: "NONE",
+ 2: "ONE",
+ }
+
+ _NAMES_TO_VALUES = {
+ "ALL": 0,
+ "NONE": 1,
+ "ONE": 2,
+ }
+
class JavaObjectArg:
"""
@@@ -4417,764 -4400,69 +4434,827 @@@ class SubmitOptions
def __ne__(self, other):
return not (self == other)
+class SupervisorInfo:
+ """
+ Attributes:
+ - time_secs
+ - hostname
+ - assignment_id
+ - used_ports
+ - meta
+ - scheduler_meta
+ - uptime_secs
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.I64, 'time_secs', None, None, ), # 1
+ (2, TType.STRING, 'hostname', None, None, ), # 2
+ (3, TType.STRING, 'assignment_id', None, None, ), # 3
+ (4, TType.LIST, 'used_ports', (TType.I64,None), None, ), # 4
+ (5, TType.LIST, 'meta', (TType.I64,None), None, ), # 5
+ (6, TType.MAP, 'scheduler_meta', (TType.STRING,None,TType.STRING,None), None, ), # 6
+ (7, TType.I64, 'uptime_secs', None, None, ), # 7
+ )
+
+ def __hash__(self):
+ return 0 + hash(self.time_secs) + hash(self.hostname) + hash(self.assignment_id) + hash(self.used_ports) + hash(self.meta) + hash(self.scheduler_meta) + hash(self.uptime_secs)
+
+ def __init__(self, time_secs=None, hostname=None, assignment_id=None, used_ports=None, meta=None, scheduler_meta=None, uptime_secs=None,):
+ self.time_secs = time_secs
+ self.hostname = hostname
+ self.assignment_id = assignment_id
+ self.used_ports = used_ports
+ self.meta = meta
+ self.scheduler_meta = scheduler_meta
+ self.uptime_secs = uptime_secs
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.I64:
+ self.time_secs = iprot.readI64();
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.STRING:
+ self.hostname = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
+ elif fid == 3:
+ if ftype == TType.STRING:
+ self.assignment_id = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
+ elif fid == 4:
+ if ftype == TType.LIST:
+ self.used_ports = []
+ (_etype304, _size301) = iprot.readListBegin()
+ for _i305 in xrange(_size301):
+ _elem306 = iprot.readI64();
+ self.used_ports.append(_elem306)
+ iprot.readListEnd()
+ else:
+ iprot.skip(ftype)
+ elif fid == 5:
+ if ftype == TType.LIST:
+ self.meta = []
+ (_etype310, _size307) = iprot.readListBegin()
+ for _i311 in xrange(_size307):
+ _elem312 = iprot.readI64();
+ self.meta.append(_elem312)
+ iprot.readListEnd()
+ else:
+ iprot.skip(ftype)
+ elif fid == 6:
+ if ftype == TType.MAP:
+ self.scheduler_meta = {}
+ (_ktype314, _vtype315, _size313 ) = iprot.readMapBegin()
+ for _i317 in xrange(_size313):
+ _key318 = iprot.readString().decode('utf-8')
+ _val319 = iprot.readString().decode('utf-8')
+ self.scheduler_meta[_key318] = _val319
+ iprot.readMapEnd()
+ else:
+ iprot.skip(ftype)
+ elif fid == 7:
+ if ftype == TType.I64:
+ self.uptime_secs = iprot.readI64();
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('SupervisorInfo')
+ if self.time_secs is not None:
+ oprot.writeFieldBegin('time_secs', TType.I64, 1)
+ oprot.writeI64(self.time_secs)
+ oprot.writeFieldEnd()
+ if self.hostname is not None:
+ oprot.writeFieldBegin('hostname', TType.STRING, 2)
+ oprot.writeString(self.hostname.encode('utf-8'))
+ oprot.writeFieldEnd()
+ if self.assignment_id is not None:
+ oprot.writeFieldBegin('assignment_id', TType.STRING, 3)
+ oprot.writeString(self.assignment_id.encode('utf-8'))
+ oprot.writeFieldEnd()
+ if self.used_ports is not None:
+ oprot.writeFieldBegin('used_ports', TType.LIST, 4)
+ oprot.writeListBegin(TType.I64, len(self.used_ports))
+ for iter320 in self.used_ports:
+ oprot.writeI64(iter320)
+ oprot.writeListEnd()
+ oprot.writeFieldEnd()
+ if self.meta is not None:
+ oprot.writeFieldBegin('meta', TType.LIST, 5)
+ oprot.writeListBegin(TType.I64, len(self.meta))
+ for iter321 in self.meta:
+ oprot.writeI64(iter321)
+ oprot.writeListEnd()
+ oprot.writeFieldEnd()
+ if self.scheduler_meta is not None:
+ oprot.writeFieldBegin('scheduler_meta', TType.MAP, 6)
+ oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.scheduler_meta))
+ for kiter322,viter323 in self.scheduler_meta.items():
+ oprot.writeString(kiter322.encode('utf-8'))
+ oprot.writeString(viter323.encode('utf-8'))
+ oprot.writeMapEnd()
+ oprot.writeFieldEnd()
+ if self.uptime_secs is not None:
+ oprot.writeFieldBegin('uptime_secs', TType.I64, 7)
+ oprot.writeI64(self.uptime_secs)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ if self.time_secs is None:
+ raise TProtocol.TProtocolException(message='Required field time_secs is unset!')
+ if self.hostname is None:
+ raise TProtocol.TProtocolException(message='Required field hostname is unset!')
+ return
+
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class NodeInfo:
+ """
+ Attributes:
+ - node
+ - port
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.STRING, 'node', None, None, ), # 1
+ (2, TType.SET, 'port', (TType.I64,None), None, ), # 2
+ )
+
+ def __hash__(self):
+ return 0 + hash(self.node) + hash(self.port)
+
+ def __init__(self, node=None, port=None,):
+ self.node = node
+ self.port = port
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.STRING:
+ self.node = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.SET:
+ self.port = set()
+ (_etype327, _size324) = iprot.readSetBegin()
+ for _i328 in xrange(_size324):
+ _elem329 = iprot.readI64();
+ self.port.add(_elem329)
+ iprot.readSetEnd()
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('NodeInfo')
+ if self.node is not None:
+ oprot.writeFieldBegin('node', TType.STRING, 1)
+ oprot.writeString(self.node.encode('utf-8'))
+ oprot.writeFieldEnd()
+ if self.port is not None:
+ oprot.writeFieldBegin('port', TType.SET, 2)
+ oprot.writeSetBegin(TType.I64, len(self.port))
+ for iter330 in self.port:
+ oprot.writeI64(iter330)
+ oprot.writeSetEnd()
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ if self.node is None:
+ raise TProtocol.TProtocolException(message='Required field node is unset!')
+ if self.port is None:
+ raise TProtocol.TProtocolException(message='Required field port is unset!')
+ return
+
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class Assignment:
+ """
+ Attributes:
+ - master_code_dir
+ - node_host
+ - executor_node_port
+ - executor_start_time_secs
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.STRING, 'master_code_dir', None, None, ), # 1
+ (2, TType.MAP, 'node_host', (TType.STRING,None,TType.STRING,None), {
+ }, ), # 2
+ (3, TType.MAP, 'executor_node_port', (TType.LIST,(TType.I64,None),TType.STRUCT,(NodeInfo, NodeInfo.thrift_spec)), {
+ }, ), # 3
+ (4, TType.MAP, 'executor_start_time_secs', (TType.LIST,(TType.I64,None),TType.I64,None), {
+ }, ), # 4
+ )
+
+ def __hash__(self):
+ return 0 + hash(self.master_code_dir) + hash(self.node_host) + hash(self.executor_node_port) + hash(self.executor_start_time_secs)
+
+ def __init__(self, master_code_dir=None, node_host=thrift_spec[2][4], executor_node_port=thrift_spec[3][4], executor_start_time_secs=thrift_spec[4][4],):
+ self.master_code_dir = master_code_dir
+ if node_host is self.thrift_spec[2][4]:
+ node_host = {
+ }
+ self.node_host = node_host
+ if executor_node_port is self.thrift_spec[3][4]:
+ executor_node_port = {
+ }
+ self.executor_node_port = executor_node_port
+ if executor_start_time_secs is self.thrift_spec[4][4]:
+ executor_start_time_secs = {
+ }
+ self.executor_start_time_secs = executor_start_time_secs
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.STRING:
+ self.master_code_dir = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.MAP:
+ self.node_host = {}
+ (_ktype332, _vtype333, _size331 ) = iprot.readMapBegin()
+ for _i335 in xrange(_size331):
+ _key336 = iprot.readString().decode('utf-8')
+ _val337 = iprot.readString().decode('utf-8')
+ self.node_host[_key336] = _val337
+ iprot.readMapEnd()
+ else:
+ iprot.skip(ftype)
+ elif fid == 3:
+ if ftype == TType.MAP:
+ self.executor_node_port = {}
+ (_ktype339, _vtype340, _size338 ) = iprot.readMapBegin()
+ for _i342 in xrange(_size338):
+ _key343 = []
+ (_etype348, _size345) = iprot.readListBegin()
+ for _i349 in xrange(_size345):
+ _elem350 = iprot.readI64();
+ _key343.append(_elem350)
+ iprot.readListEnd()
+ _val344 = NodeInfo()
+ _val344.read(iprot)
+ self.executor_node_port[_key343] = _val344
+ iprot.readMapEnd()
+ else:
+ iprot.skip(ftype)
+ elif fid == 4:
+ if ftype == TType.MAP:
+ self.executor_start_time_secs = {}
+ (_ktype352, _vtype353, _size351 ) = iprot.readMapBegin()
+ for _i355 in xrange(_size351):
+ _key356 = []
+ (_etype361, _size358) = iprot.readListBegin()
+ for _i362 in xrange(_size358):
+ _elem363 = iprot.readI64();
+ _key356.append(_elem363)
+ iprot.readListEnd()
+ _val357 = iprot.readI64();
+ self.executor_start_time_secs[_key356] = _val357
+ iprot.readMapEnd()
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('Assignment')
+ if self.master_code_dir is not None:
+ oprot.writeFieldBegin('master_code_dir', TType.STRING, 1)
+ oprot.writeString(self.master_code_dir.encode('utf-8'))
+ oprot.writeFieldEnd()
+ if self.node_host is not None:
+ oprot.writeFieldBegin('node_host', TType.MAP, 2)
+ oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.node_host))
+ for kiter364,viter365 in self.node_host.items():
+ oprot.writeString(kiter364.encode('utf-8'))
+ oprot.writeString(viter365.encode('utf-8'))
+ oprot.writeMapEnd()
+ oprot.writeFieldEnd()
+ if self.executor_node_port is not None:
+ oprot.writeFieldBegin('executor_node_port', TType.MAP, 3)
+ oprot.writeMapBegin(TType.LIST, TType.STRUCT, len(self.executor_node_port))
+ for kiter366,viter367 in self.executor_node_port.items():
+ oprot.writeListBegin(TType.I64, len(kiter366))
+ for iter368 in kiter366:
+ oprot.writeI64(iter368)
+ oprot.writeListEnd()
+ viter367.write(oprot)
+ oprot.writeMapEnd()
+ oprot.writeFieldEnd()
+ if self.executor_start_time_secs is not None:
+ oprot.writeFieldBegin('executor_start_time_secs', TType.MAP, 4)
+ oprot.writeMapBegin(TType.LIST, TType.I64, len(self.executor_start_time_secs))
+ for kiter369,viter370 in self.executor_start_time_secs.items():
+ oprot.writeListBegin(TType.I64, len(kiter369))
+ for iter371 in kiter369:
+ oprot.writeI64(iter371)
+ oprot.writeListEnd()
+ oprot.writeI64(viter370)
+ oprot.writeMapEnd()
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ if self.master_code_dir is None:
+ raise TProtocol.TProtocolException(message='Required field master_code_dir is unset!')
+ return
+
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class TopologyActionOptions:
+ """
+ Attributes:
+ - kill_options
+ - rebalance_options
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.STRUCT, 'kill_options', (KillOptions, KillOptions.thrift_spec), None, ), # 1
+ (2, TType.STRUCT, 'rebalance_options', (RebalanceOptions, RebalanceOptions.thrift_spec), None, ), # 2
+ )
+
+ def __hash__(self):
+ return 0 + hash(self.kill_options) + hash(self.rebalance_options)
+
+ def __init__(self, kill_options=None, rebalance_options=None,):
+ self.kill_options = kill_options
+ self.rebalance_options = rebalance_options
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.STRUCT:
+ self.kill_options = KillOptions()
+ self.kill_options.read(iprot)
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.STRUCT:
+ self.rebalance_options = RebalanceOptions()
+ self.rebalance_options.read(iprot)
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('TopologyActionOptions')
+ if self.kill_options is not None:
+ oprot.writeFieldBegin('kill_options', TType.STRUCT, 1)
+ self.kill_options.write(oprot)
+ oprot.writeFieldEnd()
+ if self.rebalance_options is not None:
+ oprot.writeFieldBegin('rebalance_options', TType.STRUCT, 2)
+ self.rebalance_options.write(oprot)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ return
+
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class StormBase:
+ """
+ Attributes:
+ - name
+ - status
+ - num_workers
+ - component_executors
+ - launch_time_secs
+ - owner
+ - topology_action_options
+ - prev_status
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.STRING, 'name', None, None, ), # 1
+ (2, TType.I32, 'status', None, None, ), # 2
+ (3, TType.I32, 'num_workers', None, None, ), # 3
+ (4, TType.MAP, 'component_executors', (TType.STRING,None,TType.I32,None), None, ), # 4
+ (5, TType.I32, 'launch_time_secs', None, None, ), # 5
+ (6, TType.STRING, 'owner', None, None, ), # 6
+ (7, TType.STRUCT, 'topology_action_options', (TopologyActionOptions, TopologyActionOptions.thrift_spec), None, ), # 7
+ (8, TType.I32, 'prev_status', None, None, ), # 8
+ )
+
+ def __hash__(self):
+ return 0 + hash(self.name) + hash(self.status) + hash(self.num_workers) + hash(self.component_executors) + hash(self.launch_time_secs) + hash(self.owner) + hash(self.topology_action_options) + hash(self.prev_status)
+
+ def __init__(self, name=None, status=None, num_workers=None, component_executors=None, launch_time_secs=None, owner=None, topology_action_options=None, prev_status=None,):
+ self.name = name
+ self.status = status
+ self.num_workers = num_workers
+ self.component_executors = component_executors
+ self.launch_time_secs = launch_time_secs
+ self.owner = owner
+ self.topology_action_options = topology_action_options
+ self.prev_status = prev_status
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.STRING:
+ self.name = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.I32:
+ self.status = iprot.readI32();
+ else:
+ iprot.skip(ftype)
+ elif fid == 3:
+ if ftype == TType.I32:
+ self.num_workers = iprot.readI32();
+ else:
+ iprot.skip(ftype)
+ elif fid == 4:
+ if ftype == TType.MAP:
+ self.component_executors = {}
+ (_ktype373, _vtype374, _size372 ) = iprot.readMapBegin()
+ for _i376 in xrange(_size372):
+ _key377 = iprot.readString().decode('utf-8')
+ _val378 = iprot.readI32();
+ self.component_executors[_key377] = _val378
+ iprot.readMapEnd()
+ else:
+ iprot.skip(ftype)
+ elif fid == 5:
+ if ftype == TType.I32:
+ self.launch_time_secs = iprot.readI32();
+ else:
+ iprot.skip(ftype)
+ elif fid == 6:
+ if ftype == TType.STRING:
+ self.owner = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
+ elif fid == 7:
+ if ftype == TType.STRUCT:
+ self.topology_action_options = TopologyActionOptions()
+ self.topology_action_options.read(iprot)
+ else:
+ iprot.skip(ftype)
+ elif fid == 8:
+ if ftype == TType.I32:
+ self.prev_status = iprot.readI32();
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('StormBase')
+ if self.name is not None:
+ oprot.writeFieldBegin('name', TType.STRING, 1)
+ oprot.writeString(self.name.encode('utf-8'))
+ oprot.writeFieldEnd()
+ if self.status is not None:
+ oprot.writeFieldBegin('status', TType.I32, 2)
+ oprot.writeI32(self.status)
+ oprot.writeFieldEnd()
+ if self.num_workers is not None:
+ oprot.writeFieldBegin('num_workers', TType.I32, 3)
+ oprot.writeI32(self.num_workers)
+ oprot.writeFieldEnd()
+ if self.component_executors is not None:
+ oprot.writeFieldBegin('component_executors', TType.MAP, 4)
+ oprot.writeMapBegin(TType.STRING, TType.I32, len(self.component_executors))
+ for kiter379,viter380 in self.component_executors.items():
+ oprot.writeString(kiter379.encode('utf-8'))
+ oprot.writeI32(viter380)
+ oprot.writeMapEnd()
+ oprot.writeFieldEnd()
+ if self.launch_time_secs is not None:
+ oprot.writeFieldBegin('launch_time_secs', TType.I32, 5)
+ oprot.writeI32(self.launch_time_secs)
+ oprot.writeFieldEnd()
+ if self.owner is not None:
+ oprot.writeFieldBegin('owner', TType.STRING, 6)
+ oprot.writeString(self.owner.encode('utf-8'))
+ oprot.writeFieldEnd()
+ if self.topology_action_options is not None:
+ oprot.writeFieldBegin('topology_action_options', TType.STRUCT, 7)
+ self.topology_action_options.write(oprot)
+ oprot.writeFieldEnd()
+ if self.prev_status is not None:
+ oprot.writeFieldBegin('prev_status', TType.I32, 8)
+ oprot.writeI32(self.prev_status)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ if self.name is None:
+ raise TProtocol.TProtocolException(message='Required field name is unset!')
+ if self.status is None:
+ raise TProtocol.TProtocolException(message='Required field status is unset!')
+ if self.num_workers is None:
+ raise TProtocol.TProtocolException(message='Required field num_workers is unset!')
+ return
+
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class ZKWorkerHeartbeat:
+ """
+ Attributes:
+ - storm_id
+ - executor_stats
+ - time_secs
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.STRING, 'storm_id', None, None, ), # 1
+ (2, TType.MAP, 'executor_stats', (TType.STRUCT,(ExecutorInfo, ExecutorInfo.thrift_spec),TType.STRUCT,(ExecutorStats, ExecutorStats.thrift_spec)), None, ), # 2
+ (3, TType.I32, 'time_secs', None, None, ), # 3
+ )
+
+ def __hash__(self):
+ return 0 + hash(self.storm_id) + hash(self.executor_stats) + hash(self.time_secs)
+
+ def __init__(self, storm_id=None, executor_stats=None, time_secs=None,):
+ self.storm_id = storm_id
+ self.executor_stats = executor_stats
+ self.time_secs = time_secs
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.STRING:
+ self.storm_id = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.MAP:
+ self.executor_stats = {}
+ (_ktype382, _vtype383, _size381 ) = iprot.readMapBegin()
+ for _i385 in xrange(_size381):
+ _key386 = ExecutorInfo()
+ _key386.read(iprot)
+ _val387 = ExecutorStats()
+ _val387.read(iprot)
+ self.executor_stats[_key386] = _val387
+ iprot.readMapEnd()
+ else:
+ iprot.skip(ftype)
+ elif fid == 3:
+ if ftype == TType.I32:
+ self.time_secs = iprot.readI32();
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('ZKWorkerHeartbeat')
+ if self.storm_id is not None:
+ oprot.writeFieldBegin('storm_id', TType.STRING, 1)
+ oprot.writeString(self.storm_id.encode('utf-8'))
+ oprot.writeFieldEnd()
+ if self.executor_stats is not None:
+ oprot.writeFieldBegin('executor_stats', TType.MAP, 2)
+ oprot.writeMapBegin(TType.STRUCT, TType.STRUCT, len(self.executor_stats))
+ for kiter388,viter389 in self.executor_stats.items():
+ kiter388.write(oprot)
+ viter389.write(oprot)
+ oprot.writeMapEnd()
+ oprot.writeFieldEnd()
+ if self.time_secs is not None:
+ oprot.writeFieldBegin('time_secs', TType.I32, 3)
+ oprot.writeI32(self.time_secs)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ if self.storm_id is None:
+ raise TProtocol.TProtocolException(message='Required field storm_id is unset!')
+ if self.executor_stats is None:
+ raise TProtocol.TProtocolException(message='Required field executor_stats is unset!')
+ if self.time_secs is None:
+ raise TProtocol.TProtocolException(message='Required field time_secs is unset!')
+ return
+
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+ class GetInfoOptions:
+ """
+ Attributes:
+ - num_err_choice
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.I32, 'num_err_choice', None, None, ), # 1
+ )
+
+ def __hash__(self):
+ return 0 + hash(self.num_err_choice)
+
+ def __init__(self, num_err_choice=None,):
+ self.num_err_choice = num_err_choice
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.I32:
+ self.num_err_choice = iprot.readI32();
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('GetInfoOptions')
+ if self.num_err_choice is not None:
+ oprot.writeFieldBegin('num_err_choice', TType.I32, 1)
+ oprot.writeI32(self.num_err_choice)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ return
+
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
class DRPCRequest:
"""
Attributes:
http://git-wip-us.apache.org/repos/asf/storm/blob/0571e22c/storm-core/src/storm.thrift
----------------------------------------------------------------------
diff --cc storm-core/src/storm.thrift
index 3cc0eb9,066cb4f..04b6a1b
--- a/storm-core/src/storm.thrift
+++ b/storm-core/src/storm.thrift
@@@ -244,55 -243,15 +244,64 @@@ struct SubmitOptions
2: optional Credentials creds;
}
+struct SupervisorInfo {
+ 1: required i64 time_secs;
+ 2: required string hostname;
+ 3: optional string assignment_id;
+ 4: optional list<i64> used_ports;
+ 5: optional list<i64> meta;
+ 6: optional map<string, string> scheduler_meta;
+ 7: optional i64 uptime_secs;
+}
+struct NodeInfo {
+ 1: required string node;
+ 2: required set<i64> port;
+}
+
+struct Assignment {
+ 1: required string master_code_dir;
+ 2: optional map<string, string> node_host = {};
+ 3: optional map<list<i64>, NodeInfo> executor_node_port = {};
+ 4: optional map<list<i64>, i64> executor_start_time_secs = {};
+}
+
+enum TopologyStatus {
+ ACTIVE = 1,
+ INACTIVE = 2,
+ REBALANCING = 3,
+ KILLED = 4
+}
+
+union TopologyActionOptions {
+ 1: optional KillOptions kill_options;
+ 2: optional RebalanceOptions rebalance_options;
+}
+
+struct StormBase {
+ 1: required string name;
+ 2: required TopologyStatus status;
+ 3: required i32 num_workers;
+ 4: optional map<string, i32> component_executors;
+ 5: optional i32 launch_time_secs;
+ 6: optional string owner;
+ 7: optional TopologyActionOptions topology_action_options;
+ 8: optional TopologyStatus prev_status;//currently only used during rebalance action.
+}
+
+struct ZKWorkerHeartbeat {
+ 1: required string storm_id;
+ 2: required map<ExecutorInfo,ExecutorStats> executor_stats;
+ 3: required i32 time_secs;
+}
+ enum NumErrorsChoice {
+ ALL,
+ NONE,
+ ONE
+ }
+
+ struct GetInfoOptions {
+ 1: optional NumErrorsChoice num_err_choice;
+ }
service Nimbus {
void submitTopology(1: string name, 2: string uploadedJarLocation, 3: string jsonConf, 4: StormTopology topology) throws (1: AlreadyAliveException e, 2: InvalidTopologyException ite, 3: AuthorizationException aze);