You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/12/20 08:56:36 UTC

[3/8] storm git commit: Added in some optimizations for better topology submission performance

http://git-wip-us.apache.org/repos/asf/storm/blob/897a6d2b/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java b/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java
index e8cef09..85209d6 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/NimbusClient.java
@@ -18,7 +18,6 @@
 package org.apache.storm.utils;
 
 import org.apache.storm.Config;
-import org.apache.storm.generated.ClusterSummary;
 import org.apache.storm.generated.Nimbus;
 import org.apache.storm.generated.NimbusSummary;
 import org.apache.storm.security.auth.ReqContext;
@@ -78,32 +77,36 @@ public class NimbusClient extends ThriftClient {
 
         for (String host : seeds) {
             int port = Integer.parseInt(conf.get(Config.NIMBUS_THRIFT_PORT).toString());
-            ClusterSummary clusterInfo;
-            try (NimbusClient client = new NimbusClient(conf, host, port, null, asUser)) {
-                clusterInfo = client.getClient().getClusterInfo();
+            NimbusSummary nimbusSummary;
+            NimbusClient client = null;
+            try {
+                client = new NimbusClient(conf, host, port, null, asUser);
+                nimbusSummary = client.getClient().getLeader();
+                if (nimbusSummary != null) {
+                    String leaderNimbus = nimbusSummary.get_host() + ":" + nimbusSummary.get_port();
+                    LOG.info("Found leader nimbus : {}", leaderNimbus);
+                    if (nimbusSummary.get_host().equals(host) && nimbusSummary.get_port() == port) {
+                        NimbusClient ret = client;
+                        client = null;
+                        return ret;
+                    }
+                    try {
+                        return new NimbusClient(conf, nimbusSummary.get_host(), nimbusSummary.get_port(), null, asUser);
+                    } catch (TTransportException e) {
+                        throw new RuntimeException("Failed to create a nimbus client for the leader " + leaderNimbus, e);
+                    }
+                }
             } catch (Exception e) {
                 LOG.warn("Ignoring exception while trying to get leader nimbus info from " + host
                         + ". will retry with a different seed host.", e);
                 continue;
-            }
-            List<NimbusSummary> nimbuses = clusterInfo.get_nimbuses();
-            if (nimbuses != null) {
-                for (NimbusSummary nimbusSummary : nimbuses) {
-                    if (nimbusSummary.is_isLeader()) {
-                        String leaderNimbus = nimbusSummary.get_host() + ":" + nimbusSummary.get_port();
-                        LOG.info("Found leader nimbus : {}", leaderNimbus);
-
-                        try {
-                            return new NimbusClient(conf, nimbusSummary.get_host(), nimbusSummary.get_port(), null, asUser);
-                        } catch (TTransportException e) {
-                            throw new RuntimeException("Failed to create a nimbus client for the leader " + leaderNimbus, e);
-                        }
-                    }
+            } finally {
+                if (client != null) {
+                    client.close();
                 }
-                throw new NimbusLeaderNotFoundException(
-                        "Found nimbuses " + nimbuses + " none of which is elected as leader, please try " +
-                        "again after some time.");
             }
+            throw new NimbusLeaderNotFoundException("Could not find a nimbus leader, please try " +
+                    "again after some time.");
         }
         throw new NimbusLeaderNotFoundException(
                 "Could not find leader nimbus from seed hosts " + seeds + ". " +

http://git-wip-us.apache.org/repos/asf/storm/blob/897a6d2b/storm-core/src/jvm/org/apache/storm/utils/StormBoundedExponentialBackoffRetry.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/StormBoundedExponentialBackoffRetry.java b/storm-core/src/jvm/org/apache/storm/utils/StormBoundedExponentialBackoffRetry.java
index 7528221..33c2c1b 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/StormBoundedExponentialBackoffRetry.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/StormBoundedExponentialBackoffRetry.java
@@ -66,11 +66,14 @@ public class StormBoundedExponentialBackoffRetry extends BoundedExponentialBacko
             int exp = 1 << retryCount;
             int jitter = random.nextInt(exp);
             int sleepTimeMs = super.getBaseSleepTimeMs() + exp + jitter;
+            LOG.warn("WILL SLEEP FOR {}ms (NOT MAX)", sleepTimeMs);
             return sleepTimeMs;
         } else {
             int stepJitter = random.nextInt(stepSize);
-            return Math.min(super.getMaxSleepTimeMs(), (linearBaseSleepMs +
+            int sleepTimeMs = Math.min(super.getMaxSleepTimeMs(), (linearBaseSleepMs +
                     (stepSize * (retryCount - expRetriesThreshold)) + stepJitter));
+            LOG.warn("WILL SLEEP FOR {}ms (MAX)", sleepTimeMs);
+            return sleepTimeMs;
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/897a6d2b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/Utils.java b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
index ed5c552..75ffd23 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/Utils.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/Utils.java
@@ -1142,12 +1142,14 @@ public class Utils {
 
     public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port, String root, ZookeeperAuthInfo auth) {
         CuratorFramework ret = newCurator(conf, servers, port, root, auth);
+        LOG.info("Starting Utils Curator...");
         ret.start();
         return ret;
     }
 
     public static CuratorFramework newCuratorStarted(Map conf, List<String> servers, Object port, ZookeeperAuthInfo auth) {
         CuratorFramework ret = newCurator(conf, servers, port, auth);
+        LOG.info("Starting Utils Curator (2)...");
         ret.start();
         return ret;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/897a6d2b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
index 0580f41..ef35307 100644
--- a/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
+++ b/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
@@ -122,6 +122,7 @@ public class Zookeeper {
                 }
             }
         });
+        LOG.info("Starting ZK Curator");
         fk.start();
         return fk;
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/897a6d2b/storm-core/src/py/storm/Nimbus-remote
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/Nimbus-remote b/storm-core/src/py/storm/Nimbus-remote
index b39050e..0d643c3 100644
--- a/storm-core/src/py/storm/Nimbus-remote
+++ b/storm-core/src/py/storm/Nimbus-remote
@@ -76,6 +76,8 @@ if len(sys.argv) <= 1 or sys.argv[1] == '--help':
   print('  string downloadChunk(string id)')
   print('  string getNimbusConf()')
   print('  ClusterSummary getClusterInfo()')
+  print('  NimbusSummary getLeader()')
+  print('  bool isTopologyNameAllowed(string name)')
   print('  TopologyInfo getTopologyInfo(string id)')
   print('  TopologyInfo getTopologyInfoWithOpts(string id, GetInfoOptions options)')
   print('  TopologyPageInfo getTopologyPageInfo(string id, string window, bool is_include_sys)')
@@ -345,6 +347,18 @@ elif cmd == 'getClusterInfo':
     sys.exit(1)
   pp.pprint(client.getClusterInfo())
 
+elif cmd == 'getLeader':
+  if len(args) != 0:
+    print('getLeader requires 0 args')
+    sys.exit(1)
+  pp.pprint(client.getLeader())
+
+elif cmd == 'isTopologyNameAllowed':
+  if len(args) != 1:
+    print('isTopologyNameAllowed requires 1 args')
+    sys.exit(1)
+  pp.pprint(client.isTopologyNameAllowed(args[0],))
+
 elif cmd == 'getTopologyInfo':
   if len(args) != 1:
     print('getTopologyInfo requires 1 args')

http://git-wip-us.apache.org/repos/asf/storm/blob/897a6d2b/storm-core/src/py/storm/Nimbus.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/Nimbus.py b/storm-core/src/py/storm/Nimbus.py
index 7e5470a..1c5e86e 100644
--- a/storm-core/src/py/storm/Nimbus.py
+++ b/storm-core/src/py/storm/Nimbus.py
@@ -291,6 +291,16 @@ class Iface:
   def getClusterInfo(self):
     pass
 
+  def getLeader(self):
+    pass
+
+  def isTopologyNameAllowed(self, name):
+    """
+    Parameters:
+     - name
+    """
+    pass
+
   def getTopologyInfo(self, id):
     """
     Parameters:
@@ -1523,6 +1533,67 @@ class Client(Iface):
       raise result.aze
     raise TApplicationException(TApplicationException.MISSING_RESULT, "getClusterInfo failed: unknown result")
 
+  def getLeader(self):
+    self.send_getLeader()
+    return self.recv_getLeader()
+
+  def send_getLeader(self):
+    self._oprot.writeMessageBegin('getLeader', TMessageType.CALL, self._seqid)
+    args = getLeader_args()
+    args.write(self._oprot)
+    self._oprot.writeMessageEnd()
+    self._oprot.trans.flush()
+
+  def recv_getLeader(self):
+    iprot = self._iprot
+    (fname, mtype, rseqid) = iprot.readMessageBegin()
+    if mtype == TMessageType.EXCEPTION:
+      x = TApplicationException()
+      x.read(iprot)
+      iprot.readMessageEnd()
+      raise x
+    result = getLeader_result()
+    result.read(iprot)
+    iprot.readMessageEnd()
+    if result.success is not None:
+      return result.success
+    if result.aze is not None:
+      raise result.aze
+    raise TApplicationException(TApplicationException.MISSING_RESULT, "getLeader failed: unknown result")
+
+  def isTopologyNameAllowed(self, name):
+    """
+    Parameters:
+     - name
+    """
+    self.send_isTopologyNameAllowed(name)
+    return self.recv_isTopologyNameAllowed()
+
+  def send_isTopologyNameAllowed(self, name):
+    self._oprot.writeMessageBegin('isTopologyNameAllowed', TMessageType.CALL, self._seqid)
+    args = isTopologyNameAllowed_args()
+    args.name = name
+    args.write(self._oprot)
+    self._oprot.writeMessageEnd()
+    self._oprot.trans.flush()
+
+  def recv_isTopologyNameAllowed(self):
+    iprot = self._iprot
+    (fname, mtype, rseqid) = iprot.readMessageBegin()
+    if mtype == TMessageType.EXCEPTION:
+      x = TApplicationException()
+      x.read(iprot)
+      iprot.readMessageEnd()
+      raise x
+    result = isTopologyNameAllowed_result()
+    result.read(iprot)
+    iprot.readMessageEnd()
+    if result.success is not None:
+      return result.success
+    if result.aze is not None:
+      raise result.aze
+    raise TApplicationException(TApplicationException.MISSING_RESULT, "isTopologyNameAllowed failed: unknown result")
+
   def getTopologyInfo(self, id):
     """
     Parameters:
@@ -1895,6 +1966,8 @@ class Processor(Iface, TProcessor):
     self._processMap["downloadChunk"] = Processor.process_downloadChunk
     self._processMap["getNimbusConf"] = Processor.process_getNimbusConf
     self._processMap["getClusterInfo"] = Processor.process_getClusterInfo
+    self._processMap["getLeader"] = Processor.process_getLeader
+    self._processMap["isTopologyNameAllowed"] = Processor.process_isTopologyNameAllowed
     self._processMap["getTopologyInfo"] = Processor.process_getTopologyInfo
     self._processMap["getTopologyInfoWithOpts"] = Processor.process_getTopologyInfoWithOpts
     self._processMap["getTopologyPageInfo"] = Processor.process_getTopologyPageInfo
@@ -2713,6 +2786,50 @@ class Processor(Iface, TProcessor):
     oprot.writeMessageEnd()
     oprot.trans.flush()
 
+  def process_getLeader(self, seqid, iprot, oprot):
+    args = getLeader_args()
+    args.read(iprot)
+    iprot.readMessageEnd()
+    result = getLeader_result()
+    try:
+      result.success = self._handler.getLeader()
+      msg_type = TMessageType.REPLY
+    except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+      raise
+    except AuthorizationException as aze:
+      msg_type = TMessageType.REPLY
+      result.aze = aze
+    except Exception as ex:
+      msg_type = TMessageType.EXCEPTION
+      logging.exception(ex)
+      result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+    oprot.writeMessageBegin("getLeader", msg_type, seqid)
+    result.write(oprot)
+    oprot.writeMessageEnd()
+    oprot.trans.flush()
+
+  def process_isTopologyNameAllowed(self, seqid, iprot, oprot):
+    args = isTopologyNameAllowed_args()
+    args.read(iprot)
+    iprot.readMessageEnd()
+    result = isTopologyNameAllowed_result()
+    try:
+      result.success = self._handler.isTopologyNameAllowed(args.name)
+      msg_type = TMessageType.REPLY
+    except (TTransport.TTransportException, KeyboardInterrupt, SystemExit):
+      raise
+    except AuthorizationException as aze:
+      msg_type = TMessageType.REPLY
+      result.aze = aze
+    except Exception as ex:
+      msg_type = TMessageType.EXCEPTION
+      logging.exception(ex)
+      result = TApplicationException(TApplicationException.INTERNAL_ERROR, 'Internal error')
+    oprot.writeMessageBegin("isTopologyNameAllowed", msg_type, seqid)
+    result.write(oprot)
+    oprot.writeMessageEnd()
+    oprot.trans.flush()
+
   def process_getTopologyInfo(self, seqid, iprot, oprot):
     args = getTopologyInfo_args()
     args.read(iprot)
@@ -8017,6 +8134,274 @@ class getClusterInfo_result:
   def __ne__(self, other):
     return not (self == other)
 
+class getLeader_args:
+
+  thrift_spec = (
+  )
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('getLeader_args')
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    return
+
+
+  def __hash__(self):
+    value = 17
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class getLeader_result:
+  """
+  Attributes:
+   - success
+   - aze
+  """
+
+  thrift_spec = (
+    (0, TType.STRUCT, 'success', (NimbusSummary, NimbusSummary.thrift_spec), None, ), # 0
+    (1, TType.STRUCT, 'aze', (AuthorizationException, AuthorizationException.thrift_spec), None, ), # 1
+  )
+
+  def __init__(self, success=None, aze=None,):
+    self.success = success
+    self.aze = aze
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 0:
+        if ftype == TType.STRUCT:
+          self.success = NimbusSummary()
+          self.success.read(iprot)
+        else:
+          iprot.skip(ftype)
+      elif fid == 1:
+        if ftype == TType.STRUCT:
+          self.aze = AuthorizationException()
+          self.aze.read(iprot)
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('getLeader_result')
+    if self.success is not None:
+      oprot.writeFieldBegin('success', TType.STRUCT, 0)
+      self.success.write(oprot)
+      oprot.writeFieldEnd()
+    if self.aze is not None:
+      oprot.writeFieldBegin('aze', TType.STRUCT, 1)
+      self.aze.write(oprot)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.success)
+    value = (value * 31) ^ hash(self.aze)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class isTopologyNameAllowed_args:
+  """
+  Attributes:
+   - name
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRING, 'name', None, None, ), # 1
+  )
+
+  def __init__(self, name=None,):
+    self.name = name
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRING:
+          self.name = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('isTopologyNameAllowed_args')
+    if self.name is not None:
+      oprot.writeFieldBegin('name', TType.STRING, 1)
+      oprot.writeString(self.name.encode('utf-8'))
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.name)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
+class isTopologyNameAllowed_result:
+  """
+  Attributes:
+   - success
+   - aze
+  """
+
+  thrift_spec = (
+    (0, TType.BOOL, 'success', None, None, ), # 0
+    (1, TType.STRUCT, 'aze', (AuthorizationException, AuthorizationException.thrift_spec), None, ), # 1
+  )
+
+  def __init__(self, success=None, aze=None,):
+    self.success = success
+    self.aze = aze
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 0:
+        if ftype == TType.BOOL:
+          self.success = iprot.readBool()
+        else:
+          iprot.skip(ftype)
+      elif fid == 1:
+        if ftype == TType.STRUCT:
+          self.aze = AuthorizationException()
+          self.aze.read(iprot)
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('isTopologyNameAllowed_result')
+    if self.success is not None:
+      oprot.writeFieldBegin('success', TType.BOOL, 0)
+      oprot.writeBool(self.success)
+      oprot.writeFieldEnd()
+    if self.aze is not None:
+      oprot.writeFieldBegin('aze', TType.STRUCT, 1)
+      self.aze.write(oprot)
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    return
+
+
+  def __hash__(self):
+    value = 17
+    value = (value * 31) ^ hash(self.success)
+    value = (value * 31) ^ hash(self.aze)
+    return value
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
 class getTopologyInfo_args:
   """
   Attributes:

http://git-wip-us.apache.org/repos/asf/storm/blob/897a6d2b/storm-core/src/storm.thrift
----------------------------------------------------------------------
diff --git a/storm-core/src/storm.thrift b/storm-core/src/storm.thrift
index 8e80d2a..700e5a0 100644
--- a/storm-core/src/storm.thrift
+++ b/storm-core/src/storm.thrift
@@ -667,6 +667,8 @@ service Nimbus {
   string getNimbusConf() throws (1: AuthorizationException aze);
   // stats functions
   ClusterSummary getClusterInfo() throws (1: AuthorizationException aze);
+  NimbusSummary getLeader() throws (1: AuthorizationException aze);
+  bool isTopologyNameAllowed(1: string name) throws (1: AuthorizationException aze);
   TopologyInfo getTopologyInfo(1: string id) throws (1: NotAliveException e, 2: AuthorizationException aze);
   TopologyInfo getTopologyInfoWithOpts(1: string id, 2: GetInfoOptions options) throws (1: NotAliveException e, 2: AuthorizationException aze);
   TopologyPageInfo getTopologyPageInfo(1: string id, 2: string window, 3: bool is_include_sys) throws (1: NotAliveException e, 2: AuthorizationException aze);