You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/12/04 16:03:57 UTC
[02/17] storm git commit: Blobstore API STORM-876
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/ttypes.py b/storm-core/src/py/storm/ttypes.py
index a730c13..98a7ba4 100644
--- a/storm-core/src/py/storm/ttypes.py
+++ b/storm-core/src/py/storm/ttypes.py
@@ -62,6 +62,20 @@ class TopologyInitialStatus:
"INACTIVE": 2,
}
+class AccessControlType:
+ OTHER = 1
+ USER = 2
+
+ _VALUES_TO_NAMES = {
+ 1: "OTHER",
+ 2: "USER",
+ }
+
+ _NAMES_TO_VALUES = {
+ "OTHER": 1,
+ "USER": 2,
+ }
+
class TopologyStatus:
ACTIVE = 1
INACTIVE = 2
@@ -1802,6 +1816,146 @@ class InvalidTopologyException(TException):
def __ne__(self, other):
return not (self == other)
+class KeyNotFoundException(TException):
+ """
+ Attributes:
+ - msg
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.STRING, 'msg', None, None, ), # 1
+ )
+
+ def __init__(self, msg=None,):
+ self.msg = msg
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.STRING:
+ self.msg = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('KeyNotFoundException')
+ if self.msg is not None:
+ oprot.writeFieldBegin('msg', TType.STRING, 1)
+ oprot.writeString(self.msg.encode('utf-8'))
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ if self.msg is None:
+ raise TProtocol.TProtocolException(message='Required field msg is unset!')
+ return
+
+
+ def __str__(self):
+ return repr(self)
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.msg)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class KeyAlreadyExistsException(TException):
+ """
+ Attributes:
+ - msg
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.STRING, 'msg', None, None, ), # 1
+ )
+
+ def __init__(self, msg=None,):
+ self.msg = msg
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.STRING:
+ self.msg = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('KeyAlreadyExistsException')
+ if self.msg is not None:
+ oprot.writeFieldBegin('msg', TType.STRING, 1)
+ oprot.writeString(self.msg.encode('utf-8'))
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ if self.msg is None:
+ raise TProtocol.TProtocolException(message='Required field msg is unset!')
+ return
+
+
+ def __str__(self):
+ return repr(self)
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.msg)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
class TopologySummary:
"""
Attributes:
@@ -7110,6 +7264,458 @@ class SubmitOptions:
def __ne__(self, other):
return not (self == other)
+class AccessControl:
+ """
+ Attributes:
+ - type
+ - name
+ - access
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.I32, 'type', None, None, ), # 1
+ (2, TType.STRING, 'name', None, None, ), # 2
+ (3, TType.I32, 'access', None, None, ), # 3
+ )
+
+ def __init__(self, type=None, name=None, access=None,):
+ self.type = type
+ self.name = name
+ self.access = access
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.I32:
+ self.type = iprot.readI32()
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.STRING:
+ self.name = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
+ elif fid == 3:
+ if ftype == TType.I32:
+ self.access = iprot.readI32()
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('AccessControl')
+ if self.type is not None:
+ oprot.writeFieldBegin('type', TType.I32, 1)
+ oprot.writeI32(self.type)
+ oprot.writeFieldEnd()
+ if self.name is not None:
+ oprot.writeFieldBegin('name', TType.STRING, 2)
+ oprot.writeString(self.name.encode('utf-8'))
+ oprot.writeFieldEnd()
+ if self.access is not None:
+ oprot.writeFieldBegin('access', TType.I32, 3)
+ oprot.writeI32(self.access)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ if self.type is None:
+ raise TProtocol.TProtocolException(message='Required field type is unset!')
+ if self.access is None:
+ raise TProtocol.TProtocolException(message='Required field access is unset!')
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.type)
+ value = (value * 31) ^ hash(self.name)
+ value = (value * 31) ^ hash(self.access)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class SettableBlobMeta:
+ """
+ Attributes:
+ - acl
+ - replication_factor
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.LIST, 'acl', (TType.STRUCT,(AccessControl, AccessControl.thrift_spec)), None, ), # 1
+ (2, TType.I32, 'replication_factor', None, None, ), # 2
+ )
+
+ def __init__(self, acl=None, replication_factor=None,):
+ self.acl = acl
+ self.replication_factor = replication_factor
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.LIST:
+ self.acl = []
+ (_etype440, _size437) = iprot.readListBegin()
+ for _i441 in xrange(_size437):
+ _elem442 = AccessControl()
+ _elem442.read(iprot)
+ self.acl.append(_elem442)
+ iprot.readListEnd()
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.I32:
+ self.replication_factor = iprot.readI32()
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('SettableBlobMeta')
+ if self.acl is not None:
+ oprot.writeFieldBegin('acl', TType.LIST, 1)
+ oprot.writeListBegin(TType.STRUCT, len(self.acl))
+ for iter443 in self.acl:
+ iter443.write(oprot)
+ oprot.writeListEnd()
+ oprot.writeFieldEnd()
+ if self.replication_factor is not None:
+ oprot.writeFieldBegin('replication_factor', TType.I32, 2)
+ oprot.writeI32(self.replication_factor)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ if self.acl is None:
+ raise TProtocol.TProtocolException(message='Required field acl is unset!')
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.acl)
+ value = (value * 31) ^ hash(self.replication_factor)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class ReadableBlobMeta:
+ """
+ Attributes:
+ - settable
+ - version
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.STRUCT, 'settable', (SettableBlobMeta, SettableBlobMeta.thrift_spec), None, ), # 1
+ (2, TType.I64, 'version', None, None, ), # 2
+ )
+
+ def __init__(self, settable=None, version=None,):
+ self.settable = settable
+ self.version = version
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.STRUCT:
+ self.settable = SettableBlobMeta()
+ self.settable.read(iprot)
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.I64:
+ self.version = iprot.readI64()
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('ReadableBlobMeta')
+ if self.settable is not None:
+ oprot.writeFieldBegin('settable', TType.STRUCT, 1)
+ self.settable.write(oprot)
+ oprot.writeFieldEnd()
+ if self.version is not None:
+ oprot.writeFieldBegin('version', TType.I64, 2)
+ oprot.writeI64(self.version)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ if self.settable is None:
+ raise TProtocol.TProtocolException(message='Required field settable is unset!')
+ if self.version is None:
+ raise TProtocol.TProtocolException(message='Required field version is unset!')
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.settable)
+ value = (value * 31) ^ hash(self.version)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class ListBlobsResult:
+ """
+ Attributes:
+ - keys
+ - session
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.LIST, 'keys', (TType.STRING,None), None, ), # 1
+ (2, TType.STRING, 'session', None, None, ), # 2
+ )
+
+ def __init__(self, keys=None, session=None,):
+ self.keys = keys
+ self.session = session
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.LIST:
+ self.keys = []
+ (_etype447, _size444) = iprot.readListBegin()
+ for _i448 in xrange(_size444):
+ _elem449 = iprot.readString().decode('utf-8')
+ self.keys.append(_elem449)
+ iprot.readListEnd()
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.STRING:
+ self.session = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('ListBlobsResult')
+ if self.keys is not None:
+ oprot.writeFieldBegin('keys', TType.LIST, 1)
+ oprot.writeListBegin(TType.STRING, len(self.keys))
+ for iter450 in self.keys:
+ oprot.writeString(iter450.encode('utf-8'))
+ oprot.writeListEnd()
+ oprot.writeFieldEnd()
+ if self.session is not None:
+ oprot.writeFieldBegin('session', TType.STRING, 2)
+ oprot.writeString(self.session.encode('utf-8'))
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ if self.keys is None:
+ raise TProtocol.TProtocolException(message='Required field keys is unset!')
+ if self.session is None:
+ raise TProtocol.TProtocolException(message='Required field session is unset!')
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.keys)
+ value = (value * 31) ^ hash(self.session)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
+class BeginDownloadResult:
+ """
+ Attributes:
+ - version
+ - session
+ - data_size
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.I64, 'version', None, None, ), # 1
+ (2, TType.STRING, 'session', None, None, ), # 2
+ (3, TType.I64, 'data_size', None, None, ), # 3
+ )
+
+ def __init__(self, version=None, session=None, data_size=None,):
+ self.version = version
+ self.session = session
+ self.data_size = data_size
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.I64:
+ self.version = iprot.readI64()
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.STRING:
+ self.session = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
+ elif fid == 3:
+ if ftype == TType.I64:
+ self.data_size = iprot.readI64()
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('BeginDownloadResult')
+ if self.version is not None:
+ oprot.writeFieldBegin('version', TType.I64, 1)
+ oprot.writeI64(self.version)
+ oprot.writeFieldEnd()
+ if self.session is not None:
+ oprot.writeFieldBegin('session', TType.STRING, 2)
+ oprot.writeString(self.session.encode('utf-8'))
+ oprot.writeFieldEnd()
+ if self.data_size is not None:
+ oprot.writeFieldBegin('data_size', TType.I64, 3)
+ oprot.writeI64(self.data_size)
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ if self.version is None:
+ raise TProtocol.TProtocolException(message='Required field version is unset!')
+ if self.session is None:
+ raise TProtocol.TProtocolException(message='Required field session is unset!')
+ return
+
+
+ def __hash__(self):
+ value = 17
+ value = (value * 31) ^ hash(self.version)
+ value = (value * 31) ^ hash(self.session)
+ value = (value * 31) ^ hash(self.data_size)
+ return value
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
class SupervisorInfo:
"""
Attributes:
@@ -7175,31 +7781,31 @@ class SupervisorInfo:
elif fid == 4:
if ftype == TType.LIST:
self.used_ports = []
- (_etype440, _size437) = iprot.readListBegin()
- for _i441 in xrange(_size437):
- _elem442 = iprot.readI64()
- self.used_ports.append(_elem442)
+ (_etype454, _size451) = iprot.readListBegin()
+ for _i455 in xrange(_size451):
+ _elem456 = iprot.readI64()
+ self.used_ports.append(_elem456)
iprot.readListEnd()
else:
iprot.skip(ftype)
elif fid == 5:
if ftype == TType.LIST:
self.meta = []
- (_etype446, _size443) = iprot.readListBegin()
- for _i447 in xrange(_size443):
- _elem448 = iprot.readI64()
- self.meta.append(_elem448)
+ (_etype460, _size457) = iprot.readListBegin()
+ for _i461 in xrange(_size457):
+ _elem462 = iprot.readI64()
+ self.meta.append(_elem462)
iprot.readListEnd()
else:
iprot.skip(ftype)
elif fid == 6:
if ftype == TType.MAP:
self.scheduler_meta = {}
- (_ktype450, _vtype451, _size449 ) = iprot.readMapBegin()
- for _i453 in xrange(_size449):
- _key454 = iprot.readString().decode('utf-8')
- _val455 = iprot.readString().decode('utf-8')
- self.scheduler_meta[_key454] = _val455
+ (_ktype464, _vtype465, _size463 ) = iprot.readMapBegin()
+ for _i467 in xrange(_size463):
+ _key468 = iprot.readString().decode('utf-8')
+ _val469 = iprot.readString().decode('utf-8')
+ self.scheduler_meta[_key468] = _val469
iprot.readMapEnd()
else:
iprot.skip(ftype)
@@ -7216,11 +7822,11 @@ class SupervisorInfo:
elif fid == 9:
if ftype == TType.MAP:
self.resources_map = {}
- (_ktype457, _vtype458, _size456 ) = iprot.readMapBegin()
- for _i460 in xrange(_size456):
- _key461 = iprot.readString().decode('utf-8')
- _val462 = iprot.readDouble()
- self.resources_map[_key461] = _val462
+ (_ktype471, _vtype472, _size470 ) = iprot.readMapBegin()
+ for _i474 in xrange(_size470):
+ _key475 = iprot.readString().decode('utf-8')
+ _val476 = iprot.readDouble()
+ self.resources_map[_key475] = _val476
iprot.readMapEnd()
else:
iprot.skip(ftype)
@@ -7249,23 +7855,23 @@ class SupervisorInfo:
if self.used_ports is not None:
oprot.writeFieldBegin('used_ports', TType.LIST, 4)
oprot.writeListBegin(TType.I64, len(self.used_ports))
- for iter463 in self.used_ports:
- oprot.writeI64(iter463)
+ for iter477 in self.used_ports:
+ oprot.writeI64(iter477)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.meta is not None:
oprot.writeFieldBegin('meta', TType.LIST, 5)
oprot.writeListBegin(TType.I64, len(self.meta))
- for iter464 in self.meta:
- oprot.writeI64(iter464)
+ for iter478 in self.meta:
+ oprot.writeI64(iter478)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.scheduler_meta is not None:
oprot.writeFieldBegin('scheduler_meta', TType.MAP, 6)
oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.scheduler_meta))
- for kiter465,viter466 in self.scheduler_meta.items():
- oprot.writeString(kiter465.encode('utf-8'))
- oprot.writeString(viter466.encode('utf-8'))
+ for kiter479,viter480 in self.scheduler_meta.items():
+ oprot.writeString(kiter479.encode('utf-8'))
+ oprot.writeString(viter480.encode('utf-8'))
oprot.writeMapEnd()
oprot.writeFieldEnd()
if self.uptime_secs is not None:
@@ -7279,9 +7885,9 @@ class SupervisorInfo:
if self.resources_map is not None:
oprot.writeFieldBegin('resources_map', TType.MAP, 9)
oprot.writeMapBegin(TType.STRING, TType.DOUBLE, len(self.resources_map))
- for kiter467,viter468 in self.resources_map.items():
- oprot.writeString(kiter467.encode('utf-8'))
- oprot.writeDouble(viter468)
+ for kiter481,viter482 in self.resources_map.items():
+ oprot.writeString(kiter481.encode('utf-8'))
+ oprot.writeDouble(viter482)
oprot.writeMapEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -7353,10 +7959,10 @@ class NodeInfo:
elif fid == 2:
if ftype == TType.SET:
self.port = set()
- (_etype472, _size469) = iprot.readSetBegin()
- for _i473 in xrange(_size469):
- _elem474 = iprot.readI64()
- self.port.add(_elem474)
+ (_etype486, _size483) = iprot.readSetBegin()
+ for _i487 in xrange(_size483):
+ _elem488 = iprot.readI64()
+ self.port.add(_elem488)
iprot.readSetEnd()
else:
iprot.skip(ftype)
@@ -7377,8 +7983,8 @@ class NodeInfo:
if self.port is not None:
oprot.writeFieldBegin('port', TType.SET, 2)
oprot.writeSetBegin(TType.I64, len(self.port))
- for iter475 in self.port:
- oprot.writeI64(iter475)
+ for iter489 in self.port:
+ oprot.writeI64(iter489)
oprot.writeSetEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -7559,57 +8165,57 @@ class Assignment:
elif fid == 2:
if ftype == TType.MAP:
self.node_host = {}
- (_ktype477, _vtype478, _size476 ) = iprot.readMapBegin()
- for _i480 in xrange(_size476):
- _key481 = iprot.readString().decode('utf-8')
- _val482 = iprot.readString().decode('utf-8')
- self.node_host[_key481] = _val482
+ (_ktype491, _vtype492, _size490 ) = iprot.readMapBegin()
+ for _i494 in xrange(_size490):
+ _key495 = iprot.readString().decode('utf-8')
+ _val496 = iprot.readString().decode('utf-8')
+ self.node_host[_key495] = _val496
iprot.readMapEnd()
else:
iprot.skip(ftype)
elif fid == 3:
if ftype == TType.MAP:
self.executor_node_port = {}
- (_ktype484, _vtype485, _size483 ) = iprot.readMapBegin()
- for _i487 in xrange(_size483):
- _key488 = []
- (_etype493, _size490) = iprot.readListBegin()
- for _i494 in xrange(_size490):
- _elem495 = iprot.readI64()
- _key488.append(_elem495)
+ (_ktype498, _vtype499, _size497 ) = iprot.readMapBegin()
+ for _i501 in xrange(_size497):
+ _key502 = []
+ (_etype507, _size504) = iprot.readListBegin()
+ for _i508 in xrange(_size504):
+ _elem509 = iprot.readI64()
+ _key502.append(_elem509)
iprot.readListEnd()
- _val489 = NodeInfo()
- _val489.read(iprot)
- self.executor_node_port[_key488] = _val489
+ _val503 = NodeInfo()
+ _val503.read(iprot)
+ self.executor_node_port[_key502] = _val503
iprot.readMapEnd()
else:
iprot.skip(ftype)
elif fid == 4:
if ftype == TType.MAP:
self.executor_start_time_secs = {}
- (_ktype497, _vtype498, _size496 ) = iprot.readMapBegin()
- for _i500 in xrange(_size496):
- _key501 = []
- (_etype506, _size503) = iprot.readListBegin()
- for _i507 in xrange(_size503):
- _elem508 = iprot.readI64()
- _key501.append(_elem508)
+ (_ktype511, _vtype512, _size510 ) = iprot.readMapBegin()
+ for _i514 in xrange(_size510):
+ _key515 = []
+ (_etype520, _size517) = iprot.readListBegin()
+ for _i521 in xrange(_size517):
+ _elem522 = iprot.readI64()
+ _key515.append(_elem522)
iprot.readListEnd()
- _val502 = iprot.readI64()
- self.executor_start_time_secs[_key501] = _val502
+ _val516 = iprot.readI64()
+ self.executor_start_time_secs[_key515] = _val516
iprot.readMapEnd()
else:
iprot.skip(ftype)
elif fid == 5:
if ftype == TType.MAP:
self.worker_resources = {}
- (_ktype510, _vtype511, _size509 ) = iprot.readMapBegin()
- for _i513 in xrange(_size509):
- _key514 = NodeInfo()
- _key514.read(iprot)
- _val515 = WorkerResources()
- _val515.read(iprot)
- self.worker_resources[_key514] = _val515
+ (_ktype524, _vtype525, _size523 ) = iprot.readMapBegin()
+ for _i527 in xrange(_size523):
+ _key528 = NodeInfo()
+ _key528.read(iprot)
+ _val529 = WorkerResources()
+ _val529.read(iprot)
+ self.worker_resources[_key528] = _val529
iprot.readMapEnd()
else:
iprot.skip(ftype)
@@ -7630,39 +8236,39 @@ class Assignment:
if self.node_host is not None:
oprot.writeFieldBegin('node_host', TType.MAP, 2)
oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.node_host))
- for kiter516,viter517 in self.node_host.items():
- oprot.writeString(kiter516.encode('utf-8'))
- oprot.writeString(viter517.encode('utf-8'))
+ for kiter530,viter531 in self.node_host.items():
+ oprot.writeString(kiter530.encode('utf-8'))
+ oprot.writeString(viter531.encode('utf-8'))
oprot.writeMapEnd()
oprot.writeFieldEnd()
if self.executor_node_port is not None:
oprot.writeFieldBegin('executor_node_port', TType.MAP, 3)
oprot.writeMapBegin(TType.LIST, TType.STRUCT, len(self.executor_node_port))
- for kiter518,viter519 in self.executor_node_port.items():
- oprot.writeListBegin(TType.I64, len(kiter518))
- for iter520 in kiter518:
- oprot.writeI64(iter520)
+ for kiter532,viter533 in self.executor_node_port.items():
+ oprot.writeListBegin(TType.I64, len(kiter532))
+ for iter534 in kiter532:
+ oprot.writeI64(iter534)
oprot.writeListEnd()
- viter519.write(oprot)
+ viter533.write(oprot)
oprot.writeMapEnd()
oprot.writeFieldEnd()
if self.executor_start_time_secs is not None:
oprot.writeFieldBegin('executor_start_time_secs', TType.MAP, 4)
oprot.writeMapBegin(TType.LIST, TType.I64, len(self.executor_start_time_secs))
- for kiter521,viter522 in self.executor_start_time_secs.items():
- oprot.writeListBegin(TType.I64, len(kiter521))
- for iter523 in kiter521:
- oprot.writeI64(iter523)
+ for kiter535,viter536 in self.executor_start_time_secs.items():
+ oprot.writeListBegin(TType.I64, len(kiter535))
+ for iter537 in kiter535:
+ oprot.writeI64(iter537)
oprot.writeListEnd()
- oprot.writeI64(viter522)
+ oprot.writeI64(viter536)
oprot.writeMapEnd()
oprot.writeFieldEnd()
if self.worker_resources is not None:
oprot.writeFieldBegin('worker_resources', TType.MAP, 5)
oprot.writeMapBegin(TType.STRUCT, TType.STRUCT, len(self.worker_resources))
- for kiter524,viter525 in self.worker_resources.items():
- kiter524.write(oprot)
- viter525.write(oprot)
+ for kiter538,viter539 in self.worker_resources.items():
+ kiter538.write(oprot)
+ viter539.write(oprot)
oprot.writeMapEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -7839,11 +8445,11 @@ class StormBase:
elif fid == 4:
if ftype == TType.MAP:
self.component_executors = {}
- (_ktype527, _vtype528, _size526 ) = iprot.readMapBegin()
- for _i530 in xrange(_size526):
- _key531 = iprot.readString().decode('utf-8')
- _val532 = iprot.readI32()
- self.component_executors[_key531] = _val532
+ (_ktype541, _vtype542, _size540 ) = iprot.readMapBegin()
+ for _i544 in xrange(_size540):
+ _key545 = iprot.readString().decode('utf-8')
+ _val546 = iprot.readI32()
+ self.component_executors[_key545] = _val546
iprot.readMapEnd()
else:
iprot.skip(ftype)
@@ -7871,12 +8477,12 @@ class StormBase:
elif fid == 9:
if ftype == TType.MAP:
self.component_debug = {}
- (_ktype534, _vtype535, _size533 ) = iprot.readMapBegin()
- for _i537 in xrange(_size533):
- _key538 = iprot.readString().decode('utf-8')
- _val539 = DebugOptions()
- _val539.read(iprot)
- self.component_debug[_key538] = _val539
+ (_ktype548, _vtype549, _size547 ) = iprot.readMapBegin()
+ for _i551 in xrange(_size547):
+ _key552 = iprot.readString().decode('utf-8')
+ _val553 = DebugOptions()
+ _val553.read(iprot)
+ self.component_debug[_key552] = _val553
iprot.readMapEnd()
else:
iprot.skip(ftype)
@@ -7905,9 +8511,9 @@ class StormBase:
if self.component_executors is not None:
oprot.writeFieldBegin('component_executors', TType.MAP, 4)
oprot.writeMapBegin(TType.STRING, TType.I32, len(self.component_executors))
- for kiter540,viter541 in self.component_executors.items():
- oprot.writeString(kiter540.encode('utf-8'))
- oprot.writeI32(viter541)
+ for kiter554,viter555 in self.component_executors.items():
+ oprot.writeString(kiter554.encode('utf-8'))
+ oprot.writeI32(viter555)
oprot.writeMapEnd()
oprot.writeFieldEnd()
if self.launch_time_secs is not None:
@@ -7929,9 +8535,9 @@ class StormBase:
if self.component_debug is not None:
oprot.writeFieldBegin('component_debug', TType.MAP, 9)
oprot.writeMapBegin(TType.STRING, TType.STRUCT, len(self.component_debug))
- for kiter542,viter543 in self.component_debug.items():
- oprot.writeString(kiter542.encode('utf-8'))
- viter543.write(oprot)
+ for kiter556,viter557 in self.component_debug.items():
+ oprot.writeString(kiter556.encode('utf-8'))
+ viter557.write(oprot)
oprot.writeMapEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -8011,13 +8617,13 @@ class ClusterWorkerHeartbeat:
elif fid == 2:
if ftype == TType.MAP:
self.executor_stats = {}
- (_ktype545, _vtype546, _size544 ) = iprot.readMapBegin()
- for _i548 in xrange(_size544):
- _key549 = ExecutorInfo()
- _key549.read(iprot)
- _val550 = ExecutorStats()
- _val550.read(iprot)
- self.executor_stats[_key549] = _val550
+ (_ktype559, _vtype560, _size558 ) = iprot.readMapBegin()
+ for _i562 in xrange(_size558):
+ _key563 = ExecutorInfo()
+ _key563.read(iprot)
+ _val564 = ExecutorStats()
+ _val564.read(iprot)
+ self.executor_stats[_key563] = _val564
iprot.readMapEnd()
else:
iprot.skip(ftype)
@@ -8048,9 +8654,9 @@ class ClusterWorkerHeartbeat:
if self.executor_stats is not None:
oprot.writeFieldBegin('executor_stats', TType.MAP, 2)
oprot.writeMapBegin(TType.STRUCT, TType.STRUCT, len(self.executor_stats))
- for kiter551,viter552 in self.executor_stats.items():
- kiter551.write(oprot)
- viter552.write(oprot)
+ for kiter565,viter566 in self.executor_stats.items():
+ kiter565.write(oprot)
+ viter566.write(oprot)
oprot.writeMapEnd()
oprot.writeFieldEnd()
if self.time_secs is not None:
@@ -8203,12 +8809,12 @@ class LocalStateData:
if fid == 1:
if ftype == TType.MAP:
self.serialized_parts = {}
- (_ktype554, _vtype555, _size553 ) = iprot.readMapBegin()
- for _i557 in xrange(_size553):
- _key558 = iprot.readString().decode('utf-8')
- _val559 = ThriftSerializedObject()
- _val559.read(iprot)
- self.serialized_parts[_key558] = _val559
+ (_ktype568, _vtype569, _size567 ) = iprot.readMapBegin()
+ for _i571 in xrange(_size567):
+ _key572 = iprot.readString().decode('utf-8')
+ _val573 = ThriftSerializedObject()
+ _val573.read(iprot)
+ self.serialized_parts[_key572] = _val573
iprot.readMapEnd()
else:
iprot.skip(ftype)
@@ -8225,9 +8831,9 @@ class LocalStateData:
if self.serialized_parts is not None:
oprot.writeFieldBegin('serialized_parts', TType.MAP, 1)
oprot.writeMapBegin(TType.STRING, TType.STRUCT, len(self.serialized_parts))
- for kiter560,viter561 in self.serialized_parts.items():
- oprot.writeString(kiter560.encode('utf-8'))
- viter561.write(oprot)
+ for kiter574,viter575 in self.serialized_parts.items():
+ oprot.writeString(kiter574.encode('utf-8'))
+ viter575.write(oprot)
oprot.writeMapEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -8292,11 +8898,11 @@ class LocalAssignment:
elif fid == 2:
if ftype == TType.LIST:
self.executors = []
- (_etype565, _size562) = iprot.readListBegin()
- for _i566 in xrange(_size562):
- _elem567 = ExecutorInfo()
- _elem567.read(iprot)
- self.executors.append(_elem567)
+ (_etype579, _size576) = iprot.readListBegin()
+ for _i580 in xrange(_size576):
+ _elem581 = ExecutorInfo()
+ _elem581.read(iprot)
+ self.executors.append(_elem581)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -8323,8 +8929,8 @@ class LocalAssignment:
if self.executors is not None:
oprot.writeFieldBegin('executors', TType.LIST, 2)
oprot.writeListBegin(TType.STRUCT, len(self.executors))
- for iter568 in self.executors:
- iter568.write(oprot)
+ for iter582 in self.executors:
+ iter582.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.resources is not None:
@@ -8453,11 +9059,11 @@ class LSApprovedWorkers:
if fid == 1:
if ftype == TType.MAP:
self.approved_workers = {}
- (_ktype570, _vtype571, _size569 ) = iprot.readMapBegin()
- for _i573 in xrange(_size569):
- _key574 = iprot.readString().decode('utf-8')
- _val575 = iprot.readI32()
- self.approved_workers[_key574] = _val575
+ (_ktype584, _vtype585, _size583 ) = iprot.readMapBegin()
+ for _i587 in xrange(_size583):
+ _key588 = iprot.readString().decode('utf-8')
+ _val589 = iprot.readI32()
+ self.approved_workers[_key588] = _val589
iprot.readMapEnd()
else:
iprot.skip(ftype)
@@ -8474,9 +9080,9 @@ class LSApprovedWorkers:
if self.approved_workers is not None:
oprot.writeFieldBegin('approved_workers', TType.MAP, 1)
oprot.writeMapBegin(TType.STRING, TType.I32, len(self.approved_workers))
- for kiter576,viter577 in self.approved_workers.items():
- oprot.writeString(kiter576.encode('utf-8'))
- oprot.writeI32(viter577)
+ for kiter590,viter591 in self.approved_workers.items():
+ oprot.writeString(kiter590.encode('utf-8'))
+ oprot.writeI32(viter591)
oprot.writeMapEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -8530,12 +9136,12 @@ class LSSupervisorAssignments:
if fid == 1:
if ftype == TType.MAP:
self.assignments = {}
- (_ktype579, _vtype580, _size578 ) = iprot.readMapBegin()
- for _i582 in xrange(_size578):
- _key583 = iprot.readI32()
- _val584 = LocalAssignment()
- _val584.read(iprot)
- self.assignments[_key583] = _val584
+ (_ktype593, _vtype594, _size592 ) = iprot.readMapBegin()
+ for _i596 in xrange(_size592):
+ _key597 = iprot.readI32()
+ _val598 = LocalAssignment()
+ _val598.read(iprot)
+ self.assignments[_key597] = _val598
iprot.readMapEnd()
else:
iprot.skip(ftype)
@@ -8552,9 +9158,9 @@ class LSSupervisorAssignments:
if self.assignments is not None:
oprot.writeFieldBegin('assignments', TType.MAP, 1)
oprot.writeMapBegin(TType.I32, TType.STRUCT, len(self.assignments))
- for kiter585,viter586 in self.assignments.items():
- oprot.writeI32(kiter585)
- viter586.write(oprot)
+ for kiter599,viter600 in self.assignments.items():
+ oprot.writeI32(kiter599)
+ viter600.write(oprot)
oprot.writeMapEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -8627,11 +9233,11 @@ class LSWorkerHeartbeat:
elif fid == 3:
if ftype == TType.LIST:
self.executors = []
- (_etype590, _size587) = iprot.readListBegin()
- for _i591 in xrange(_size587):
- _elem592 = ExecutorInfo()
- _elem592.read(iprot)
- self.executors.append(_elem592)
+ (_etype604, _size601) = iprot.readListBegin()
+ for _i605 in xrange(_size601):
+ _elem606 = ExecutorInfo()
+ _elem606.read(iprot)
+ self.executors.append(_elem606)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -8661,8 +9267,8 @@ class LSWorkerHeartbeat:
if self.executors is not None:
oprot.writeFieldBegin('executors', TType.LIST, 3)
oprot.writeListBegin(TType.STRUCT, len(self.executors))
- for iter593 in self.executors:
- iter593.write(oprot)
+ for iter607 in self.executors:
+ iter607.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.port is not None:
@@ -8748,20 +9354,20 @@ class LSTopoHistory:
elif fid == 3:
if ftype == TType.LIST:
self.users = []
- (_etype597, _size594) = iprot.readListBegin()
- for _i598 in xrange(_size594):
- _elem599 = iprot.readString().decode('utf-8')
- self.users.append(_elem599)
+ (_etype611, _size608) = iprot.readListBegin()
+ for _i612 in xrange(_size608):
+ _elem613 = iprot.readString().decode('utf-8')
+ self.users.append(_elem613)
iprot.readListEnd()
else:
iprot.skip(ftype)
elif fid == 4:
if ftype == TType.LIST:
self.groups = []
- (_etype603, _size600) = iprot.readListBegin()
- for _i604 in xrange(_size600):
- _elem605 = iprot.readString().decode('utf-8')
- self.groups.append(_elem605)
+ (_etype617, _size614) = iprot.readListBegin()
+ for _i618 in xrange(_size614):
+ _elem619 = iprot.readString().decode('utf-8')
+ self.groups.append(_elem619)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -8786,15 +9392,15 @@ class LSTopoHistory:
if self.users is not None:
oprot.writeFieldBegin('users', TType.LIST, 3)
oprot.writeListBegin(TType.STRING, len(self.users))
- for iter606 in self.users:
- oprot.writeString(iter606.encode('utf-8'))
+ for iter620 in self.users:
+ oprot.writeString(iter620.encode('utf-8'))
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.groups is not None:
oprot.writeFieldBegin('groups', TType.LIST, 4)
oprot.writeListBegin(TType.STRING, len(self.groups))
- for iter607 in self.groups:
- oprot.writeString(iter607.encode('utf-8'))
+ for iter621 in self.groups:
+ oprot.writeString(iter621.encode('utf-8'))
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -8857,11 +9463,11 @@ class LSTopoHistoryList:
if fid == 1:
if ftype == TType.LIST:
self.topo_history = []
- (_etype611, _size608) = iprot.readListBegin()
- for _i612 in xrange(_size608):
- _elem613 = LSTopoHistory()
- _elem613.read(iprot)
- self.topo_history.append(_elem613)
+ (_etype625, _size622) = iprot.readListBegin()
+ for _i626 in xrange(_size622):
+ _elem627 = LSTopoHistory()
+ _elem627.read(iprot)
+ self.topo_history.append(_elem627)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -8878,8 +9484,8 @@ class LSTopoHistoryList:
if self.topo_history is not None:
oprot.writeFieldBegin('topo_history', TType.LIST, 1)
oprot.writeListBegin(TType.STRUCT, len(self.topo_history))
- for iter614 in self.topo_history:
- iter614.write(oprot)
+ for iter628 in self.topo_history:
+ iter628.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -9214,12 +9820,12 @@ class LogConfig:
if fid == 2:
if ftype == TType.MAP:
self.named_logger_level = {}
- (_ktype616, _vtype617, _size615 ) = iprot.readMapBegin()
- for _i619 in xrange(_size615):
- _key620 = iprot.readString().decode('utf-8')
- _val621 = LogLevel()
- _val621.read(iprot)
- self.named_logger_level[_key620] = _val621
+ (_ktype630, _vtype631, _size629 ) = iprot.readMapBegin()
+ for _i633 in xrange(_size629):
+ _key634 = iprot.readString().decode('utf-8')
+ _val635 = LogLevel()
+ _val635.read(iprot)
+ self.named_logger_level[_key634] = _val635
iprot.readMapEnd()
else:
iprot.skip(ftype)
@@ -9236,9 +9842,9 @@ class LogConfig:
if self.named_logger_level is not None:
oprot.writeFieldBegin('named_logger_level', TType.MAP, 2)
oprot.writeMapBegin(TType.STRING, TType.STRUCT, len(self.named_logger_level))
- for kiter622,viter623 in self.named_logger_level.items():
- oprot.writeString(kiter622.encode('utf-8'))
- viter623.write(oprot)
+ for kiter636,viter637 in self.named_logger_level.items():
+ oprot.writeString(kiter636.encode('utf-8'))
+ viter637.write(oprot)
oprot.writeMapEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -9290,10 +9896,10 @@ class TopologyHistoryInfo:
if fid == 1:
if ftype == TType.LIST:
self.topo_ids = []
- (_etype627, _size624) = iprot.readListBegin()
- for _i628 in xrange(_size624):
- _elem629 = iprot.readString().decode('utf-8')
- self.topo_ids.append(_elem629)
+ (_etype641, _size638) = iprot.readListBegin()
+ for _i642 in xrange(_size638):
+ _elem643 = iprot.readString().decode('utf-8')
+ self.topo_ids.append(_elem643)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -9310,8 +9916,8 @@ class TopologyHistoryInfo:
if self.topo_ids is not None:
oprot.writeFieldBegin('topo_ids', TType.LIST, 1)
oprot.writeListBegin(TType.STRING, len(self.topo_ids))
- for iter630 in self.topo_ids:
- oprot.writeString(iter630.encode('utf-8'))
+ for iter644 in self.topo_ids:
+ oprot.writeString(iter644.encode('utf-8'))
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -9961,11 +10567,11 @@ class HBRecords:
if fid == 1:
if ftype == TType.LIST:
self.pulses = []
- (_etype627, _size624) = iprot.readListBegin()
- for _i628 in xrange(_size624):
- _elem629 = HBPulse()
- _elem629.read(iprot)
- self.pulses.append(_elem629)
+ (_etype648, _size645) = iprot.readListBegin()
+ for _i649 in xrange(_size645):
+ _elem650 = HBPulse()
+ _elem650.read(iprot)
+ self.pulses.append(_elem650)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -9982,8 +10588,8 @@ class HBRecords:
if self.pulses is not None:
oprot.writeFieldBegin('pulses', TType.LIST, 1)
oprot.writeListBegin(TType.STRUCT, len(self.pulses))
- for iter630 in self.pulses:
- iter630.write(oprot)
+ for iter651 in self.pulses:
+ iter651.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -10035,10 +10641,10 @@ class HBNodes:
if fid == 1:
if ftype == TType.LIST:
self.pulseIds = []
- (_etype634, _size631) = iprot.readListBegin()
- for _i635 in xrange(_size631):
- _elem636 = iprot.readString().decode('utf-8')
- self.pulseIds.append(_elem636)
+ (_etype655, _size652) = iprot.readListBegin()
+ for _i656 in xrange(_size652):
+ _elem657 = iprot.readString().decode('utf-8')
+ self.pulseIds.append(_elem657)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -10055,8 +10661,8 @@ class HBNodes:
if self.pulseIds is not None:
oprot.writeFieldBegin('pulseIds', TType.LIST, 1)
oprot.writeListBegin(TType.STRING, len(self.pulseIds))
- for iter637 in self.pulseIds:
- oprot.writeString(iter637.encode('utf-8'))
+ for iter658 in self.pulseIds:
+ oprot.writeString(iter658.encode('utf-8'))
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/storm.thrift
----------------------------------------------------------------------
diff --git a/storm-core/src/storm.thrift b/storm-core/src/storm.thrift
index 677de2b..08be005 100644
--- a/storm-core/src/storm.thrift
+++ b/storm-core/src/storm.thrift
@@ -136,6 +136,14 @@ exception InvalidTopologyException {
1: required string msg;
}
+exception KeyNotFoundException {
+ 1: required string msg;
+}
+
+exception KeyAlreadyExistsException {
+ 1: required string msg;
+}
+
struct TopologySummary {
1: required string id;
2: required string name;
@@ -371,6 +379,42 @@ struct SubmitOptions {
2: optional Credentials creds;
}
+enum AccessControlType {
+ OTHER = 1,
+ USER = 2
+ //eventually ,GROUP=3
+}
+
+struct AccessControl {
+ 1: required AccessControlType type;
+ 2: optional string name; //Name of user or group in ACL
+ 3: required i32 access; //bitmasks READ=0x1, WRITE=0x2, ADMIN=0x4
+}
+
+struct SettableBlobMeta {
+ 1: required list<AccessControl> acl;
+ 2: optional i32 replication_factor
+}
+
+struct ReadableBlobMeta {
+ 1: required SettableBlobMeta settable;
+ //This is some indication of a version of a BLOB. The only guarantee is
+ // if the data changed in the blob the version will be different.
+ 2: required i64 version;
+}
+
+struct ListBlobsResult {
+ 1: required list<string> keys;
+ 2: required string session;
+}
+
+struct BeginDownloadResult {
+ //Same version as in ReadableBlobMeta
+ 1: required i64 version;
+ 2: required string session;
+ 3: optional i64 data_size;
+}
+
struct SupervisorInfo {
1: required i64 time_secs;
2: required string hostname;
@@ -565,6 +609,21 @@ service Nimbus {
void uploadNewCredentials(1: string name, 2: Credentials creds) throws (1: NotAliveException e, 2: InvalidTopologyException ite, 3: AuthorizationException aze);
+ string beginCreateBlob(1: string key, 2: SettableBlobMeta meta) throws (1: AuthorizationException aze, 2: KeyAlreadyExistsException kae);
+ string beginUpdateBlob(1: string key) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf);
+ void uploadBlobChunk(1: string session, 2: binary chunk) throws (1: AuthorizationException aze);
+ void finishBlobUpload(1: string session) throws (1: AuthorizationException aze);
+ void cancelBlobUpload(1: string session) throws (1: AuthorizationException aze);
+ ReadableBlobMeta getBlobMeta(1: string key) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf);
+ void setBlobMeta(1: string key, 2: SettableBlobMeta meta) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf);
+ BeginDownloadResult beginBlobDownload(1: string key) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf);
+ binary downloadBlobChunk(1: string session) throws (1: AuthorizationException aze);
+ void deleteBlob(1: string key) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf);
+ ListBlobsResult listBlobs(1: string session); //empty string "" means start at the beginning
+ i32 getBlobReplication(1: string key) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf);
+ i32 updateBlobReplication(1: string key, 2: i32 replication) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf);
+ void createStateInZookeeper(1: string key); // creates state in zookeeper when blob is uploaded through command line
+
// need to add functions for asking about status of storms, what nodes they're running on, looking at task logs
string beginFileUpload() throws (1: AuthorizationException aze);
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/test/clj/backtype/storm/cluster_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/cluster_test.clj b/storm-core/test/clj/backtype/storm/cluster_test.clj
index eea4637..82f305b 100644
--- a/storm-core/test/clj/backtype/storm/cluster_test.clj
+++ b/storm-core/test/clj/backtype/storm/cluster_test.clj
@@ -208,14 +208,15 @@
(.set-credentials! state "storm1" {"b" "b"} {})
(is (= {"b" "b"} (.credentials state "storm1" nil)))
- (is (= [] (.code-distributor state nil)))
- (.setup-code-distributor! state "storm1" nimbusInfo1)
- (is (= ["storm1"] (.code-distributor state nil)))
- (is (= [nimbusInfo1] (.code-distributor-info state "storm1")))
- (.setup-code-distributor! state "storm1" nimbusInfo2)
- (is (= #{nimbusInfo1 nimbusInfo2} (set (.code-distributor-info state "storm1"))))
- (.remove-storm! state "storm1")
- (is (= [] (.code-distributor state nil)))
+ (is (= [] (.blobstore-info state nil)))
+ (.setup-blobstore! state "key1" nimbusInfo1 "1")
+ (is (= ["key1"] (.blobstore-info state nil)))
+ (is (= [(str (.toHostPortString nimbusInfo1) "-1")] (.blobstore-info state "key1")))
+ (.setup-blobstore! state "key1" nimbusInfo2 "1")
+ (is (= #{(str (.toHostPortString nimbusInfo1) "-1")
+ (str (.toHostPortString nimbusInfo2) "-1")} (set (.blobstore-info state "key1"))))
+ (.remove-blobstore-key! state "key1")
+ (is (= [] (.blobstore-info state nil)))
(is (= [] (.nimbuses state)))
(.add-nimbus-host! state "nimbus1:port" nimbusSummary1)
@@ -266,8 +267,7 @@
(let [state1 (mk-storm-state zk-port)
state2 (mk-storm-state zk-port)
supervisor-info1 (SupervisorInfo. 10 "hostname-1" "id1" [1 2] [] {} 1000 "0.9.2" nil)
- supervisor-info2 (SupervisorInfo. 10 "hostname-2" "id2" [1 2] [] {} 1000 "0.9.2" nil)
- ]
+ supervisor-info2 (SupervisorInfo. 10 "hostname-2" "id2" [1 2] [] {} 1000 "0.9.2" nil)]
(is (= [] (.supervisors state1 nil)))
(.supervisor-heartbeat! state2 "2" supervisor-info2)
(.supervisor-heartbeat! state1 "1" supervisor-info1)
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/test/clj/backtype/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/nimbus_test.clj b/storm-core/test/clj/backtype/storm/nimbus_test.clj
index 53d4bb8..0847883 100644
--- a/storm-core/test/clj/backtype/storm/nimbus_test.clj
+++ b/storm-core/test/clj/backtype/storm/nimbus_test.clj
@@ -29,7 +29,7 @@
LogConfig LogLevel LogLevelAction])
(:import [java.util HashMap])
(:import [java.io File])
- (:import [backtype.storm.utils Time])
+ (:import [backtype.storm.utils Time Utils])
(:import [org.apache.commons.io FileUtils])
(:use [backtype.storm testing MockAutoCred util config log timer zookeeper])
(:use [backtype.storm.daemon common])
@@ -939,41 +939,15 @@
(bind storm-id1 (get-storm-id cluster-state "t1"))
(bind storm-id2 (get-storm-id cluster-state "t2"))
(.shutdown nimbus)
- (rmr (master-stormdist-root conf storm-id1))
+ (let [blob-store (Utils/getNimbusBlobStore conf nil)]
+ (nimbus/blob-rm-topology-keys storm-id1 blob-store cluster-state)
+ (.shutdown blob-store))
(bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
(is ( = #{storm-id2} (set (.active-storms cluster-state))))
(.shutdown nimbus)
(.disconnect cluster-state)
)))))
-
-(deftest test-cleans-corrupt
- (with-inprocess-zookeeper zk-port
- (with-local-tmp [nimbus-dir]
- (stubbing [zk-leader-elector (mock-leader-elector)]
- (letlocals
- (bind conf (merge (read-storm-config)
- {STORM-ZOOKEEPER-SERVERS ["localhost"]
- STORM-CLUSTER-MODE "local"
- STORM-ZOOKEEPER-PORT zk-port
- STORM-LOCAL-DIR nimbus-dir}))
- (bind cluster-state (cluster/mk-storm-cluster-state conf))
- (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
- (bind topology (thrift/mk-topology
- {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)}
- {}))
- (submit-local-topology nimbus "t1" {} topology)
- (submit-local-topology nimbus "t2" {} topology)
- (bind storm-id1 (get-storm-id cluster-state "t1"))
- (bind storm-id2 (get-storm-id cluster-state "t2"))
- (.shutdown nimbus)
- (rmr (master-stormdist-root conf storm-id1))
- (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
- (is ( = #{storm-id2} (set (.active-storms cluster-state))))
- (.shutdown nimbus)
- (.disconnect cluster-state)
- )))))
-
;(deftest test-no-overlapping-slots
; ;; test that same node+port never appears across 2 assignments
; )
@@ -1173,7 +1147,7 @@
nimbus/check-authorization!
[1 2 3] expected-name expected-conf expected-operation)
(verify-first-call-args-for-indices
- nimbus/try-read-storm-topology [0] expected-conf))))))))))
+ nimbus/try-read-storm-topology [0] "fake-id"))))))))))
(deftest test-nimbus-iface-getTopology-methods-throw-correctly
(with-local-cluster [cluster]
@@ -1230,7 +1204,8 @@
:status {:type bogus-type}}
}
]
- (stubbing [topology-bases bogus-bases]
+ (stubbing [topology-bases bogus-bases
+ nimbus/get-blob-replication-count 1]
(let [topos (.get_topologies (.getClusterInfo nimbus))]
; The number of topologies in the summary is correct.
(is (= (count
@@ -1265,6 +1240,7 @@
digest "storm:thisisapoorpassword"
auth-conf {STORM-ZOOKEEPER-AUTH-SCHEME scheme
STORM-ZOOKEEPER-AUTH-PAYLOAD digest
+ STORM-PRINCIPAL-TO-LOCAL-PLUGIN "backtype.storm.security.auth.DefaultPrincipalToLocal"
NIMBUS-THRIFT-PORT 6666}
expected-acls nimbus/NIMBUS-ZK-ACLS
fake-inimbus (reify INimbus (getForcedScheduler [this] nil))]
@@ -1272,10 +1248,11 @@
mk-authorization-handler nil
cluster/mk-storm-cluster-state nil
nimbus/file-cache-map nil
+ nimbus/mk-blob-cache-map nil
+ nimbus/mk-bloblist-cache-map nil
uptime-computer nil
new-instance nil
mk-timer nil
- nimbus/mk-code-distributor nil
zk-leader-elector nil
nimbus/mk-scheduler nil]
(nimbus/nimbus-data auth-conf fake-inimbus)
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/test/clj/backtype/storm/security/auth/ReqContext_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/ReqContext_test.clj b/storm-core/test/clj/backtype/storm/security/auth/ReqContext_test.clj
index ea45ddc..18d4ada 100644
--- a/storm-core/test/clj/backtype/storm/security/auth/ReqContext_test.clj
+++ b/storm-core/test/clj/backtype/storm/security/auth/ReqContext_test.clj
@@ -68,5 +68,6 @@
(.setSubject rc s)
(is (not (nil? (.principal rc))))
(is (= (-> rc .principal .getName) principal-name))
+ (.setSubject rc nil)
)
)
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/test/clj/backtype/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/supervisor_test.clj b/storm-core/test/clj/backtype/storm/supervisor_test.clj
index 04c8600..776ad6e 100644
--- a/storm-core/test/clj/backtype/storm/supervisor_test.clj
+++ b/storm-core/test/clj/backtype/storm/supervisor_test.clj
@@ -324,7 +324,8 @@
set-worker-user! nil
supervisor/jlp nil
worker-artifacts-root "/tmp/workers-artifacts"
- supervisor/write-log-metadata! nil]
+ supervisor/write-log-metadata! nil
+ supervisor/create-blobstore-links nil]
(supervisor/launch-worker mock-supervisor
mock-storm-id
mock-port
@@ -346,8 +347,9 @@
launch-process nil
set-worker-user! nil
supervisor/jlp nil
- worker-artifacts-root "/tmp/workers-artifacts"
- supervisor/write-log-metadata! nil]
+ supervisor/write-log-metadata! nil
+ supervisor/create-blobstore-links nil
+ worker-artifacts-root "/tmp/workers-artifacts"]
(supervisor/launch-worker mock-supervisor
mock-storm-id
mock-port
@@ -367,7 +369,8 @@
set-worker-user! nil
supervisor/write-log-metadata! nil
launch-process nil
- current-classpath (str file-path-separator "base")]
+ current-classpath (str file-path-separator "base")
+ supervisor/create-blobstore-links nil]
(supervisor/launch-worker mock-supervisor
mock-storm-id
mock-port
@@ -388,7 +391,8 @@
launch-process nil
set-worker-user! nil
supervisor/write-log-metadata! nil
- current-classpath (str file-path-separator "base")]
+ current-classpath (str file-path-separator "base")
+ supervisor/create-blobstore-links nil]
(supervisor/launch-worker mock-supervisor
mock-storm-id
mock-port
@@ -540,8 +544,8 @@
cluster/mk-storm-cluster-state nil
supervisor-state nil
local-hostname nil
- supervisor/mk-code-distributor nil
- mk-timer nil]
+ mk-timer nil
+ supervisor-local-dir nil]
(supervisor/supervisor-data auth-conf nil fake-isupervisor)
(verify-call-times-for cluster/mk-storm-cluster-state 1)
(verify-first-call-args-for-indices cluster/mk-storm-cluster-state [2]
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/test/jvm/backtype/storm/blobstore/BlobStoreTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/blobstore/BlobStoreTest.java b/storm-core/test/jvm/backtype/storm/blobstore/BlobStoreTest.java
new file mode 100644
index 0000000..388b491
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/blobstore/BlobStoreTest.java
@@ -0,0 +1,461 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.KeyAlreadyExistsException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.generated.SettableBlobMeta;
+
+import backtype.storm.security.auth.NimbusPrincipal;
+import backtype.storm.security.auth.SingleUserPrincipal;
+import backtype.storm.utils.Utils;
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.UUID;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Iterator;
+import java.util.Arrays;
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.*;
+
+public class BlobStoreTest {
+ private static final Logger LOG = LoggerFactory.getLogger(BlobStoreTest.class);
+ URI base;
+ File baseFile;
+ private static Map conf = new HashMap();
+ public static final int READ = 0x01;
+ public static final int WRITE = 0x02;
+ public static final int ADMIN = 0x04;
+
+ @Before
+ public void init() {
+ initializeConfigs();
+ baseFile = new File("/tmp/blob-store-test-"+UUID.randomUUID());
+ base = baseFile.toURI();
+ }
+
+ @After
+ public void cleanup() throws IOException {
+ FileUtils.deleteDirectory(baseFile);
+ }
+
+ // Initializes the nimbus admin and supervisor user entries in the shared test config
+ public static void initializeConfigs() {
+ conf.put(Config.NIMBUS_ADMINS,"admin");
+ conf.put(Config.NIMBUS_SUPERVISOR_USERS,"supervisor");
+ }
+
+ // Gets Nimbus Subject with NimbusPrincipal set on it
+ public static Subject getNimbusSubject() {
+ Subject nimbus = new Subject();
+ nimbus.getPrincipals().add(new NimbusPrincipal());
+ return nimbus;
+ }
+
+ // Overloads the assertStoreHasExactly method to accommodate a Subject in order to check for authorization
+ public static void assertStoreHasExactly(BlobStore store, Subject who, String ... keys)
+ throws IOException, KeyNotFoundException, AuthorizationException {
+ Set<String> expected = new HashSet<String>(Arrays.asList(keys));
+ Set<String> found = new HashSet<String>();
+ Iterator<String> c = store.listKeys();
+ while (c.hasNext()) {
+ String keyName = c.next();
+ found.add(keyName);
+ }
+ Set<String> extra = new HashSet<String>(found);
+ extra.removeAll(expected);
+ assertTrue("Found extra keys in the blob store "+extra, extra.isEmpty());
+ Set<String> missing = new HashSet<String>(expected);
+ missing.removeAll(found);
+ assertTrue("Found keys missing from the blob store "+missing, missing.isEmpty());
+ }
+
+ public static void assertStoreHasExactly(BlobStore store, String ... keys)
+ throws IOException, KeyNotFoundException, AuthorizationException {
+ assertStoreHasExactly(store, null, keys);
+ }
+
+ // Overloads the readInt method to accommodate a Subject in order to check for authorization (security turned on)
+ public static int readInt(BlobStore store, Subject who, String key)
+ throws IOException, KeyNotFoundException, AuthorizationException {
+ InputStream in = store.getBlob(key, who);
+ try {
+ return in.read();
+ } finally {
+ in.close();
+ }
+ }
+
+ public static int readInt(BlobStore store, String key)
+ throws IOException, KeyNotFoundException, AuthorizationException {
+ return readInt(store, null, key);
+ }
+
+ public static void readAssertEquals(BlobStore store, String key, int value)
+ throws IOException, KeyNotFoundException, AuthorizationException {
+ assertEquals(value, readInt(store, key));
+ }
+
+ // Asserts that the value read for the key as the given Subject equals the expected value (security turned on)
+ public void readAssertEqualsWithAuth(BlobStore store, Subject who, String key, int value)
+ throws IOException, KeyNotFoundException, AuthorizationException {
+ assertEquals(value, readInt(store, who, key));
+ }
+
+ private LocalFsBlobStore initLocalFs() {
+ LocalFsBlobStore store = new LocalFsBlobStore();
+ // Spy object that tries to mock the real object store
+ LocalFsBlobStore spy = spy(store);
+ Mockito.doNothing().when(spy).checkForBlobUpdate("test");
+ Mockito.doNothing().when(spy).checkForBlobUpdate("other");
+ Mockito.doNothing().when(spy).checkForBlobUpdate("test-empty-subject-WE");
+ Mockito.doNothing().when(spy).checkForBlobUpdate("test-empty-subject-DEF");
+ Mockito.doNothing().when(spy).checkForBlobUpdate("test-empty-acls");
+ Map conf = Utils.readStormConfig();
+ conf.put(Config.STORM_LOCAL_DIR, baseFile.getAbsolutePath());
+ conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN,"backtype.storm.security.auth.DefaultPrincipalToLocal");
+ ArrayList<String> zookeeper_list = new ArrayList<>();
+ spy.prepare(conf, null, null);
+ return spy;
+ }
+
+ @Test
+ public void testLocalFsWithAuth() throws Exception {
+ testWithAuthentication(initLocalFs());
+ }
+
+ @Test
+ public void testBasicLocalFs() throws Exception {
+ testBasic(initLocalFs());
+ }
+
+ @Test
+ public void testMultipleLocalFs() throws Exception {
+ testMultiple(initLocalFs());
+ }
+
+ public Subject getSubject(String name) {
+ Subject subject = new Subject();
+ SingleUserPrincipal user = new SingleUserPrincipal(name);
+ subject.getPrincipals().add(user);
+ return subject;
+ }
+
+ // Check for Blobstore with authentication
+ public void testWithAuthentication(BlobStore store) throws Exception {
+ //Test for Nimbus Admin
+ Subject admin = getSubject("admin");
+ assertStoreHasExactly(store);
+ SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+ AtomicOutputStream out = store.createBlob("test", metadata, admin);
+ assertStoreHasExactly(store, "test");
+ out.write(1);
+ out.close();
+ store.deleteBlob("test", admin);
+
+ //Test for Supervisor Admin
+ Subject supervisor = getSubject("supervisor");
+ assertStoreHasExactly(store);
+ metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+ out = store.createBlob("test", metadata, supervisor);
+ assertStoreHasExactly(store, "test");
+ out.write(1);
+ out.close();
+ store.deleteBlob("test", supervisor);
+
+ //Test for Nimbus itself as a user
+ Subject nimbus = getNimbusSubject();
+ assertStoreHasExactly(store);
+ metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+ out = store.createBlob("test", metadata, nimbus);
+ assertStoreHasExactly(store, "test");
+ out.write(1);
+ out.close();
+ store.deleteBlob("test", nimbus);
+
+ // Test with a dummy test_subject for cases where subject !=null (security turned on)
+ Subject who = getSubject("test_subject");
+ assertStoreHasExactly(store);
+
+ // Tests for case when subject != null (security turned on) and
+ // acls for the blob are set to WORLD_EVERYTHING
+ metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+ out = store.createBlob("test", metadata, who);
+ out.write(1);
+ out.close();
+ assertStoreHasExactly(store, "test");
+ // Testing whether acls are set to WORLD_EVERYTHING
+ assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+ readAssertEqualsWithAuth(store, who, "test", 1);
+
+ LOG.info("Deleting test");
+ store.deleteBlob("test", who);
+ assertStoreHasExactly(store);
+
+ // Tests for case when subject != null (security turned on) and
+ // acls are not set for the blob (DEFAULT)
+ LOG.info("Creating test again");
+ metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+ out = store.createBlob("test", metadata, who);
+ out.write(2);
+ out.close();
+ assertStoreHasExactly(store, "test");
+ // Testing whether acls are set to WORLD_EVERYTHING. Here the acl should not contain WORLD_EVERYTHING because
+ // the subject is neither null nor empty. The ACL should however contain USER_EVERYTHING as user needs to have
+ // complete access to the blob
+ assertTrue("ACL does not contain WORLD_EVERYTHING", !metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+ readAssertEqualsWithAuth(store, who, "test", 2);
+
+ LOG.info("Updating test");
+ out = store.updateBlob("test", who);
+ out.write(3);
+ out.close();
+ assertStoreHasExactly(store, "test");
+ readAssertEqualsWithAuth(store, who, "test", 3);
+
+ LOG.info("Updating test again");
+ out = store.updateBlob("test", who);
+ out.write(4);
+ out.flush();
+ LOG.info("SLEEPING");
+ Thread.sleep(2);
+ assertStoreHasExactly(store, "test");
+ readAssertEqualsWithAuth(store, who, "test", 3);
+
+ // Test for subject with no principals and acls set to WORLD_EVERYTHING
+ who = new Subject();
+ metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+ LOG.info("Creating test");
+ out = store.createBlob("test-empty-subject-WE", metadata, who);
+ out.write(2);
+ out.close();
+ assertStoreHasExactly(store, "test-empty-subject-WE", "test");
+ // Testing whether acls are set to WORLD_EVERYTHING
+ assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+ readAssertEqualsWithAuth(store, who, "test-empty-subject-WE", 2);
+
+ // Test for subject with no principals and acls set to DEFAULT
+ who = new Subject();
+ metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+ LOG.info("Creating other");
+ out = store.createBlob("test-empty-subject-DEF", metadata, who);
+ out.write(2);
+ out.close();
+ assertStoreHasExactly(store, "test-empty-subject-DEF", "test", "test-empty-subject-WE");
+ // Testing whether acls are set to WORLD_EVERYTHING
+ assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+ readAssertEqualsWithAuth(store, who, "test-empty-subject-DEF", 2);
+
+ if (store instanceof LocalFsBlobStore) {
+ ((LocalFsBlobStore) store).fullCleanup(1);
+ } else {
+ fail("Error the blobstore is of unknowntype");
+ }
+ try {
+ out.close();
+ } catch (IOException e) {
+ // This is likely to happen when we try to commit something that
+ // was cleaned up. This is expected and acceptable.
+ }
+ }
+
+ /**
+  * Walks a blob store through create, read, update and delete with a
+  * {@code null} subject, checking the ACL text recorded on the metadata
+  * along the way, and finishes by running a full cleanup while an update
+  * stream is still open.
+  *
+  * @param store the blob store under test; the final cleanup step fails the
+  *              test unless it is a {@link LocalFsBlobStore}
+  * @throws Exception if any blob store operation fails unexpectedly
+  */
+ public void testBasic(BlobStore store) throws Exception {
+   final String aclFailure = "ACL does not contain WORLD_EVERYTHING";
+   final String worldAclText = "AccessControl(type:OTHER, access:7)";
+
+   assertStoreHasExactly(store);
+   LOG.info("Creating test");
+   // subject == null (security turned off), ACL explicitly WORLD_EVERYTHING
+   SettableBlobMeta blobMeta = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+   AtomicOutputStream blobOut = store.createBlob("test", blobMeta, null);
+   blobOut.write(1);
+   blobOut.close();
+   assertStoreHasExactly(store, "test");
+   // The metadata must still describe a WORLD_EVERYTHING ACL
+   assertTrue(aclFailure, blobMeta.toString().contains(worldAclText));
+   readAssertEquals(store, "test", 1);
+
+   LOG.info("Deleting test");
+   store.deleteBlob("test", null);
+   assertStoreHasExactly(store);
+
+   // The steps below exercise the update-blob interface and are meant to
+   // run against both the hdfs and the local store
+   blobMeta = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+   LOG.info("Creating test again");
+   blobOut = store.createBlob("test", blobMeta, null);
+   blobOut.write(2);
+   blobOut.close();
+   assertStoreHasExactly(store, "test");
+   if (store instanceof LocalFsBlobStore) {
+     assertTrue(aclFailure, blobMeta.toString().contains(worldAclText));
+   }
+   readAssertEquals(store, "test", 2);
+   LOG.info("Updating test");
+   blobOut = store.updateBlob("test", null);
+   blobOut.write(3);
+   blobOut.close();
+   assertStoreHasExactly(store, "test");
+   readAssertEquals(store, "test", 3);
+
+   // Start a second update but only flush it; the stream stays open
+   LOG.info("Updating test again");
+   blobOut = store.updateBlob("test", null);
+   blobOut.write(4);
+   blobOut.flush();
+   LOG.info("SLEEPING");
+   Thread.sleep(2);
+
+   // subject == null (security turned off) with a DEFAULT (empty) ACL list,
+   // checked for LocalFsBlobStore only
+   if (store instanceof LocalFsBlobStore) {
+     blobMeta = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+     LOG.info("Creating test for empty acls when security is off");
+     blobOut = store.createBlob("test-empty-acls", blobMeta, null);
+     LOG.info("metadata {}", blobMeta);
+     blobOut.write(2);
+     blobOut.close();
+     assertStoreHasExactly(store, "test-empty-acls", "test");
+     // Only LocalFsBlobStore is checked here: the HdfsBlobstore picks up the
+     // local system user as subject and so always behaves as authenticated
+     assertTrue(aclFailure, blobMeta.get_acl().toString().contains("OTHER"));
+
+     LOG.info("Deleting test-empty-acls");
+     store.deleteBlob("test-empty-acls", null);
+   }
+
+   // fail() always throws, so the cleanup below only runs for a local store
+   if (!(store instanceof LocalFsBlobStore)) {
+     fail("Error the blobstore is of unknowntype");
+   }
+   ((LocalFsBlobStore) store).fullCleanup(1);
+   try {
+     blobOut.close();
+   } catch (IOException e) {
+     // Committing something that was already cleaned up is likely to throw;
+     // that is expected and acceptable.
+   }
+ }
+
+
+ /**
+  * Interleaves create, update and delete operations on two blobs
+  * ("test" and "other") with a {@code null} subject and checks after every
+  * step that the store holds exactly the expected keys and contents. Ends by
+  * running a full cleanup while an uncommitted update to "test" is open and
+  * verifying the previously committed value is still readable.
+  *
+  * @param store the blob store under test; the final cleanup step fails the
+  *              test unless it is a {@link LocalFsBlobStore}
+  * @throws Exception if any blob store operation fails unexpectedly
+  */
+ public void testMultiple(BlobStore store) throws Exception {
+   assertStoreHasExactly(store);
+
+   LOG.info("Creating test");
+   SettableBlobMeta firstMeta = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+   AtomicOutputStream writer = store.createBlob("test", firstMeta, null);
+   writer.write(1);
+   writer.close();
+   assertStoreHasExactly(store, "test");
+   readAssertEquals(store, "test", 1);
+
+   LOG.info("Creating other");
+   SettableBlobMeta otherMeta = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+   writer = store.createBlob("other", otherMeta, null);
+   writer.write(2);
+   writer.close();
+   assertStoreHasExactly(store, "test", "other");
+   readAssertEquals(store, "test", 1);
+   readAssertEquals(store, "other", 2);
+
+   LOG.info("Updating other");
+   writer = store.updateBlob("other", null);
+   writer.write(5);
+   writer.close();
+   assertStoreHasExactly(store, "test", "other");
+   readAssertEquals(store, "test", 1);
+   readAssertEquals(store, "other", 5);
+
+   LOG.info("Deleting test");
+   store.deleteBlob("test", null);
+   assertStoreHasExactly(store, "other");
+   readAssertEquals(store, "other", 5);
+
+   LOG.info("Creating test again");
+   SettableBlobMeta recreatedMeta = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+   writer = store.createBlob("test", recreatedMeta, null);
+   writer.write(2);
+   writer.close();
+   assertStoreHasExactly(store, "test", "other");
+   readAssertEquals(store, "test", 2);
+   readAssertEquals(store, "other", 5);
+
+   LOG.info("Updating test");
+   writer = store.updateBlob("test", null);
+   writer.write(3);
+   writer.close();
+   assertStoreHasExactly(store, "test", "other");
+   readAssertEquals(store, "test", 3);
+   readAssertEquals(store, "other", 5);
+
+   LOG.info("Deleting other");
+   store.deleteBlob("other", null);
+   assertStoreHasExactly(store, "test");
+   readAssertEquals(store, "test", 3);
+
+   // Begin another update but only flush it; the stream is left open
+   LOG.info("Updating test again");
+   writer = store.updateBlob("test", null);
+   writer.write(4);
+   writer.flush();
+   LOG.info("SLEEPING");
+   Thread.sleep(2);
+
+   // fail() always throws, so the cleanup below only runs for a local store
+   if (!(store instanceof LocalFsBlobStore)) {
+     fail("Error the blobstore is of unknowntype");
+   }
+   ((LocalFsBlobStore) store).fullCleanup(1);
+   // The last committed value (3) must still be what the store serves
+   assertStoreHasExactly(store, "test");
+   readAssertEquals(store, "test", 3);
+   try {
+     writer.close();
+   } catch (IOException e) {
+     // Committing something that was already cleaned up is likely to throw;
+     // that is expected and acceptable.
+   }
+ }
+
+ /**
+  * Writes a single byte to a new blob in a local file system store and
+  * checks that the stream returned by {@code getBlob} reports a file length
+  * of 1. The read stream is closed in a {@code finally} block so that it is
+  * released even when the assertion fails.
+  */
+ @Test
+ public void testGetFileLength()
+     throws AuthorizationException, KeyNotFoundException, KeyAlreadyExistsException, IOException {
+   LocalFsBlobStore store = initLocalFs();
+   AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler
+       .WORLD_EVERYTHING), null);
+   out.write(1);
+   out.close();
+   // Hold on to the stream so it can be closed; previously it was leaked
+   InputStreamWithMeta in = store.getBlob("test", null);
+   try {
+     assertEquals(1, in.getFileLength());
+   } finally {
+     in.close();
+   }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/test/jvm/backtype/storm/blobstore/BlobSynchronizerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/blobstore/BlobSynchronizerTest.java b/storm-core/test/jvm/backtype/storm/blobstore/BlobSynchronizerTest.java
new file mode 100644
index 0000000..63f633d
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/blobstore/BlobSynchronizerTest.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.TestingServer;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Unit tests for most of the testable utility methods
+ * and BlobSynchronizer class methods
+ */
+public class BlobSynchronizerTest {
+  // URI form of baseFile, computed in init()
+  private URI base;
+  // Per-test scratch directory handed to the local blob store as STORM_LOCAL_DIR
+  private File baseFile;
+  // Shared configuration; replaced wholesale by initLocalFs()
+  private static Map conf = new HashMap();
+  // Shut down in cleanUp() when non-null; nothing in this class assigns it
+  private NIOServerCnxnFactory factory;
+
+  /**
+   * Seeds the shared configuration and picks a fresh, uniquely named
+   * scratch directory for the test.
+   */
+  @Before
+  public void init() throws Exception {
+    initializeConfigs();
+    baseFile = new File("/tmp/blob-store-test-"+ UUID.randomUUID());
+    base = baseFile.toURI();
+  }
+
+  /**
+   * Removes the scratch directory and shuts down the server connection
+   * factory if one was created.
+   */
+  @After
+  public void cleanUp() throws IOException {
+    FileUtils.deleteDirectory(baseFile);
+    if (factory != null) {
+      factory.shutdown();
+    }
+  }
+
+  /**
+   * Puts the nimbus admin and supervisor user entries into the shared
+   * configuration map.
+   */
+  public static void initializeConfigs() {
+    conf.put(Config.NIMBUS_ADMINS,"admin");
+    conf.put(Config.NIMBUS_SUPERVISOR_USERS,"supervisor");
+  }
+
+  /**
+   * Builds a {@link LocalFsBlobStore} rooted at the scratch directory.
+   *
+   * The storm configuration read here REPLACES the shared static map (so the
+   * entries added by {@link #initializeConfigs()} are no longer present in
+   * it); the same map is what the store is prepared with.
+   *
+   * @return a prepared local file system blob store
+   */
+  private LocalFsBlobStore initLocalFs() {
+    LocalFsBlobStore store = new LocalFsBlobStore();
+    Map stormConf = Utils.readStormConfig();
+    stormConf.put(Config.STORM_LOCAL_DIR, baseFile.getAbsolutePath());
+    stormConf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN,"backtype.storm.security.auth.DefaultPrincipalToLocal");
+    // Assign the static field explicitly rather than through "this"
+    BlobSynchronizerTest.conf = stormConf;
+    store.prepare(stormConf, null, null);
+    return store;
+  }
+
+  /**
+   * Checks which keys {@code getKeySetToDownload} reports for three
+   * combinations of blob store keys and zookeeper keys.
+   */
+  @Test
+  public void testBlobSynchronizerForKeysToDownload() {
+    BlobStore store = initLocalFs();
+    BlobSynchronizer sync = new BlobSynchronizer(store, conf);
+    // Both sides hold key1: nothing to download
+    Set<String> zkSet = new HashSet<String>();
+    zkSet.add("key1");
+    Set<String> blobStoreSet = new HashSet<String>();
+    blobStoreSet.add("key1");
+    Set<String> resultSet = sync.getKeySetToDownload(blobStoreSet, zkSet);
+    assertTrue("Not Empty", resultSet.isEmpty());
+    // Blob store additionally holds key2: still nothing to download
+    zkSet.add("key1");
+    blobStoreSet.add("key2");
+    resultSet = sync.getKeySetToDownload(blobStoreSet, zkSet);
+    assertTrue("Not Empty", resultSet.isEmpty());
+    // Blob store is empty while zookeeper holds key1: key1 must be downloaded
+    blobStoreSet.remove("key1");
+    blobStoreSet.remove("key2");
+    zkSet.add("key1");
+    resultSet = sync.getKeySetToDownload(blobStoreSet, zkSet);
+    assertTrue("Unexpected keys to download", (resultSet.size() == 1) && (resultSet.contains("key1")));
+  }
+
+  /**
+   * Expects the highest trailing sequence number (4) to be picked from the
+   * state entries, including one whose host name itself contains a dash.
+   */
+  @Test
+  public void testGetLatestSequenceNumber() throws Exception {
+    List<String> stateInfoList = new ArrayList<String>();
+    stateInfoList.add("nimbus1:8000-2");
+    stateInfoList.add("nimbus-1:8000-4");
+    assertTrue("Failed to get the latest version", BlobStoreUtils.getLatestSequenceNumber(stateInfoList)==4);
+  }
+
+  /**
+   * Registers two nimbus entries for key1 in an in-process zookeeper and
+   * expects the host carrying the higher sequence number to be returned.
+   * The client and the server are closed in {@code finally} blocks so they
+   * do not leak when a call or assertion throws.
+   */
+  @Test
+  public void testNimbodesWithLatestVersionOfBlob() throws Exception {
+    TestingServer server = new TestingServer();
+    try {
+      CuratorFramework zkClient = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
+      zkClient.start();
+      try {
+        // Creating nimbus hosts containing latest version of blob
+        zkClient.create().creatingParentContainersIfNeeded().forPath("/blobstore/key1/nimbus1:7800-1");
+        zkClient.create().creatingParentContainersIfNeeded().forPath("/blobstore/key1/nimbus2:7800-2");
+        Set<NimbusInfo> set = BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob(zkClient, "key1");
+        // JUnit order is (message, expected, actual)
+        assertEquals("Failed to get the correct nimbus hosts with latest blob version", "nimbus2", (set.iterator().next()).getHost());
+        zkClient.delete().deletingChildrenIfNeeded().forPath("/blobstore/key1/nimbus1:7800-1");
+        zkClient.delete().deletingChildrenIfNeeded().forPath("/blobstore/key1/nimbus2:7800-2");
+      } finally {
+        zkClient.close();
+      }
+    } finally {
+      server.close();
+    }
+  }
+
+  /**
+   * Checks that a "host:port-sequence" string is split into its host:port
+   * part and its sequence number, also when the host name contains a dash.
+   */
+  @Test
+  public void testNormalizeVersionInfo () throws Exception {
+    BlobKeySequenceInfo info1 = BlobStoreUtils.normalizeNimbusHostPortSequenceNumberInfo("nimbus1:7800-1");
+    assertEquals("nimbus1:7800", info1.getNimbusHostPort());
+    assertEquals("1", info1.getSequenceNumber());
+    BlobKeySequenceInfo info2 = BlobStoreUtils.normalizeNimbusHostPortSequenceNumberInfo("nimbus-1:7800-1");
+    assertEquals("nimbus-1:7800", info2.getNimbusHostPort());
+    assertEquals("1", info2.getSequenceNumber());
+  }
+}