Posted to commits@storm.apache.org by bo...@apache.org on 2015/12/04 16:03:57 UTC

[02/17] storm git commit: Blobstore API STORM-876

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/ttypes.py b/storm-core/src/py/storm/ttypes.py
index a730c13..98a7ba4 100644
--- a/storm-core/src/py/storm/ttypes.py
+++ b/storm-core/src/py/storm/ttypes.py
@@ -62,6 +62,20 @@ class TopologyInitialStatus:
     "INACTIVE": 2,
   }
 
+class AccessControlType:
+  OTHER = 1
+  USER = 2
+
+  _VALUES_TO_NAMES = {
+    1: "OTHER",
+    2: "USER",
+  }
+
+  _NAMES_TO_VALUES = {
+    "OTHER": 1,
+    "USER": 2,
+  }
+
 class TopologyStatus:
   ACTIVE = 1
   INACTIVE = 2
@@ -1802,6 +1816,146 @@ class InvalidTopologyException(TException):
   def __ne__(self, other):
     return not (self == other)
 
+class KeyNotFoundException(TException):
+  """
+  Attributes:
+   - msg
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRING, 'msg', None, None, ), # 1
+  )
+
+  def __init__(self, msg=None,):
+    self.msg = msg
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRING:
+          self.msg = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('KeyNotFoundException')
+    if self.msg is not None:
+      oprot.writeFieldBegin('msg', TType.STRING, 1)
+      oprot.writeString(self.msg.encode('utf-8'))
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    if self.msg is None:
+      raise TProtocol.TProtocolException(message='Required field msg is unset!')
+    return
+
+
+  def __str__(self):
+    return repr(self)
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.msg)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class KeyAlreadyExistsException(TException):
+  """
+  Attributes:
+   - msg
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRING, 'msg', None, None, ), # 1
+  )
+
+  def __init__(self, msg=None,):
+    self.msg = msg
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRING:
+          self.msg = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('KeyAlreadyExistsException')
+    if self.msg is not None:
+      oprot.writeFieldBegin('msg', TType.STRING, 1)
+      oprot.writeString(self.msg.encode('utf-8'))
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    if self.msg is None:
+      raise TProtocol.TProtocolException(message='Required field msg is unset!')
+    return
+
+
+  def __str__(self):
+    return repr(self)
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.msg)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
 class TopologySummary:
   """
   Attributes:
@@ -7110,6 +7264,458 @@ class SubmitOptions:
   def __ne__(self, other):
     return not (self == other)
 
+class AccessControl:
+  """
+  Attributes:
+   - type
+   - name
+   - access
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.I32, 'type', None, None, ), # 1
+    (2, TType.STRING, 'name', None, None, ), # 2
+    (3, TType.I32, 'access', None, None, ), # 3
+  )
+
+  def __init__(self, type=None, name=None, access=None,):
+    self.type = type
+    self.name = name
+    self.access = access
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.I32:
+          self.type = iprot.readI32()
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.STRING:
+          self.name = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      elif fid == 3:
+        if ftype == TType.I32:
+          self.access = iprot.readI32()
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('AccessControl')
+    if self.type is not None:
+      oprot.writeFieldBegin('type', TType.I32, 1)
+      oprot.writeI32(self.type)
+      oprot.writeFieldEnd()
+    if self.name is not None:
+      oprot.writeFieldBegin('name', TType.STRING, 2)
+      oprot.writeString(self.name.encode('utf-8'))
+      oprot.writeFieldEnd()
+    if self.access is not None:
+      oprot.writeFieldBegin('access', TType.I32, 3)
+      oprot.writeI32(self.access)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    if self.type is None:
+      raise TProtocol.TProtocolException(message='Required field type is unset!')
+    if self.access is None:
+      raise TProtocol.TProtocolException(message='Required field access is unset!')
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.type)
+    value = (value * 31) ^ hash(self.name)
+    value = (value * 31) ^ hash(self.access)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class SettableBlobMeta:
+  """
+  Attributes:
+   - acl
+   - replication_factor
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.LIST, 'acl', (TType.STRUCT,(AccessControl, AccessControl.thrift_spec)), None, ), # 1
+    (2, TType.I32, 'replication_factor', None, None, ), # 2
+  )
+
+  def __init__(self, acl=None, replication_factor=None,):
+    self.acl = acl
+    self.replication_factor = replication_factor
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.LIST:
+          self.acl = []
+          (_etype440, _size437) = iprot.readListBegin()
+          for _i441 in xrange(_size437):
+            _elem442 = AccessControl()
+            _elem442.read(iprot)
+            self.acl.append(_elem442)
+          iprot.readListEnd()
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.I32:
+          self.replication_factor = iprot.readI32()
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('SettableBlobMeta')
+    if self.acl is not None:
+      oprot.writeFieldBegin('acl', TType.LIST, 1)
+      oprot.writeListBegin(TType.STRUCT, len(self.acl))
+      for iter443 in self.acl:
+        iter443.write(oprot)
+      oprot.writeListEnd()
+      oprot.writeFieldEnd()
+    if self.replication_factor is not None:
+      oprot.writeFieldBegin('replication_factor', TType.I32, 2)
+      oprot.writeI32(self.replication_factor)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    if self.acl is None:
+      raise TProtocol.TProtocolException(message='Required field acl is unset!')
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.acl)
+    value = (value * 31) ^ hash(self.replication_factor)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class ReadableBlobMeta:
+  """
+  Attributes:
+   - settable
+   - version
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRUCT, 'settable', (SettableBlobMeta, SettableBlobMeta.thrift_spec), None, ), # 1
+    (2, TType.I64, 'version', None, None, ), # 2
+  )
+
+  def __init__(self, settable=None, version=None,):
+    self.settable = settable
+    self.version = version
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRUCT:
+          self.settable = SettableBlobMeta()
+          self.settable.read(iprot)
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.I64:
+          self.version = iprot.readI64()
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('ReadableBlobMeta')
+    if self.settable is not None:
+      oprot.writeFieldBegin('settable', TType.STRUCT, 1)
+      self.settable.write(oprot)
+      oprot.writeFieldEnd()
+    if self.version is not None:
+      oprot.writeFieldBegin('version', TType.I64, 2)
+      oprot.writeI64(self.version)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    if self.settable is None:
+      raise TProtocol.TProtocolException(message='Required field settable is unset!')
+    if self.version is None:
+      raise TProtocol.TProtocolException(message='Required field version is unset!')
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.settable)
+    value = (value * 31) ^ hash(self.version)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class ListBlobsResult:
+  """
+  Attributes:
+   - keys
+   - session
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.LIST, 'keys', (TType.STRING,None), None, ), # 1
+    (2, TType.STRING, 'session', None, None, ), # 2
+  )
+
+  def __init__(self, keys=None, session=None,):
+    self.keys = keys
+    self.session = session
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.LIST:
+          self.keys = []
+          (_etype447, _size444) = iprot.readListBegin()
+          for _i448 in xrange(_size444):
+            _elem449 = iprot.readString().decode('utf-8')
+            self.keys.append(_elem449)
+          iprot.readListEnd()
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.STRING:
+          self.session = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('ListBlobsResult')
+    if self.keys is not None:
+      oprot.writeFieldBegin('keys', TType.LIST, 1)
+      oprot.writeListBegin(TType.STRING, len(self.keys))
+      for iter450 in self.keys:
+        oprot.writeString(iter450.encode('utf-8'))
+      oprot.writeListEnd()
+      oprot.writeFieldEnd()
+    if self.session is not None:
+      oprot.writeFieldBegin('session', TType.STRING, 2)
+      oprot.writeString(self.session.encode('utf-8'))
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    if self.keys is None:
+      raise TProtocol.TProtocolException(message='Required field keys is unset!')
+    if self.session is None:
+      raise TProtocol.TProtocolException(message='Required field session is unset!')
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.keys)
+    value = (value * 31) ^ hash(self.session)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class BeginDownloadResult:
+  """
+  Attributes:
+   - version
+   - session
+   - data_size
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.I64, 'version', None, None, ), # 1
+    (2, TType.STRING, 'session', None, None, ), # 2
+    (3, TType.I64, 'data_size', None, None, ), # 3
+  )
+
+  def __init__(self, version=None, session=None, data_size=None,):
+    self.version = version
+    self.session = session
+    self.data_size = data_size
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.I64:
+          self.version = iprot.readI64()
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.STRING:
+          self.session = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      elif fid == 3:
+        if ftype == TType.I64:
+          self.data_size = iprot.readI64()
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('BeginDownloadResult')
+    if self.version is not None:
+      oprot.writeFieldBegin('version', TType.I64, 1)
+      oprot.writeI64(self.version)
+      oprot.writeFieldEnd()
+    if self.session is not None:
+      oprot.writeFieldBegin('session', TType.STRING, 2)
+      oprot.writeString(self.session.encode('utf-8'))
+      oprot.writeFieldEnd()
+    if self.data_size is not None:
+      oprot.writeFieldBegin('data_size', TType.I64, 3)
+      oprot.writeI64(self.data_size)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    if self.version is None:
+      raise TProtocol.TProtocolException(message='Required field version is unset!')
+    if self.session is None:
+      raise TProtocol.TProtocolException(message='Required field session is unset!')
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.version)
+    value = (value * 31) ^ hash(self.session)
+    value = (value * 31) ^ hash(self.data_size)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
 class SupervisorInfo:
   """
   Attributes:
@@ -7175,31 +7781,31 @@ class SupervisorInfo:
       elif fid == 4:
         if ftype == TType.LIST:
           self.used_ports = []
-          (_etype440, _size437) = iprot.readListBegin()
-          for _i441 in xrange(_size437):
-            _elem442 = iprot.readI64()
-            self.used_ports.append(_elem442)
+          (_etype454, _size451) = iprot.readListBegin()
+          for _i455 in xrange(_size451):
+            _elem456 = iprot.readI64()
+            self.used_ports.append(_elem456)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
       elif fid == 5:
         if ftype == TType.LIST:
           self.meta = []
-          (_etype446, _size443) = iprot.readListBegin()
-          for _i447 in xrange(_size443):
-            _elem448 = iprot.readI64()
-            self.meta.append(_elem448)
+          (_etype460, _size457) = iprot.readListBegin()
+          for _i461 in xrange(_size457):
+            _elem462 = iprot.readI64()
+            self.meta.append(_elem462)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
       elif fid == 6:
         if ftype == TType.MAP:
           self.scheduler_meta = {}
-          (_ktype450, _vtype451, _size449 ) = iprot.readMapBegin()
-          for _i453 in xrange(_size449):
-            _key454 = iprot.readString().decode('utf-8')
-            _val455 = iprot.readString().decode('utf-8')
-            self.scheduler_meta[_key454] = _val455
+          (_ktype464, _vtype465, _size463 ) = iprot.readMapBegin()
+          for _i467 in xrange(_size463):
+            _key468 = iprot.readString().decode('utf-8')
+            _val469 = iprot.readString().decode('utf-8')
+            self.scheduler_meta[_key468] = _val469
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
@@ -7216,11 +7822,11 @@ class SupervisorInfo:
       elif fid == 9:
         if ftype == TType.MAP:
           self.resources_map = {}
-          (_ktype457, _vtype458, _size456 ) = iprot.readMapBegin()
-          for _i460 in xrange(_size456):
-            _key461 = iprot.readString().decode('utf-8')
-            _val462 = iprot.readDouble()
-            self.resources_map[_key461] = _val462
+          (_ktype471, _vtype472, _size470 ) = iprot.readMapBegin()
+          for _i474 in xrange(_size470):
+            _key475 = iprot.readString().decode('utf-8')
+            _val476 = iprot.readDouble()
+            self.resources_map[_key475] = _val476
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
@@ -7249,23 +7855,23 @@ class SupervisorInfo:
     if self.used_ports is not None:
       oprot.writeFieldBegin('used_ports', TType.LIST, 4)
       oprot.writeListBegin(TType.I64, len(self.used_ports))
-      for iter463 in self.used_ports:
-        oprot.writeI64(iter463)
+      for iter477 in self.used_ports:
+        oprot.writeI64(iter477)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.meta is not None:
       oprot.writeFieldBegin('meta', TType.LIST, 5)
       oprot.writeListBegin(TType.I64, len(self.meta))
-      for iter464 in self.meta:
-        oprot.writeI64(iter464)
+      for iter478 in self.meta:
+        oprot.writeI64(iter478)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.scheduler_meta is not None:
       oprot.writeFieldBegin('scheduler_meta', TType.MAP, 6)
       oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.scheduler_meta))
-      for kiter465,viter466 in self.scheduler_meta.items():
-        oprot.writeString(kiter465.encode('utf-8'))
-        oprot.writeString(viter466.encode('utf-8'))
+      for kiter479,viter480 in self.scheduler_meta.items():
+        oprot.writeString(kiter479.encode('utf-8'))
+        oprot.writeString(viter480.encode('utf-8'))
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     if self.uptime_secs is not None:
@@ -7279,9 +7885,9 @@ class SupervisorInfo:
     if self.resources_map is not None:
       oprot.writeFieldBegin('resources_map', TType.MAP, 9)
       oprot.writeMapBegin(TType.STRING, TType.DOUBLE, len(self.resources_map))
-      for kiter467,viter468 in self.resources_map.items():
-        oprot.writeString(kiter467.encode('utf-8'))
-        oprot.writeDouble(viter468)
+      for kiter481,viter482 in self.resources_map.items():
+        oprot.writeString(kiter481.encode('utf-8'))
+        oprot.writeDouble(viter482)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -7353,10 +7959,10 @@ class NodeInfo:
       elif fid == 2:
         if ftype == TType.SET:
           self.port = set()
-          (_etype472, _size469) = iprot.readSetBegin()
-          for _i473 in xrange(_size469):
-            _elem474 = iprot.readI64()
-            self.port.add(_elem474)
+          (_etype486, _size483) = iprot.readSetBegin()
+          for _i487 in xrange(_size483):
+            _elem488 = iprot.readI64()
+            self.port.add(_elem488)
           iprot.readSetEnd()
         else:
           iprot.skip(ftype)
@@ -7377,8 +7983,8 @@ class NodeInfo:
     if self.port is not None:
       oprot.writeFieldBegin('port', TType.SET, 2)
       oprot.writeSetBegin(TType.I64, len(self.port))
-      for iter475 in self.port:
-        oprot.writeI64(iter475)
+      for iter489 in self.port:
+        oprot.writeI64(iter489)
       oprot.writeSetEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -7559,57 +8165,57 @@ class Assignment:
       elif fid == 2:
         if ftype == TType.MAP:
           self.node_host = {}
-          (_ktype477, _vtype478, _size476 ) = iprot.readMapBegin()
-          for _i480 in xrange(_size476):
-            _key481 = iprot.readString().decode('utf-8')
-            _val482 = iprot.readString().decode('utf-8')
-            self.node_host[_key481] = _val482
+          (_ktype491, _vtype492, _size490 ) = iprot.readMapBegin()
+          for _i494 in xrange(_size490):
+            _key495 = iprot.readString().decode('utf-8')
+            _val496 = iprot.readString().decode('utf-8')
+            self.node_host[_key495] = _val496
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
       elif fid == 3:
         if ftype == TType.MAP:
           self.executor_node_port = {}
-          (_ktype484, _vtype485, _size483 ) = iprot.readMapBegin()
-          for _i487 in xrange(_size483):
-            _key488 = []
-            (_etype493, _size490) = iprot.readListBegin()
-            for _i494 in xrange(_size490):
-              _elem495 = iprot.readI64()
-              _key488.append(_elem495)
+          (_ktype498, _vtype499, _size497 ) = iprot.readMapBegin()
+          for _i501 in xrange(_size497):
+            _key502 = []
+            (_etype507, _size504) = iprot.readListBegin()
+            for _i508 in xrange(_size504):
+              _elem509 = iprot.readI64()
+              _key502.append(_elem509)
             iprot.readListEnd()
-            _val489 = NodeInfo()
-            _val489.read(iprot)
-            self.executor_node_port[_key488] = _val489
+            _val503 = NodeInfo()
+            _val503.read(iprot)
+            self.executor_node_port[_key502] = _val503
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
       elif fid == 4:
         if ftype == TType.MAP:
           self.executor_start_time_secs = {}
-          (_ktype497, _vtype498, _size496 ) = iprot.readMapBegin()
-          for _i500 in xrange(_size496):
-            _key501 = []
-            (_etype506, _size503) = iprot.readListBegin()
-            for _i507 in xrange(_size503):
-              _elem508 = iprot.readI64()
-              _key501.append(_elem508)
+          (_ktype511, _vtype512, _size510 ) = iprot.readMapBegin()
+          for _i514 in xrange(_size510):
+            _key515 = []
+            (_etype520, _size517) = iprot.readListBegin()
+            for _i521 in xrange(_size517):
+              _elem522 = iprot.readI64()
+              _key515.append(_elem522)
             iprot.readListEnd()
-            _val502 = iprot.readI64()
-            self.executor_start_time_secs[_key501] = _val502
+            _val516 = iprot.readI64()
+            self.executor_start_time_secs[_key515] = _val516
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
       elif fid == 5:
         if ftype == TType.MAP:
           self.worker_resources = {}
-          (_ktype510, _vtype511, _size509 ) = iprot.readMapBegin()
-          for _i513 in xrange(_size509):
-            _key514 = NodeInfo()
-            _key514.read(iprot)
-            _val515 = WorkerResources()
-            _val515.read(iprot)
-            self.worker_resources[_key514] = _val515
+          (_ktype524, _vtype525, _size523 ) = iprot.readMapBegin()
+          for _i527 in xrange(_size523):
+            _key528 = NodeInfo()
+            _key528.read(iprot)
+            _val529 = WorkerResources()
+            _val529.read(iprot)
+            self.worker_resources[_key528] = _val529
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
@@ -7630,39 +8236,39 @@ class Assignment:
     if self.node_host is not None:
       oprot.writeFieldBegin('node_host', TType.MAP, 2)
       oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.node_host))
-      for kiter516,viter517 in self.node_host.items():
-        oprot.writeString(kiter516.encode('utf-8'))
-        oprot.writeString(viter517.encode('utf-8'))
+      for kiter530,viter531 in self.node_host.items():
+        oprot.writeString(kiter530.encode('utf-8'))
+        oprot.writeString(viter531.encode('utf-8'))
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     if self.executor_node_port is not None:
       oprot.writeFieldBegin('executor_node_port', TType.MAP, 3)
       oprot.writeMapBegin(TType.LIST, TType.STRUCT, len(self.executor_node_port))
-      for kiter518,viter519 in self.executor_node_port.items():
-        oprot.writeListBegin(TType.I64, len(kiter518))
-        for iter520 in kiter518:
-          oprot.writeI64(iter520)
+      for kiter532,viter533 in self.executor_node_port.items():
+        oprot.writeListBegin(TType.I64, len(kiter532))
+        for iter534 in kiter532:
+          oprot.writeI64(iter534)
         oprot.writeListEnd()
-        viter519.write(oprot)
+        viter533.write(oprot)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     if self.executor_start_time_secs is not None:
       oprot.writeFieldBegin('executor_start_time_secs', TType.MAP, 4)
       oprot.writeMapBegin(TType.LIST, TType.I64, len(self.executor_start_time_secs))
-      for kiter521,viter522 in self.executor_start_time_secs.items():
-        oprot.writeListBegin(TType.I64, len(kiter521))
-        for iter523 in kiter521:
-          oprot.writeI64(iter523)
+      for kiter535,viter536 in self.executor_start_time_secs.items():
+        oprot.writeListBegin(TType.I64, len(kiter535))
+        for iter537 in kiter535:
+          oprot.writeI64(iter537)
         oprot.writeListEnd()
-        oprot.writeI64(viter522)
+        oprot.writeI64(viter536)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     if self.worker_resources is not None:
       oprot.writeFieldBegin('worker_resources', TType.MAP, 5)
       oprot.writeMapBegin(TType.STRUCT, TType.STRUCT, len(self.worker_resources))
-      for kiter524,viter525 in self.worker_resources.items():
-        kiter524.write(oprot)
-        viter525.write(oprot)
+      for kiter538,viter539 in self.worker_resources.items():
+        kiter538.write(oprot)
+        viter539.write(oprot)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -7839,11 +8445,11 @@ class StormBase:
       elif fid == 4:
         if ftype == TType.MAP:
           self.component_executors = {}
-          (_ktype527, _vtype528, _size526 ) = iprot.readMapBegin()
-          for _i530 in xrange(_size526):
-            _key531 = iprot.readString().decode('utf-8')
-            _val532 = iprot.readI32()
-            self.component_executors[_key531] = _val532
+          (_ktype541, _vtype542, _size540 ) = iprot.readMapBegin()
+          for _i544 in xrange(_size540):
+            _key545 = iprot.readString().decode('utf-8')
+            _val546 = iprot.readI32()
+            self.component_executors[_key545] = _val546
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
@@ -7871,12 +8477,12 @@ class StormBase:
       elif fid == 9:
         if ftype == TType.MAP:
           self.component_debug = {}
-          (_ktype534, _vtype535, _size533 ) = iprot.readMapBegin()
-          for _i537 in xrange(_size533):
-            _key538 = iprot.readString().decode('utf-8')
-            _val539 = DebugOptions()
-            _val539.read(iprot)
-            self.component_debug[_key538] = _val539
+          (_ktype548, _vtype549, _size547 ) = iprot.readMapBegin()
+          for _i551 in xrange(_size547):
+            _key552 = iprot.readString().decode('utf-8')
+            _val553 = DebugOptions()
+            _val553.read(iprot)
+            self.component_debug[_key552] = _val553
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
@@ -7905,9 +8511,9 @@ class StormBase:
     if self.component_executors is not None:
       oprot.writeFieldBegin('component_executors', TType.MAP, 4)
       oprot.writeMapBegin(TType.STRING, TType.I32, len(self.component_executors))
-      for kiter540,viter541 in self.component_executors.items():
-        oprot.writeString(kiter540.encode('utf-8'))
-        oprot.writeI32(viter541)
+      for kiter554,viter555 in self.component_executors.items():
+        oprot.writeString(kiter554.encode('utf-8'))
+        oprot.writeI32(viter555)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     if self.launch_time_secs is not None:
@@ -7929,9 +8535,9 @@ class StormBase:
     if self.component_debug is not None:
       oprot.writeFieldBegin('component_debug', TType.MAP, 9)
       oprot.writeMapBegin(TType.STRING, TType.STRUCT, len(self.component_debug))
-      for kiter542,viter543 in self.component_debug.items():
-        oprot.writeString(kiter542.encode('utf-8'))
-        viter543.write(oprot)
+      for kiter556,viter557 in self.component_debug.items():
+        oprot.writeString(kiter556.encode('utf-8'))
+        viter557.write(oprot)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -8011,13 +8617,13 @@ class ClusterWorkerHeartbeat:
       elif fid == 2:
         if ftype == TType.MAP:
           self.executor_stats = {}
-          (_ktype545, _vtype546, _size544 ) = iprot.readMapBegin()
-          for _i548 in xrange(_size544):
-            _key549 = ExecutorInfo()
-            _key549.read(iprot)
-            _val550 = ExecutorStats()
-            _val550.read(iprot)
-            self.executor_stats[_key549] = _val550
+          (_ktype559, _vtype560, _size558 ) = iprot.readMapBegin()
+          for _i562 in xrange(_size558):
+            _key563 = ExecutorInfo()
+            _key563.read(iprot)
+            _val564 = ExecutorStats()
+            _val564.read(iprot)
+            self.executor_stats[_key563] = _val564
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
@@ -8048,9 +8654,9 @@ class ClusterWorkerHeartbeat:
     if self.executor_stats is not None:
       oprot.writeFieldBegin('executor_stats', TType.MAP, 2)
       oprot.writeMapBegin(TType.STRUCT, TType.STRUCT, len(self.executor_stats))
-      for kiter551,viter552 in self.executor_stats.items():
-        kiter551.write(oprot)
-        viter552.write(oprot)
+      for kiter565,viter566 in self.executor_stats.items():
+        kiter565.write(oprot)
+        viter566.write(oprot)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     if self.time_secs is not None:
@@ -8203,12 +8809,12 @@ class LocalStateData:
       if fid == 1:
         if ftype == TType.MAP:
           self.serialized_parts = {}
-          (_ktype554, _vtype555, _size553 ) = iprot.readMapBegin()
-          for _i557 in xrange(_size553):
-            _key558 = iprot.readString().decode('utf-8')
-            _val559 = ThriftSerializedObject()
-            _val559.read(iprot)
-            self.serialized_parts[_key558] = _val559
+          (_ktype568, _vtype569, _size567 ) = iprot.readMapBegin()
+          for _i571 in xrange(_size567):
+            _key572 = iprot.readString().decode('utf-8')
+            _val573 = ThriftSerializedObject()
+            _val573.read(iprot)
+            self.serialized_parts[_key572] = _val573
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
@@ -8225,9 +8831,9 @@ class LocalStateData:
     if self.serialized_parts is not None:
       oprot.writeFieldBegin('serialized_parts', TType.MAP, 1)
       oprot.writeMapBegin(TType.STRING, TType.STRUCT, len(self.serialized_parts))
-      for kiter560,viter561 in self.serialized_parts.items():
-        oprot.writeString(kiter560.encode('utf-8'))
-        viter561.write(oprot)
+      for kiter574,viter575 in self.serialized_parts.items():
+        oprot.writeString(kiter574.encode('utf-8'))
+        viter575.write(oprot)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -8292,11 +8898,11 @@ class LocalAssignment:
       elif fid == 2:
         if ftype == TType.LIST:
           self.executors = []
-          (_etype565, _size562) = iprot.readListBegin()
-          for _i566 in xrange(_size562):
-            _elem567 = ExecutorInfo()
-            _elem567.read(iprot)
-            self.executors.append(_elem567)
+          (_etype579, _size576) = iprot.readListBegin()
+          for _i580 in xrange(_size576):
+            _elem581 = ExecutorInfo()
+            _elem581.read(iprot)
+            self.executors.append(_elem581)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -8323,8 +8929,8 @@ class LocalAssignment:
     if self.executors is not None:
       oprot.writeFieldBegin('executors', TType.LIST, 2)
       oprot.writeListBegin(TType.STRUCT, len(self.executors))
-      for iter568 in self.executors:
-        iter568.write(oprot)
+      for iter582 in self.executors:
+        iter582.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.resources is not None:
@@ -8453,11 +9059,11 @@ class LSApprovedWorkers:
       if fid == 1:
         if ftype == TType.MAP:
           self.approved_workers = {}
-          (_ktype570, _vtype571, _size569 ) = iprot.readMapBegin()
-          for _i573 in xrange(_size569):
-            _key574 = iprot.readString().decode('utf-8')
-            _val575 = iprot.readI32()
-            self.approved_workers[_key574] = _val575
+          (_ktype584, _vtype585, _size583 ) = iprot.readMapBegin()
+          for _i587 in xrange(_size583):
+            _key588 = iprot.readString().decode('utf-8')
+            _val589 = iprot.readI32()
+            self.approved_workers[_key588] = _val589
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
@@ -8474,9 +9080,9 @@ class LSApprovedWorkers:
     if self.approved_workers is not None:
       oprot.writeFieldBegin('approved_workers', TType.MAP, 1)
       oprot.writeMapBegin(TType.STRING, TType.I32, len(self.approved_workers))
-      for kiter576,viter577 in self.approved_workers.items():
-        oprot.writeString(kiter576.encode('utf-8'))
-        oprot.writeI32(viter577)
+      for kiter590,viter591 in self.approved_workers.items():
+        oprot.writeString(kiter590.encode('utf-8'))
+        oprot.writeI32(viter591)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -8530,12 +9136,12 @@ class LSSupervisorAssignments:
       if fid == 1:
         if ftype == TType.MAP:
           self.assignments = {}
-          (_ktype579, _vtype580, _size578 ) = iprot.readMapBegin()
-          for _i582 in xrange(_size578):
-            _key583 = iprot.readI32()
-            _val584 = LocalAssignment()
-            _val584.read(iprot)
-            self.assignments[_key583] = _val584
+          (_ktype593, _vtype594, _size592 ) = iprot.readMapBegin()
+          for _i596 in xrange(_size592):
+            _key597 = iprot.readI32()
+            _val598 = LocalAssignment()
+            _val598.read(iprot)
+            self.assignments[_key597] = _val598
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
@@ -8552,9 +9158,9 @@ class LSSupervisorAssignments:
     if self.assignments is not None:
       oprot.writeFieldBegin('assignments', TType.MAP, 1)
       oprot.writeMapBegin(TType.I32, TType.STRUCT, len(self.assignments))
-      for kiter585,viter586 in self.assignments.items():
-        oprot.writeI32(kiter585)
-        viter586.write(oprot)
+      for kiter599,viter600 in self.assignments.items():
+        oprot.writeI32(kiter599)
+        viter600.write(oprot)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -8627,11 +9233,11 @@ class LSWorkerHeartbeat:
       elif fid == 3:
         if ftype == TType.LIST:
           self.executors = []
-          (_etype590, _size587) = iprot.readListBegin()
-          for _i591 in xrange(_size587):
-            _elem592 = ExecutorInfo()
-            _elem592.read(iprot)
-            self.executors.append(_elem592)
+          (_etype604, _size601) = iprot.readListBegin()
+          for _i605 in xrange(_size601):
+            _elem606 = ExecutorInfo()
+            _elem606.read(iprot)
+            self.executors.append(_elem606)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -8661,8 +9267,8 @@ class LSWorkerHeartbeat:
     if self.executors is not None:
       oprot.writeFieldBegin('executors', TType.LIST, 3)
       oprot.writeListBegin(TType.STRUCT, len(self.executors))
-      for iter593 in self.executors:
-        iter593.write(oprot)
+      for iter607 in self.executors:
+        iter607.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.port is not None:
@@ -8748,20 +9354,20 @@ class LSTopoHistory:
       elif fid == 3:
         if ftype == TType.LIST:
           self.users = []
-          (_etype597, _size594) = iprot.readListBegin()
-          for _i598 in xrange(_size594):
-            _elem599 = iprot.readString().decode('utf-8')
-            self.users.append(_elem599)
+          (_etype611, _size608) = iprot.readListBegin()
+          for _i612 in xrange(_size608):
+            _elem613 = iprot.readString().decode('utf-8')
+            self.users.append(_elem613)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
       elif fid == 4:
         if ftype == TType.LIST:
           self.groups = []
-          (_etype603, _size600) = iprot.readListBegin()
-          for _i604 in xrange(_size600):
-            _elem605 = iprot.readString().decode('utf-8')
-            self.groups.append(_elem605)
+          (_etype617, _size614) = iprot.readListBegin()
+          for _i618 in xrange(_size614):
+            _elem619 = iprot.readString().decode('utf-8')
+            self.groups.append(_elem619)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -8786,15 +9392,15 @@ class LSTopoHistory:
     if self.users is not None:
       oprot.writeFieldBegin('users', TType.LIST, 3)
       oprot.writeListBegin(TType.STRING, len(self.users))
-      for iter606 in self.users:
-        oprot.writeString(iter606.encode('utf-8'))
+      for iter620 in self.users:
+        oprot.writeString(iter620.encode('utf-8'))
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.groups is not None:
       oprot.writeFieldBegin('groups', TType.LIST, 4)
       oprot.writeListBegin(TType.STRING, len(self.groups))
-      for iter607 in self.groups:
-        oprot.writeString(iter607.encode('utf-8'))
+      for iter621 in self.groups:
+        oprot.writeString(iter621.encode('utf-8'))
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -8857,11 +9463,11 @@ class LSTopoHistoryList:
       if fid == 1:
         if ftype == TType.LIST:
           self.topo_history = []
-          (_etype611, _size608) = iprot.readListBegin()
-          for _i612 in xrange(_size608):
-            _elem613 = LSTopoHistory()
-            _elem613.read(iprot)
-            self.topo_history.append(_elem613)
+          (_etype625, _size622) = iprot.readListBegin()
+          for _i626 in xrange(_size622):
+            _elem627 = LSTopoHistory()
+            _elem627.read(iprot)
+            self.topo_history.append(_elem627)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -8878,8 +9484,8 @@ class LSTopoHistoryList:
     if self.topo_history is not None:
       oprot.writeFieldBegin('topo_history', TType.LIST, 1)
       oprot.writeListBegin(TType.STRUCT, len(self.topo_history))
-      for iter614 in self.topo_history:
-        iter614.write(oprot)
+      for iter628 in self.topo_history:
+        iter628.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -9214,12 +9820,12 @@ class LogConfig:
       if fid == 2:
         if ftype == TType.MAP:
           self.named_logger_level = {}
-          (_ktype616, _vtype617, _size615 ) = iprot.readMapBegin()
-          for _i619 in xrange(_size615):
-            _key620 = iprot.readString().decode('utf-8')
-            _val621 = LogLevel()
-            _val621.read(iprot)
-            self.named_logger_level[_key620] = _val621
+          (_ktype630, _vtype631, _size629 ) = iprot.readMapBegin()
+          for _i633 in xrange(_size629):
+            _key634 = iprot.readString().decode('utf-8')
+            _val635 = LogLevel()
+            _val635.read(iprot)
+            self.named_logger_level[_key634] = _val635
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
@@ -9236,9 +9842,9 @@ class LogConfig:
     if self.named_logger_level is not None:
       oprot.writeFieldBegin('named_logger_level', TType.MAP, 2)
       oprot.writeMapBegin(TType.STRING, TType.STRUCT, len(self.named_logger_level))
-      for kiter622,viter623 in self.named_logger_level.items():
-        oprot.writeString(kiter622.encode('utf-8'))
-        viter623.write(oprot)
+      for kiter636,viter637 in self.named_logger_level.items():
+        oprot.writeString(kiter636.encode('utf-8'))
+        viter637.write(oprot)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -9290,10 +9896,10 @@ class TopologyHistoryInfo:
       if fid == 1:
         if ftype == TType.LIST:
           self.topo_ids = []
-          (_etype627, _size624) = iprot.readListBegin()
-          for _i628 in xrange(_size624):
-            _elem629 = iprot.readString().decode('utf-8')
-            self.topo_ids.append(_elem629)
+          (_etype641, _size638) = iprot.readListBegin()
+          for _i642 in xrange(_size638):
+            _elem643 = iprot.readString().decode('utf-8')
+            self.topo_ids.append(_elem643)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -9310,8 +9916,8 @@ class TopologyHistoryInfo:
     if self.topo_ids is not None:
       oprot.writeFieldBegin('topo_ids', TType.LIST, 1)
       oprot.writeListBegin(TType.STRING, len(self.topo_ids))
-      for iter630 in self.topo_ids:
-        oprot.writeString(iter630.encode('utf-8'))
+      for iter644 in self.topo_ids:
+        oprot.writeString(iter644.encode('utf-8'))
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -9961,11 +10567,11 @@ class HBRecords:
       if fid == 1:
         if ftype == TType.LIST:
           self.pulses = []
-          (_etype627, _size624) = iprot.readListBegin()
-          for _i628 in xrange(_size624):
-            _elem629 = HBPulse()
-            _elem629.read(iprot)
-            self.pulses.append(_elem629)
+          (_etype648, _size645) = iprot.readListBegin()
+          for _i649 in xrange(_size645):
+            _elem650 = HBPulse()
+            _elem650.read(iprot)
+            self.pulses.append(_elem650)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -9982,8 +10588,8 @@ class HBRecords:
     if self.pulses is not None:
       oprot.writeFieldBegin('pulses', TType.LIST, 1)
       oprot.writeListBegin(TType.STRUCT, len(self.pulses))
-      for iter630 in self.pulses:
-        iter630.write(oprot)
+      for iter651 in self.pulses:
+        iter651.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -10035,10 +10641,10 @@ class HBNodes:
       if fid == 1:
         if ftype == TType.LIST:
           self.pulseIds = []
-          (_etype634, _size631) = iprot.readListBegin()
-          for _i635 in xrange(_size631):
-            _elem636 = iprot.readString().decode('utf-8')
-            self.pulseIds.append(_elem636)
+          (_etype655, _size652) = iprot.readListBegin()
+          for _i656 in xrange(_size652):
+            _elem657 = iprot.readString().decode('utf-8')
+            self.pulseIds.append(_elem657)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -10055,8 +10661,8 @@ class HBNodes:
     if self.pulseIds is not None:
       oprot.writeFieldBegin('pulseIds', TType.LIST, 1)
       oprot.writeListBegin(TType.STRING, len(self.pulseIds))
-      for iter637 in self.pulseIds:
-        oprot.writeString(iter637.encode('utf-8'))
+      for iter658 in self.pulseIds:
+        oprot.writeString(iter658.encode('utf-8'))
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
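
As a rough usage sketch for the generated blobstore types added above (Python 2, to match the generated code): the access bitmask values READ=0x1, WRITE=0x2, ADMIN=0x4 come from the comments in storm.thrift below, and the import path assumes the storm-core/src/py layout shown in this diff. This is illustrative only, not part of the commit.

from thrift.protocol import TBinaryProtocol
from thrift.transport import TTransport

from storm.ttypes import AccessControl, AccessControlType, SettableBlobMeta

READ, WRITE, ADMIN = 0x1, 0x2, 0x4  # access bitmask documented in storm.thrift

# Build an ACL granting a named user read/write and everyone else read-only.
acl = [AccessControl(type=AccessControlType.USER, name=u'alice', access=READ | WRITE),
       AccessControl(type=AccessControlType.OTHER, access=READ)]
meta = SettableBlobMeta(acl=acl, replication_factor=3)
meta.validate()  # raises TProtocolException if the required 'acl' field is unset

# Round-trip through the plain binary protocol to exercise the generated write()/read().
buf = TTransport.TMemoryBuffer()
meta.write(TBinaryProtocol.TBinaryProtocol(buf))

parsed = SettableBlobMeta()
parsed.read(TBinaryProtocol.TBinaryProtocol(TTransport.TMemoryBuffer(buf.getvalue())))
assert parsed == meta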

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/src/storm.thrift
----------------------------------------------------------------------
diff --git a/storm-core/src/storm.thrift b/storm-core/src/storm.thrift
index 677de2b..08be005 100644
--- a/storm-core/src/storm.thrift
+++ b/storm-core/src/storm.thrift
@@ -136,6 +136,14 @@ exception InvalidTopologyException {
   1: required string msg;
 }
 
+exception KeyNotFoundException {
+  1: required string msg;
+}
+
+exception KeyAlreadyExistsException {
+  1: required string msg;
+}
+
 struct TopologySummary {
   1: required string id;
   2: required string name;
@@ -371,6 +379,42 @@ struct SubmitOptions {
   2: optional Credentials creds;
 }
 
+enum AccessControlType {
+  OTHER = 1,
+  USER = 2
+  //eventually ,GROUP=3
+}
+
+struct AccessControl {
+  1: required AccessControlType type;
+  2: optional string name; //Name of user or group in ACL
+  3: required i32 access; //bitmasks READ=0x1, WRITE=0x2, ADMIN=0x4
+}
+
+struct SettableBlobMeta {
+  1: required list<AccessControl> acl;
+  2: optional i32 replication_factor
+}
+
+struct ReadableBlobMeta {
+  1: required SettableBlobMeta settable;
+  //This is some indication of a version of a BLOB.  The only guarantee is
+  // if the data changed in the blob the version will be different.
+  2: required i64 version;
+}
+
+struct ListBlobsResult {
+  1: required list<string> keys;
+  2: required string session;
+}
+
+struct BeginDownloadResult {
+  //Same version as in ReadableBlobMeta
+  1: required i64 version;
+  2: required string session;
+  3: optional i64 data_size;
+}
+
 struct SupervisorInfo {
     1: required i64 time_secs;
     2: required string hostname;
@@ -565,6 +609,21 @@ service Nimbus {
 
   void uploadNewCredentials(1: string name, 2: Credentials creds) throws (1: NotAliveException e, 2: InvalidTopologyException ite, 3: AuthorizationException aze);
 
+  string beginCreateBlob(1: string key, 2: SettableBlobMeta meta) throws (1: AuthorizationException aze, 2: KeyAlreadyExistsException kae);
+  string beginUpdateBlob(1: string key) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf);
+  void uploadBlobChunk(1: string session, 2: binary chunk) throws (1: AuthorizationException aze);
+  void finishBlobUpload(1: string session) throws (1: AuthorizationException aze);
+  void cancelBlobUpload(1: string session) throws (1: AuthorizationException aze);
+  ReadableBlobMeta getBlobMeta(1: string key) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf);
+  void setBlobMeta(1: string key, 2: SettableBlobMeta meta) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf);
+  BeginDownloadResult beginBlobDownload(1: string key) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf);
+  binary downloadBlobChunk(1: string session) throws (1: AuthorizationException aze);
+  void deleteBlob(1: string key) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf);
+  ListBlobsResult listBlobs(1: string session); //empty string "" means start at the beginning
+  i32 getBlobReplication(1: string key) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf);
+  i32 updateBlobReplication(1: string key, 2: i32 replication) throws (1: AuthorizationException aze, 2: KeyNotFoundException knf);
+  void createStateInZookeeper(1: string key); // creates state in zookeeper when blob is uploaded through command line
+
   // need to add functions for asking about status of storms, what nodes they're running on, looking at task logs
 
   string beginFileUpload() throws (1: AuthorizationException aze);
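
A sketch of how a client might drive the new blobstore RPCs declared above, using the standard Thrift-generated storm.Nimbus client for this service. The framed transport, host:port, chunk size, key name, and file name are assumptions for illustration, not something this commit specifies.

from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket, TTransport

from storm import Nimbus
from storm.ttypes import AccessControl, AccessControlType, SettableBlobMeta

# Assumed connection details: framed transport to Nimbus's default thrift port.
transport = TTransport.TFramedTransport(TSocket.TSocket('localhost', 6627))
client = Nimbus.Client(TBinaryProtocol.TBinaryProtocol(transport))
transport.open()

# Upload: begin -> chunked upload -> finish (cancelBlobUpload aborts a session).
meta = SettableBlobMeta(acl=[AccessControl(type=AccessControlType.OTHER, access=0x1)])
session = client.beginCreateBlob('some-key', meta)          # hypothetical key
with open('blob-payload.bin', 'rb') as f:                    # hypothetical file
    for chunk in iter(lambda: f.read(65536), ''):
        client.uploadBlobChunk(session, chunk)
client.finishBlobUpload(session)

# Download: beginBlobDownload returns the version, a session id and (optionally)
# data_size; the IDL above does not spell out how end-of-stream is signaled.
dl = client.beginBlobDownload('some-key')
first_chunk = client.downloadBlobChunk(dl.session)

transport.close()

Per the comment in the IDL, listBlobs("") starts a key listing from the beginning; the returned ListBlobsResult carries a session string, presumably to be passed back in to continue the listing.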

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/test/clj/backtype/storm/cluster_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/cluster_test.clj b/storm-core/test/clj/backtype/storm/cluster_test.clj
index eea4637..82f305b 100644
--- a/storm-core/test/clj/backtype/storm/cluster_test.clj
+++ b/storm-core/test/clj/backtype/storm/cluster_test.clj
@@ -208,14 +208,15 @@
       (.set-credentials! state "storm1" {"b" "b"} {})
       (is (= {"b" "b"} (.credentials state "storm1" nil)))
 
-      (is (= [] (.code-distributor state nil)))
-      (.setup-code-distributor! state "storm1" nimbusInfo1)
-      (is (= ["storm1"] (.code-distributor state nil)))
-      (is (= [nimbusInfo1] (.code-distributor-info state "storm1")))
-      (.setup-code-distributor! state "storm1" nimbusInfo2)
-      (is (= #{nimbusInfo1 nimbusInfo2} (set (.code-distributor-info state "storm1"))))
-      (.remove-storm! state "storm1")
-      (is (= [] (.code-distributor state nil)))
+      (is (= [] (.blobstore-info state nil)))
+      (.setup-blobstore! state "key1" nimbusInfo1 "1")
+      (is (= ["key1"] (.blobstore-info state nil)))
+      (is (= [(str (.toHostPortString nimbusInfo1) "-1")] (.blobstore-info state "key1")))
+      (.setup-blobstore! state "key1" nimbusInfo2 "1")
+      (is (= #{(str (.toHostPortString nimbusInfo1) "-1")
+               (str (.toHostPortString nimbusInfo2) "-1")} (set (.blobstore-info state "key1"))))
+      (.remove-blobstore-key! state "key1")
+      (is (= [] (.blobstore-info state nil)))
 
       (is (= [] (.nimbuses state)))
       (.add-nimbus-host! state "nimbus1:port" nimbusSummary1)
@@ -266,8 +267,7 @@
     (let [state1 (mk-storm-state zk-port)
           state2 (mk-storm-state zk-port)
           supervisor-info1 (SupervisorInfo. 10 "hostname-1" "id1" [1 2] [] {} 1000 "0.9.2" nil)
-          supervisor-info2 (SupervisorInfo. 10 "hostname-2" "id2" [1 2] [] {} 1000 "0.9.2" nil)
-          ]
+          supervisor-info2 (SupervisorInfo. 10 "hostname-2" "id2" [1 2] [] {} 1000 "0.9.2" nil)]
       (is (= [] (.supervisors state1 nil)))
       (.supervisor-heartbeat! state2 "2" supervisor-info2)
       (.supervisor-heartbeat! state1 "1" supervisor-info1)

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/test/clj/backtype/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/nimbus_test.clj b/storm-core/test/clj/backtype/storm/nimbus_test.clj
index 53d4bb8..0847883 100644
--- a/storm-core/test/clj/backtype/storm/nimbus_test.clj
+++ b/storm-core/test/clj/backtype/storm/nimbus_test.clj
@@ -29,7 +29,7 @@
             LogConfig LogLevel LogLevelAction])
   (:import [java.util HashMap])
   (:import [java.io File])
-  (:import [backtype.storm.utils Time])
+  (:import [backtype.storm.utils Time Utils])
   (:import [org.apache.commons.io FileUtils])
   (:use [backtype.storm testing MockAutoCred util config log timer zookeeper])
   (:use [backtype.storm.daemon common])
@@ -939,41 +939,15 @@
          (bind storm-id1 (get-storm-id cluster-state "t1"))
          (bind storm-id2 (get-storm-id cluster-state "t2"))
          (.shutdown nimbus)
-         (rmr (master-stormdist-root conf storm-id1))
+         (let [blob-store (Utils/getNimbusBlobStore conf nil)]
+           (nimbus/blob-rm-topology-keys storm-id1 blob-store cluster-state)
+           (.shutdown blob-store))
          (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
          (is ( = #{storm-id2} (set (.active-storms cluster-state))))
          (.shutdown nimbus)
          (.disconnect cluster-state)
          )))))
 
-
-(deftest test-cleans-corrupt
-  (with-inprocess-zookeeper zk-port
-    (with-local-tmp [nimbus-dir]
-      (stubbing [zk-leader-elector (mock-leader-elector)]
-        (letlocals
-          (bind conf (merge (read-storm-config)
-                       {STORM-ZOOKEEPER-SERVERS ["localhost"]
-                        STORM-CLUSTER-MODE "local"
-                        STORM-ZOOKEEPER-PORT zk-port
-                        STORM-LOCAL-DIR nimbus-dir}))
-          (bind cluster-state (cluster/mk-storm-cluster-state conf))
-          (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
-          (bind topology (thrift/mk-topology
-                           {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)}
-                           {}))
-          (submit-local-topology nimbus "t1" {} topology)
-          (submit-local-topology nimbus "t2" {} topology)
-          (bind storm-id1 (get-storm-id cluster-state "t1"))
-          (bind storm-id2 (get-storm-id cluster-state "t2"))
-          (.shutdown nimbus)
-          (rmr (master-stormdist-root conf storm-id1))
-          (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
-          (is ( = #{storm-id2} (set (.active-storms cluster-state))))
-          (.shutdown nimbus)
-          (.disconnect cluster-state)
-          )))))
-
 ;(deftest test-no-overlapping-slots
 ;  ;; test that same node+port never appears across 2 assignments
 ;  )
@@ -1173,7 +1147,7 @@
                     nimbus/check-authorization!
                       [1 2 3] expected-name expected-conf expected-operation)
                   (verify-first-call-args-for-indices
-                    nimbus/try-read-storm-topology [0] expected-conf))))))))))
+                    nimbus/try-read-storm-topology [0] "fake-id"))))))))))
 
 (deftest test-nimbus-iface-getTopology-methods-throw-correctly
   (with-local-cluster [cluster]
@@ -1230,7 +1204,8 @@
                         :status {:type bogus-type}}
                 }
         ]
-      (stubbing [topology-bases bogus-bases]
+      (stubbing [topology-bases bogus-bases
+                 nimbus/get-blob-replication-count 1]
         (let [topos (.get_topologies (.getClusterInfo nimbus))]
           ; The number of topologies in the summary is correct.
           (is (= (count
@@ -1265,6 +1240,7 @@
           digest "storm:thisisapoorpassword"
           auth-conf {STORM-ZOOKEEPER-AUTH-SCHEME scheme
                      STORM-ZOOKEEPER-AUTH-PAYLOAD digest
+                     STORM-PRINCIPAL-TO-LOCAL-PLUGIN "backtype.storm.security.auth.DefaultPrincipalToLocal"
                      NIMBUS-THRIFT-PORT 6666}
           expected-acls nimbus/NIMBUS-ZK-ACLS
           fake-inimbus (reify INimbus (getForcedScheduler [this] nil))]
@@ -1272,10 +1248,11 @@
                  mk-authorization-handler nil
                  cluster/mk-storm-cluster-state nil
                  nimbus/file-cache-map nil
+                 nimbus/mk-blob-cache-map nil
+                 nimbus/mk-bloblist-cache-map nil
                  uptime-computer nil
                  new-instance nil
                  mk-timer nil
-                 nimbus/mk-code-distributor nil
                  zk-leader-elector nil
                  nimbus/mk-scheduler nil]
         (nimbus/nimbus-data auth-conf fake-inimbus)

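The nimbus_test changes above replace the stormdist-directory cleanup with blob removal through a store obtained via (Utils/getNimbusBlobStore conf nil). A minimal, illustrative Java sketch that mirrors that call, lists the store's keys, and shuts it down:

import backtype.storm.blobstore.BlobStore;
import backtype.storm.utils.Utils;

import java.util.Iterator;
import java.util.Map;

public class ListBlobKeysExample {
  public static void main(String[] args) throws Exception {
    Map conf = Utils.readStormConfig();
    // Mirrors (Utils/getNimbusBlobStore conf nil) from the test above.
    BlobStore store = Utils.getNimbusBlobStore(conf, null);
    try {
      Iterator<String> keys = store.listKeys();
      while (keys.hasNext()) {
        System.out.println(keys.next());
      }
    } finally {
      store.shutdown();
    }
  }
}
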
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/test/clj/backtype/storm/security/auth/ReqContext_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/security/auth/ReqContext_test.clj b/storm-core/test/clj/backtype/storm/security/auth/ReqContext_test.clj
index ea45ddc..18d4ada 100644
--- a/storm-core/test/clj/backtype/storm/security/auth/ReqContext_test.clj
+++ b/storm-core/test/clj/backtype/storm/security/auth/ReqContext_test.clj
@@ -68,5 +68,6 @@
     (.setSubject rc s)
     (is (not (nil? (.principal rc))))
     (is (= (-> rc .principal .getName) principal-name))
+    (.setSubject rc nil)
   )
 )

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/test/clj/backtype/storm/supervisor_test.clj
----------------------------------------------------------------------
diff --git a/storm-core/test/clj/backtype/storm/supervisor_test.clj b/storm-core/test/clj/backtype/storm/supervisor_test.clj
index 04c8600..776ad6e 100644
--- a/storm-core/test/clj/backtype/storm/supervisor_test.clj
+++ b/storm-core/test/clj/backtype/storm/supervisor_test.clj
@@ -324,7 +324,8 @@
                      set-worker-user! nil
                      supervisor/jlp nil
                      worker-artifacts-root "/tmp/workers-artifacts"
-                     supervisor/write-log-metadata! nil]
+                     supervisor/write-log-metadata! nil
+                     supervisor/create-blobstore-links nil]
             (supervisor/launch-worker mock-supervisor
                                       mock-storm-id
                                       mock-port
@@ -346,8 +347,9 @@
                      launch-process nil
                      set-worker-user! nil
                      supervisor/jlp nil
-                     worker-artifacts-root "/tmp/workers-artifacts"
-                     supervisor/write-log-metadata! nil]
+                     supervisor/write-log-metadata! nil
+                     supervisor/create-blobstore-links nil
+                     worker-artifacts-root "/tmp/workers-artifacts"]
             (supervisor/launch-worker mock-supervisor
                                       mock-storm-id
                                       mock-port
@@ -367,7 +369,8 @@
                      set-worker-user! nil
                      supervisor/write-log-metadata! nil
                      launch-process nil
-                     current-classpath (str file-path-separator "base")]
+                     current-classpath (str file-path-separator "base")
+                     supervisor/create-blobstore-links nil]
                     (supervisor/launch-worker mock-supervisor
                                               mock-storm-id
                                               mock-port
@@ -388,7 +391,8 @@
                      launch-process nil
                      set-worker-user! nil
                      supervisor/write-log-metadata! nil
-                     current-classpath (str file-path-separator "base")]
+                     current-classpath (str file-path-separator "base")
+                     supervisor/create-blobstore-links nil]
                     (supervisor/launch-worker mock-supervisor
                                               mock-storm-id
                                               mock-port
@@ -540,8 +544,8 @@
                  cluster/mk-storm-cluster-state nil
                  supervisor-state nil
                  local-hostname nil
-                 supervisor/mk-code-distributor nil
-                 mk-timer nil]
+                 mk-timer nil
+                 supervisor-local-dir nil]
         (supervisor/supervisor-data auth-conf nil fake-isupervisor)
         (verify-call-times-for cluster/mk-storm-cluster-state 1)
         (verify-first-call-args-for-indices cluster/mk-storm-cluster-state [2]

http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/test/jvm/backtype/storm/blobstore/BlobStoreTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/blobstore/BlobStoreTest.java b/storm-core/test/jvm/backtype/storm/blobstore/BlobStoreTest.java
new file mode 100644
index 0000000..388b491
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/blobstore/BlobStoreTest.java
@@ -0,0 +1,461 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.generated.AuthorizationException;
+import backtype.storm.generated.KeyAlreadyExistsException;
+import backtype.storm.generated.KeyNotFoundException;
+import backtype.storm.generated.SettableBlobMeta;
+
+import backtype.storm.security.auth.NimbusPrincipal;
+import backtype.storm.security.auth.SingleUserPrincipal;
+import backtype.storm.utils.Utils;
+import org.apache.commons.io.FileUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.security.auth.Subject;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.URI;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.UUID;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Iterator;
+import java.util.Arrays;
+import java.util.ArrayList;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.*;
+
+public class BlobStoreTest {
+  private static final Logger LOG = LoggerFactory.getLogger(BlobStoreTest.class);
+  URI base;
+  File baseFile;
+  private static Map conf = new HashMap();
+  public static final int READ = 0x01;
+  public static final int WRITE = 0x02;
+  public static final int ADMIN = 0x04;
+
+  @Before
+  public void init() {
+    initializeConfigs();
+    baseFile = new File("/tmp/blob-store-test-"+UUID.randomUUID());
+    base = baseFile.toURI();
+  }
+
+  @After
+  public void cleanup() throws IOException {
+    FileUtils.deleteDirectory(baseFile);
+  }
+
+  // Initializes the Nimbus admin and supervisor user configs
+  public static void initializeConfigs() {
+    conf.put(Config.NIMBUS_ADMINS,"admin");
+    conf.put(Config.NIMBUS_SUPERVISOR_USERS,"supervisor");
+  }
+
+  // Gets Nimbus Subject with NimbusPrincipal set on it
+  public static Subject getNimbusSubject() {
+    Subject nimbus = new Subject();
+    nimbus.getPrincipals().add(new NimbusPrincipal());
+    return nimbus;
+  }
+
+  // Overload of assertStoreHasExactly that accepts a Subject in order to check for authorization
+  public static void assertStoreHasExactly(BlobStore store, Subject who, String ... keys)
+      throws IOException, KeyNotFoundException, AuthorizationException {
+    Set<String> expected = new HashSet<String>(Arrays.asList(keys));
+    Set<String> found = new HashSet<String>();
+    Iterator<String> c = store.listKeys();
+    while (c.hasNext()) {
+      String keyName = c.next();
+      found.add(keyName);
+    }
+    Set<String> extra = new HashSet<String>(found);
+    extra.removeAll(expected);
+    assertTrue("Found extra keys in the blob store "+extra, extra.isEmpty());
+    Set<String> missing = new HashSet<String>(expected);
+    missing.removeAll(found);
+    assertTrue("Found keys missing from the blob store "+missing, missing.isEmpty());
+  }
+
+  public static void assertStoreHasExactly(BlobStore store, String ... keys)
+      throws IOException, KeyNotFoundException, AuthorizationException {
+    assertStoreHasExactly(store, null, keys);
+  }
+
+  // Overload of readInt that accepts a Subject in order to check for authorization (security turned on)
+  public static int readInt(BlobStore store, Subject who, String key)
+          throws IOException, KeyNotFoundException, AuthorizationException {
+    InputStream in = store.getBlob(key, who);
+    try {
+      return in.read();
+    } finally {
+      in.close();
+    }
+  }
+
+  public static int readInt(BlobStore store, String key)
+          throws IOException, KeyNotFoundException, AuthorizationException {
+    return readInt(store, null, key);
+  }
+
+  public static void readAssertEquals(BlobStore store, String key, int value)
+          throws IOException, KeyNotFoundException, AuthorizationException {
+    assertEquals(value, readInt(store, key));
+  }
+
+  // Reads a blob as the given Subject and asserts its value (used when security is turned on)
+  public void readAssertEqualsWithAuth(BlobStore store, Subject who, String key, int value)
+          throws IOException, KeyNotFoundException, AuthorizationException {
+    assertEquals(value, readInt(store, who, key));
+  }
+
+  private LocalFsBlobStore initLocalFs() {
+    LocalFsBlobStore store = new LocalFsBlobStore();
+    // Spy object that stubs out blob-update checks on the real store
+    LocalFsBlobStore spy = spy(store);
+    Mockito.doNothing().when(spy).checkForBlobUpdate("test");
+    Mockito.doNothing().when(spy).checkForBlobUpdate("other");
+    Mockito.doNothing().when(spy).checkForBlobUpdate("test-empty-subject-WE");
+    Mockito.doNothing().when(spy).checkForBlobUpdate("test-empty-subject-DEF");
+    Mockito.doNothing().when(spy).checkForBlobUpdate("test-empty-acls");
+    Map conf = Utils.readStormConfig();
+    conf.put(Config.STORM_LOCAL_DIR, baseFile.getAbsolutePath());
+    conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN,"backtype.storm.security.auth.DefaultPrincipalToLocal");
+    ArrayList<String> zookeeper_list = new ArrayList<>();
+    spy.prepare(conf, null, null);
+    return spy;
+  }
+
+  @Test
+  public void testLocalFsWithAuth() throws Exception {
+    testWithAuthentication(initLocalFs());
+  }
+
+  @Test
+  public void testBasicLocalFs() throws Exception {
+    testBasic(initLocalFs());
+  }
+
+  @Test
+  public void testMultipleLocalFs() throws Exception {
+    testMultiple(initLocalFs());
+  }
+
+  public Subject getSubject(String name) {
+    Subject subject = new Subject();
+    SingleUserPrincipal user = new SingleUserPrincipal(name);
+    subject.getPrincipals().add(user);
+    return subject;
+  }
+
+  // Tests the blob store with authentication (subjects) enabled
+  public void testWithAuthentication(BlobStore store) throws Exception {
+    //Test for Nimbus Admin
+    Subject admin = getSubject("admin");
+    assertStoreHasExactly(store);
+    SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+    AtomicOutputStream out = store.createBlob("test", metadata, admin);
+    assertStoreHasExactly(store, "test");
+    out.write(1);
+    out.close();
+    store.deleteBlob("test", admin);
+
+    //Test for Supervisor Admin
+    Subject supervisor = getSubject("supervisor");
+    assertStoreHasExactly(store);
+    metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+    out = store.createBlob("test", metadata, supervisor);
+    assertStoreHasExactly(store, "test");
+    out.write(1);
+    out.close();
+    store.deleteBlob("test", supervisor);
+
+    //Test for Nimbus itself as a user
+    Subject nimbus = getNimbusSubject();
+    assertStoreHasExactly(store);
+    metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+    out = store.createBlob("test", metadata, nimbus);
+    assertStoreHasExactly(store, "test");
+    out.write(1);
+    out.close();
+    store.deleteBlob("test", nimbus);
+
+    // Test with a dummy test_subject for cases where subject != null (security turned on)
+    Subject who = getSubject("test_subject");
+    assertStoreHasExactly(store);
+
+    // Tests for case when subject != null (security turned on) and
+    // acls for the blob are set to WORLD_EVERYTHING
+    metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+    out = store.createBlob("test", metadata, who);
+    out.write(1);
+    out.close();
+    assertStoreHasExactly(store, "test");
+    // Testing whether acls are set to WORLD_EVERYTHING
+    assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+    readAssertEqualsWithAuth(store, who, "test", 1);
+
+    LOG.info("Deleting test");
+    store.deleteBlob("test", who);
+    assertStoreHasExactly(store);
+
+    // Tests for case when subject != null (security turned on) and
+    // acls are not set for the blob (DEFAULT)
+    LOG.info("Creating test again");
+    metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+    out = store.createBlob("test", metadata, who);
+    out.write(2);
+    out.close();
+    assertStoreHasExactly(store, "test");
+    // Here the ACL should not contain WORLD_EVERYTHING because the subject
+    // is neither null nor empty. It should, however, contain USER_EVERYTHING,
+    // as the user needs complete access to the blob.
+    assertTrue("ACL should not contain WORLD_EVERYTHING", !metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+    readAssertEqualsWithAuth(store, who, "test", 2);
+
+    LOG.info("Updating test");
+    out = store.updateBlob("test", who);
+    out.write(3);
+    out.close();
+    assertStoreHasExactly(store, "test");
+    readAssertEqualsWithAuth(store, who, "test", 3);
+
+    LOG.info("Updating test again");
+    out = store.updateBlob("test", who);
+    out.write(4);
+    out.flush();
+    LOG.info("SLEEPING");
+    Thread.sleep(2);
+    assertStoreHasExactly(store, "test");
+    readAssertEqualsWithAuth(store, who, "test", 3);
+
+    // Test for subject with no principals and acls set to WORLD_EVERYTHING
+    who = new Subject();
+    metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+    LOG.info("Creating test");
+    out = store.createBlob("test-empty-subject-WE", metadata, who);
+    out.write(2);
+    out.close();
+    assertStoreHasExactly(store, "test-empty-subject-WE", "test");
+    // Testing whether acls are set to WORLD_EVERYTHING
+    assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+    readAssertEqualsWithAuth(store, who, "test-empty-subject-WE", 2);
+
+    // Test for subject with no principals and acls set to DEFAULT
+    who = new Subject();
+    metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+    LOG.info("Creating other");
+    out = store.createBlob("test-empty-subject-DEF", metadata, who);
+    out.write(2);
+    out.close();
+    assertStoreHasExactly(store, "test-empty-subject-DEF", "test", "test-empty-subject-WE");
+    // Testing whether acls are set to WORLD_EVERYTHING
+    assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+    readAssertEqualsWithAuth(store, who, "test-empty-subject-DEF", 2);
+
+    if (store instanceof LocalFsBlobStore) {
+      ((LocalFsBlobStore) store).fullCleanup(1);
+    } else {
+      fail("Error the blobstore is of unknowntype");
+    }
+    try {
+      out.close();
+    } catch (IOException e) {
+      // This is likely to happen when we try to commit something that
+      // was cleaned up.  This is expected and acceptable.
+    }
+  }
+
+  public void testBasic(BlobStore store) throws Exception {
+    assertStoreHasExactly(store);
+    LOG.info("Creating test");
+    // Tests for case when subject == null (security turned off) and
+    // acls for the blob are set to WORLD_EVERYTHING
+    SettableBlobMeta metadata = new SettableBlobMeta(BlobStoreAclHandler
+            .WORLD_EVERYTHING);
+    AtomicOutputStream out = store.createBlob("test", metadata, null);
+    out.write(1);
+    out.close();
+    assertStoreHasExactly(store, "test");
+    // Testing whether acls are set to WORLD_EVERYTHING
+    assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+    readAssertEquals(store, "test", 1);
+
+    LOG.info("Deleting test");
+    store.deleteBlob("test", null);
+    assertStoreHasExactly(store);
+
+    // The following tests are run for both hdfs and local store to test the
+    // update blob interface
+    metadata = new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING);
+    LOG.info("Creating test again");
+    out = store.createBlob("test", metadata, null);
+    out.write(2);
+    out.close();
+    assertStoreHasExactly(store, "test");
+    if (store instanceof LocalFsBlobStore) {
+      assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.toString().contains("AccessControl(type:OTHER, access:7)"));
+    }
+    readAssertEquals(store, "test", 2);
+    LOG.info("Updating test");
+    out = store.updateBlob("test", null);
+    out.write(3);
+    out.close();
+    assertStoreHasExactly(store, "test");
+    readAssertEquals(store, "test", 3);
+
+    LOG.info("Updating test again");
+    out = store.updateBlob("test", null);
+    out.write(4);
+    out.flush();
+    LOG.info("SLEEPING");
+    Thread.sleep(2);
+
+    // Tests for case when subject == null (security turned off) and
+    // acls for the blob are set to DEFAULT (Empty ACL List) only for LocalFsBlobstore
+    if (store instanceof LocalFsBlobStore) {
+      metadata = new SettableBlobMeta(BlobStoreAclHandler.DEFAULT);
+      LOG.info("Creating test for empty acls when security is off");
+      out = store.createBlob("test-empty-acls", metadata, null);
+      LOG.info("metadata {}", metadata);
+      out.write(2);
+      out.close();
+      assertStoreHasExactly(store, "test-empty-acls", "test");
+      // Testing whether the ACLs are set to WORLD_EVERYTHING. This is checked only for LocalFsBlobstore,
+      // as the HdfsBlobstore picks up the subject of the local system user and therefore behaves as if it
+      // is always authenticated.
+      assertTrue("ACL does not contain WORLD_EVERYTHING", metadata.get_acl().toString().contains("OTHER"));
+
+      LOG.info("Deleting test-empty-acls");
+      store.deleteBlob("test-empty-acls", null);
+    }
+
+    if (store instanceof LocalFsBlobStore) {
+      ((LocalFsBlobStore) store).fullCleanup(1);
+    } else {
+      fail("Error the blobstore is of unknowntype");
+    }
+    try {
+      out.close();
+    } catch (IOException e) {
+      // This is likely to happen when we try to commit something that
+      // was cleaned up.  This is expected and acceptable.
+    }
+  }
+
+
+  public void testMultiple(BlobStore store) throws Exception {
+
+    assertStoreHasExactly(store);
+    LOG.info("Creating test");
+    AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler
+        .WORLD_EVERYTHING), null);
+    out.write(1);
+    out.close();
+    assertStoreHasExactly(store, "test");
+    readAssertEquals(store, "test", 1);
+
+    LOG.info("Creating other");
+    out = store.createBlob("other", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING),
+        null);
+    out.write(2);
+    out.close();
+    assertStoreHasExactly(store, "test", "other");
+    readAssertEquals(store, "test", 1);
+    readAssertEquals(store, "other", 2);
+
+    LOG.info("Updating other");
+    out = store.updateBlob("other", null);
+    out.write(5);
+    out.close();
+    assertStoreHasExactly(store, "test", "other");
+    readAssertEquals(store, "test", 1);
+    readAssertEquals(store, "other", 5);
+
+    LOG.info("Deleting test");
+    store.deleteBlob("test", null);
+    assertStoreHasExactly(store, "other");
+    readAssertEquals(store, "other", 5);
+
+    LOG.info("Creating test again");
+    out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler.WORLD_EVERYTHING),
+        null);
+    out.write(2);
+    out.close();
+    assertStoreHasExactly(store, "test", "other");
+    readAssertEquals(store, "test", 2);
+    readAssertEquals(store, "other", 5);
+
+    LOG.info("Updating test");
+    out = store.updateBlob("test", null);
+    out.write(3);
+    out.close();
+    assertStoreHasExactly(store, "test", "other");
+    readAssertEquals(store, "test", 3);
+    readAssertEquals(store, "other", 5);
+
+    LOG.info("Deleting other");
+    store.deleteBlob("other", null);
+    assertStoreHasExactly(store, "test");
+    readAssertEquals(store, "test", 3);
+
+    LOG.info("Updating test again");
+    out = store.updateBlob("test", null);
+    out.write(4);
+    out.flush();
+    LOG.info("SLEEPING");
+    Thread.sleep(2);
+
+    if (store instanceof LocalFsBlobStore) {
+      ((LocalFsBlobStore) store).fullCleanup(1);
+    } else {
+      fail("Error the blobstore is of unknowntype");
+    }    assertStoreHasExactly(store, "test");
+    readAssertEquals(store, "test", 3);
+    try {
+      out.close();
+    } catch (IOException e) {
+      // This is likely to happen when we try to commit something that
+      // was cleaned up.  This is expected and acceptable.
+    }
+  }
+
+  @Test
+  public void testGetFileLength()
+          throws AuthorizationException, KeyNotFoundException, KeyAlreadyExistsException, IOException {
+    LocalFsBlobStore store = initLocalFs();
+    AtomicOutputStream out = store.createBlob("test", new SettableBlobMeta(BlobStoreAclHandler
+        .WORLD_EVERYTHING), null);
+    out.write(1);
+    out.close();
+    assertEquals(1, store.getBlob("test", null).getFileLength());
+  }
+}

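The tests above always pass the predefined WORLD_EVERYTHING and DEFAULT ACL lists. A hedged sketch of building a custom ACL instead, assuming the Thrift-generated AccessControl and AccessControlType types and the SettableBlobMeta(List<AccessControl>) constructor that the test exercises:

import backtype.storm.generated.AccessControl;
import backtype.storm.generated.AccessControlType;
import backtype.storm.generated.SettableBlobMeta;

import java.util.Arrays;

public class CustomAclExample {
  // Full access (READ | WRITE | ADMIN = 0x07) for one named user, read-only for everyone else.
  public static SettableBlobMeta aclFor(String user) {
    AccessControl userAcl = new AccessControl(AccessControlType.USER, 0x01 | 0x02 | 0x04);
    userAcl.set_name(user);
    AccessControl worldRead = new AccessControl(AccessControlType.OTHER, 0x01);
    return new SettableBlobMeta(Arrays.asList(userAcl, worldRead));
  }
}
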
http://git-wip-us.apache.org/repos/asf/storm/blob/7029aee5/storm-core/test/jvm/backtype/storm/blobstore/BlobSynchronizerTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/backtype/storm/blobstore/BlobSynchronizerTest.java b/storm-core/test/jvm/backtype/storm/blobstore/BlobSynchronizerTest.java
new file mode 100644
index 0000000..63f633d
--- /dev/null
+++ b/storm-core/test/jvm/backtype/storm/blobstore/BlobSynchronizerTest.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package backtype.storm.blobstore;
+
+import backtype.storm.Config;
+import backtype.storm.nimbus.NimbusInfo;
+import backtype.storm.utils.Utils;
+import org.apache.commons.io.FileUtils;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.test.TestingServer;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.*;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ *  Unit tests for most of the testable utility methods
+ *  and BlobSynchronizer class methods
+ */
+public class BlobSynchronizerTest {
+  private URI base;
+  private File baseFile;
+  private static Map conf = new HashMap();
+  private NIOServerCnxnFactory factory;
+
+  @Before
+  public void init() throws Exception {
+    initializeConfigs();
+    baseFile = new File("/tmp/blob-store-test-"+ UUID.randomUUID());
+    base = baseFile.toURI();
+  }
+
+  @After
+  public void cleanUp() throws IOException {
+    FileUtils.deleteDirectory(baseFile);
+    if (factory != null) {
+      factory.shutdown();
+    }
+  }
+
+  // Initializes the Nimbus admin and supervisor user configs
+  public static void initializeConfigs() {
+    conf.put(Config.NIMBUS_ADMINS,"admin");
+    conf.put(Config.NIMBUS_SUPERVISOR_USERS,"supervisor");
+  }
+
+  private LocalFsBlobStore initLocalFs() {
+    LocalFsBlobStore store = new LocalFsBlobStore();
+    Map conf = Utils.readStormConfig();
+    conf.put(Config.STORM_LOCAL_DIR, baseFile.getAbsolutePath());
+    conf.put(Config.STORM_PRINCIPAL_TO_LOCAL_PLUGIN,"backtype.storm.security.auth.DefaultPrincipalToLocal");
+    this.conf = conf;
+    store.prepare(conf, null, null);
+    return store;
+  }
+
+  @Test
+  public void testBlobSynchronizerForKeysToDownload() {
+    BlobStore store = initLocalFs();
+    BlobSynchronizer sync = new BlobSynchronizer(store, conf);
+    // test for keylist to download
+    Set<String> zkSet = new HashSet<String>();
+    zkSet.add("key1");
+    Set<String> blobStoreSet = new HashSet<String>();
+    blobStoreSet.add("key1");
+    Set<String> resultSet = sync.getKeySetToDownload(blobStoreSet, zkSet);
+    assertTrue("Not Empty", resultSet.isEmpty());
+    zkSet.add("key1");
+    blobStoreSet.add("key2");
+    resultSet =  sync.getKeySetToDownload(blobStoreSet, zkSet);
+    assertTrue("Not Empty", resultSet.isEmpty());
+    blobStoreSet.remove("key1");
+    blobStoreSet.remove("key2");
+    zkSet.add("key1");
+    resultSet =  sync.getKeySetToDownload(blobStoreSet, zkSet);
+    assertTrue("Unexpected keys to download", (resultSet.size() == 1) && (resultSet.contains("key1")));
+  }
+
+  @Test
+  public void testGetLatestSequenceNumber() throws Exception {
+    List<String> stateInfoList = new ArrayList<String>();
+    stateInfoList.add("nimbus1:8000-2");
+    stateInfoList.add("nimbus-1:8000-4");
+    assertTrue("Failed to get the latest version", BlobStoreUtils.getLatestSequenceNumber(stateInfoList)==4);
+  }
+
+  @Test
+  public void testNimbodesWithLatestVersionOfBlob() throws Exception {
+    TestingServer server = new TestingServer();
+    CuratorFramework zkClient = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
+    zkClient.start();
+    // Creating nimbus hosts containing latest version of blob
+    zkClient.create().creatingParentContainersIfNeeded().forPath("/blobstore/key1/nimbus1:7800-1");
+    zkClient.create().creatingParentContainersIfNeeded().forPath("/blobstore/key1/nimbus2:7800-2");
+    Set<NimbusInfo> set = BlobStoreUtils.getNimbodesWithLatestSequenceNumberOfBlob(zkClient, "key1");
+    assertEquals("Failed to get the correct nimbus hosts with latest blob version", (set.iterator().next()).getHost(),"nimbus2");
+    zkClient.delete().deletingChildrenIfNeeded().forPath("/blobstore/key1/nimbus1:7800-1");
+    zkClient.delete().deletingChildrenIfNeeded().forPath("/blobstore/key1/nimbus2:7800-2");
+    zkClient.close();
+    server.close();
+  }
+
+  @Test
+  public void testNormalizeVersionInfo () throws Exception {
+    BlobKeySequenceInfo info1 = BlobStoreUtils.normalizeNimbusHostPortSequenceNumberInfo("nimbus1:7800-1");
+    assertTrue(info1.getNimbusHostPort().equals("nimbus1:7800"));
+    assertTrue(info1.getSequenceNumber().equals("1"));
+    BlobKeySequenceInfo info2 = BlobStoreUtils.normalizeNimbusHostPortSequenceNumberInfo("nimbus-1:7800-1");
+    assertTrue(info2.getNimbusHostPort().equals("nimbus-1:7800"));
+    assertTrue(info2.getSequenceNumber().equals("1"));
+  }
+}
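The set difference exercised by testBlobSynchronizerForKeysToDownload can be sketched as follows: keys registered in ZooKeeper but missing from the local store are the ones a nimbus still needs to download. Class and method names here are illustrative only:

import java.util.HashSet;
import java.util.Set;

public class KeysToDownloadSketch {
  public static Set<String> keysToDownload(Set<String> localStoreKeys, Set<String> zkKeys) {
    Set<String> missing = new HashSet<String>(zkKeys);
    missing.removeAll(localStoreKeys);   // keep only the keys we do not yet have locally
    return missing;
  }

  public static void main(String[] args) {
    Set<String> zk = new HashSet<String>();
    zk.add("key1");
    Set<String> local = new HashSet<String>();      // local blob store is empty
    System.out.println(keysToDownload(local, zk));  // prints [key1]
  }
}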