You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/08/24 15:52:07 UTC

[22/50] [abbrv] storm git commit: Adding nimbus summary info to zookeeper.

Adding nimbus summary info to zookeeper.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4502bffb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4502bffb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4502bffb

Branch: refs/heads/master
Commit: 4502bffbe3f9b4cd3674a56afbda1bb115cec239
Parents: 1b6491f
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Thu Feb 12 11:27:50 2015 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Thu Feb 12 11:27:50 2015 -0800

----------------------------------------------------------------------
 storm-core/src/clj/backtype/storm/cluster.clj   |  24 +-
 storm-core/src/clj/backtype/storm/config.clj    |  10 +
 .../src/clj/backtype/storm/daemon/nimbus.clj    |  27 +-
 storm-core/src/clj/backtype/storm/ui/core.clj   |  28 +-
 .../storm/generated/ClusterSummary.java         | 232 +++---
 .../backtype/storm/generated/NimbusSummary.java | 723 +++++++++++++++++++
 storm-core/src/py/storm/ttypes.py               | 577 +++++++++------
 storm-core/src/storm.thrift                     |  10 +-
 .../public/templates/index-page-template.html   |  26 +-
 9 files changed, 1297 insertions(+), 360 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/4502bffb/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj
index 2c58510..3bf6628 100644
--- a/storm-core/src/clj/backtype/storm/cluster.clj
+++ b/storm-core/src/clj/backtype/storm/cluster.clj
@@ -146,6 +146,12 @@
   (code-distributor [this callback])
   ;returns lits of nimbusinfos under /stormroot/code-distributor/storm-id
   (code-distributor-info [this storm-id])
+
+  ;returns list of nimbus summaries stored under /stormroot/nimbuses/<nimbus-ids> -> <data>
+  (nimbuses [this])
+  ;adds the NimbusSummary to /stormroot/nimbuses/nimbus-id
+  (add-nimbus-host! [this nimbus-id nimbus-summary])
+
   (active-storms [this])
   (storm-base [this storm-id callback])
   (get-worker-heartbeat [this storm-id node port])
@@ -180,14 +186,17 @@
 (def WORKERBEATS-ROOT "workerbeats")
 (def ERRORS-ROOT "errors")
 (def CODE-DISTRIBUTOR-ROOT "code-distributor")
+(def NIMBUSES-ROOT "nimbuses")
 (def CREDENTIALS-ROOT "credentials")
 
+
 (def ASSIGNMENTS-SUBTREE (str "/" ASSIGNMENTS-ROOT))
 (def STORMS-SUBTREE (str "/" STORMS-ROOT))
 (def SUPERVISORS-SUBTREE (str "/" SUPERVISORS-ROOT))
 (def WORKERBEATS-SUBTREE (str "/" WORKERBEATS-ROOT))
 (def ERRORS-SUBTREE (str "/" ERRORS-ROOT))
 (def CODE-DISTRIBUTOR-SUBTREE (str "/" CODE-DISTRIBUTOR-ROOT))
+(def NIMBUSES-SUBTREE (str "/" NIMBUSES-ROOT))
 (def CREDENTIALS-SUBTREE (str "/" CREDENTIALS-ROOT))
 
 (defn supervisor-path
@@ -202,6 +211,10 @@
   [id]
   (str CODE-DISTRIBUTOR-SUBTREE "/" id))
 
+(defn nimbus-path
+  [id]
+  (str NIMBUSES-SUBTREE "/" id))
+
 (defn storm-path
   [id]
   (str STORMS-SUBTREE "/" id))
@@ -292,7 +305,7 @@
                          CREDENTIALS-ROOT (issue-map-callback! credentials-callback (first args))
                          ;; this should never happen
                          (exit-process! 30 "Unknown callback for subtree " subtree args)))))]
-    (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE CODE-DISTRIBUTOR-SUBTREE]]
+    (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE CODE-DISTRIBUTOR-SUBTREE NIMBUSES-SUBTREE]]
       (mkdirs cluster-state p acls))
     (reify
       StormClusterState
@@ -330,6 +343,15 @@
           (reset! code-distributor-callback callback))
         (get-children cluster-state CODE-DISTRIBUTOR-SUBTREE (not-nil? callback)))
 
+      (nimbuses
+        [this]
+        (map #(maybe-deserialize (get-data cluster-state (nimbus-path %1) false))
+          (get-children cluster-state NIMBUSES-SUBTREE false)))
+
+      (add-nimbus-host!
+        [this nimbus-id nimbus-summary]
+        (set-ephemeral-node cluster-state (nimbus-path nimbus-id) (Utils/serialize nimbus-summary) acls))
+
       (code-distributor-info
         [this storm-id]
         (map (fn [nimbus-info] (NimbusInfo/parse nimbus-info)) (get-children cluster-state (code-distributor-path storm-id) false)))

http://git-wip-us.apache.org/repos/asf/storm/blob/4502bffb/storm-core/src/clj/backtype/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/config.clj b/storm-core/src/clj/backtype/storm/config.clj
index a6b160d..f3c70e5 100644
--- a/storm-core/src/clj/backtype/storm/config.clj
+++ b/storm-core/src/clj/backtype/storm/config.clj
@@ -282,3 +282,13 @@
 (defn ^LocalState worker-state
   [conf id]
   (LocalState. (worker-heartbeats-root conf id)))
+
+(defn read-storm-version
+  "Returns a string containing the Storm version or 'Unknown'."
+  []
+  (let [storm-home (System/getProperty "storm.home")
+        release-path (format "%s/RELEASE" storm-home)
+        release-file (File. release-path)]
+    (if (and (.exists release-file) (.isFile release-file))
+      (str/trim (slurp release-path))
+      "Unknown")))

http://git-wip-us.apache.org/repos/asf/storm/blob/4502bffb/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index e354fab..52ee708 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -15,7 +15,8 @@
 ;; limitations under the License.
 (ns backtype.storm.daemon.nimbus
   (:import [java.nio ByteBuffer]
-           [java.util Collections])
+           [java.util Collections]
+           [backtype.storm.generated NimbusSummary])
   (:import [java.io FileNotFoundException])
   (:import [java.net InetAddress])
   (:import [java.nio.channels Channels WritableByteChannel])
@@ -104,6 +105,7 @@
      :id->sched-status (atom {})
      :cred-renewers (AuthUtils/GetCredentialRenewers conf)
      :nimbus-autocred-plugins (AuthUtils/getNimbusAutoCredPlugins conf)
+     :nimbuses-cache (atom {}) ;;TODO need to figure out how to keep the cache up to date, one more thread
      }))
 
 (defn inbox [nimbus]
@@ -1030,6 +1032,17 @@
   (let [nimbus (nimbus-data conf inimbus)
        principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf)]
     (.prepare ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus) conf)
+
+    ;add to nimbuses
+    (.add-nimbus-host! (:storm-cluster-state nimbus)
+      (.toHostPortString (:nimbus-host-port-info nimbus))
+      {
+        :host (.getHost (:nimbus-host-port-info nimbus))
+        :port (.getPort (:nimbus-host-port-info nimbus))
+        :start-time-secs (current-time-secs)
+        :version (read-storm-version)
+        })
+
     (.addToLeaderLockQueue (:leader-elector nimbus))
     (cleanup-corrupt-topologies! nimbus)
     ;register call back for code-distributor
@@ -1287,8 +1300,14 @@
                                                                 (count (:used-ports info))
                                                                 id )
                                             ))
-              nimbus-uptime ((:uptime nimbus))
               bases (topology-bases storm-cluster-state)
+              nimbuses (.nimbuses storm-cluster-state)
+              nimbuses (map #(NimbusSummary. (:host %1) (:port %1) (time-delta (:start-time-secs %1))
+                               (let [leader (.getLeader (:leader-elector nimbus))]
+                                 (and (= (.getHost leader) (:host %1)) (= (.getPort leader) (:port %1))))
+                               (:version %1))
+                         nimbuses
+                         )
               topology-summaries (dofor [[id base] bases :when base]
 	                                  (let [assignment (.assignment-info storm-cluster-state id nil)
                                                 topo-summ (TopologySummary. id
@@ -1312,8 +1331,8 @@
                                                topo-summ
                                           ))]
           (ClusterSummary. supervisor-summaries
-                           nimbus-uptime
-                           topology-summaries)
+                           topology-summaries
+                           nimbuses)
           ))
       
       (^TopologyInfo getTopologyInfo [this ^String storm-id]

http://git-wip-us.apache.org/repos/asf/storm/blob/4502bffb/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj
index 3c7f578..94b0311 100644
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@ -78,16 +78,6 @@
        (map #(.get_stats ^ExecutorSummary %))
        (filter not-nil?)))
 
-(defn read-storm-version
-  "Returns a string containing the Storm version or 'Unknown'."
-  []
-  (let [storm-home (System/getProperty "storm.home")
-        release-path (format "%s/RELEASE" storm-home)
-        release-file (File. release-path)]
-    (if (and (.exists release-file) (.isFile release-file))
-      (trim (slurp release-path))
-      "Unknown")))
-
 (defn component-type
   "Returns the component type (either :bolt or :spout) for a given
   topology and component id. Returns nil if not found."
@@ -520,7 +510,6 @@
                              (reduce +))]
        {"user" user
         "stormVersion" (read-storm-version)
-        "nimbusUptime" (pretty-uptime-sec (.get_nimbus_uptime_secs summ))
         "supervisors" (count sups)
         "slotsTotal" total-slots
         "slotsUsed"  used-slots
@@ -530,18 +519,19 @@
 
 (defn nimbus-summary
   ([]
-    (let [leader-elector (zk-leader-elector *STORM-CONF*)
-          nimbus-hosts (.getAllNimbuses leader-elector)
-          no-op (.close leader-elector)]
-      (nimbus-summary nimbus-hosts)))
+    (with-nimbus nimbus
+      (nimbus-summary
+        (.get_nimbuses (.getClusterInfo ^Nimbus$Client nimbus)))))
   ([nimbuses]
     {"nimbuses"
      (for [^NimbusInfo n nimbuses]
        {
-        "host" (.getHost n)
-        "port" (.getPort n)
-        "nimbusLogLink" (nimbus-log-link (.getHost n) (.getPort n))
-        "isLeader" (.isLeader n)})}))
+        "host" (.get_host n)
+        "port" (.get_port n)
+        "nimbusLogLink" (nimbus-log-link (.get_host n) (.get_port n))
+        "isLeader" (.is_isLeader n)
+        "version" (.get_version n)
+        "nimbusUpTime" (pretty-uptime-sec (.get_uptimeSecs n))})}))
 
 (defn supervisor-summary
   ([]

http://git-wip-us.apache.org/repos/asf/storm/blob/4502bffb/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java b/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java
index a2623ab..7e32c72 100644
--- a/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java
+++ b/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java
@@ -42,18 +42,18 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ClusterSummary");
 
   private static final org.apache.thrift.protocol.TField SUPERVISORS_FIELD_DESC = new org.apache.thrift.protocol.TField("supervisors", org.apache.thrift.protocol.TType.LIST, (short)1);
-  private static final org.apache.thrift.protocol.TField NIMBUS_UPTIME_SECS_FIELD_DESC = new org.apache.thrift.protocol.TField("nimbus_uptime_secs", org.apache.thrift.protocol.TType.I32, (short)2);
   private static final org.apache.thrift.protocol.TField TOPOLOGIES_FIELD_DESC = new org.apache.thrift.protocol.TField("topologies", org.apache.thrift.protocol.TType.LIST, (short)3);
+  private static final org.apache.thrift.protocol.TField NIMBUSES_FIELD_DESC = new org.apache.thrift.protocol.TField("nimbuses", org.apache.thrift.protocol.TType.LIST, (short)4);
 
   private List<SupervisorSummary> supervisors; // required
-  private int nimbus_uptime_secs; // required
   private List<TopologySummary> topologies; // required
+  private List<NimbusSummary> nimbuses; // required
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
     SUPERVISORS((short)1, "supervisors"),
-    NIMBUS_UPTIME_SECS((short)2, "nimbus_uptime_secs"),
-    TOPOLOGIES((short)3, "topologies");
+    TOPOLOGIES((short)3, "topologies"),
+    NIMBUSES((short)4, "nimbuses");
 
     private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
 
@@ -70,10 +70,10 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
       switch(fieldId) {
         case 1: // SUPERVISORS
           return SUPERVISORS;
-        case 2: // NIMBUS_UPTIME_SECS
-          return NIMBUS_UPTIME_SECS;
         case 3: // TOPOLOGIES
           return TOPOLOGIES;
+        case 4: // NIMBUSES
+          return NIMBUSES;
         default:
           return null;
       }
@@ -114,8 +114,6 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
   }
 
   // isset id assignments
-  private static final int __NIMBUS_UPTIME_SECS_ISSET_ID = 0;
-  private BitSet __isset_bit_vector = new BitSet(1);
 
   public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
@@ -123,11 +121,12 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
     tmpMap.put(_Fields.SUPERVISORS, new org.apache.thrift.meta_data.FieldMetaData("supervisors", org.apache.thrift.TFieldRequirementType.REQUIRED, 
         new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, 
             new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, SupervisorSummary.class))));
-    tmpMap.put(_Fields.NIMBUS_UPTIME_SECS, new org.apache.thrift.meta_data.FieldMetaData("nimbus_uptime_secs", org.apache.thrift.TFieldRequirementType.REQUIRED, 
-        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
     tmpMap.put(_Fields.TOPOLOGIES, new org.apache.thrift.meta_data.FieldMetaData("topologies", org.apache.thrift.TFieldRequirementType.REQUIRED, 
         new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, 
             new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, TopologySummary.class))));
+    tmpMap.put(_Fields.NIMBUSES, new org.apache.thrift.meta_data.FieldMetaData("nimbuses", org.apache.thrift.TFieldRequirementType.REQUIRED, 
+        new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, 
+            new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, NimbusSummary.class))));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ClusterSummary.class, metaDataMap);
   }
@@ -137,22 +136,19 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
 
   public ClusterSummary(
     List<SupervisorSummary> supervisors,
-    int nimbus_uptime_secs,
-    List<TopologySummary> topologies)
+    List<TopologySummary> topologies,
+    List<NimbusSummary> nimbuses)
   {
     this();
     this.supervisors = supervisors;
-    this.nimbus_uptime_secs = nimbus_uptime_secs;
-    set_nimbus_uptime_secs_isSet(true);
     this.topologies = topologies;
+    this.nimbuses = nimbuses;
   }
 
   /**
    * Performs a deep copy on <i>other</i>.
    */
   public ClusterSummary(ClusterSummary other) {
-    __isset_bit_vector.clear();
-    __isset_bit_vector.or(other.__isset_bit_vector);
     if (other.is_set_supervisors()) {
       List<SupervisorSummary> __this__supervisors = new ArrayList<SupervisorSummary>();
       for (SupervisorSummary other_element : other.supervisors) {
@@ -160,7 +156,6 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
       }
       this.supervisors = __this__supervisors;
     }
-    this.nimbus_uptime_secs = other.nimbus_uptime_secs;
     if (other.is_set_topologies()) {
       List<TopologySummary> __this__topologies = new ArrayList<TopologySummary>();
       for (TopologySummary other_element : other.topologies) {
@@ -168,6 +163,13 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
       }
       this.topologies = __this__topologies;
     }
+    if (other.is_set_nimbuses()) {
+      List<NimbusSummary> __this__nimbuses = new ArrayList<NimbusSummary>();
+      for (NimbusSummary other_element : other.nimbuses) {
+        __this__nimbuses.add(new NimbusSummary(other_element));
+      }
+      this.nimbuses = __this__nimbuses;
+    }
   }
 
   public ClusterSummary deepCopy() {
@@ -177,9 +179,8 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
   @Override
   public void clear() {
     this.supervisors = null;
-    set_nimbus_uptime_secs_isSet(false);
-    this.nimbus_uptime_secs = 0;
     this.topologies = null;
+    this.nimbuses = null;
   }
 
   public int get_supervisors_size() {
@@ -220,28 +221,6 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
     }
   }
 
-  public int get_nimbus_uptime_secs() {
-    return this.nimbus_uptime_secs;
-  }
-
-  public void set_nimbus_uptime_secs(int nimbus_uptime_secs) {
-    this.nimbus_uptime_secs = nimbus_uptime_secs;
-    set_nimbus_uptime_secs_isSet(true);
-  }
-
-  public void unset_nimbus_uptime_secs() {
-    __isset_bit_vector.clear(__NIMBUS_UPTIME_SECS_ISSET_ID);
-  }
-
-  /** Returns true if field nimbus_uptime_secs is set (has been assigned a value) and false otherwise */
-  public boolean is_set_nimbus_uptime_secs() {
-    return __isset_bit_vector.get(__NIMBUS_UPTIME_SECS_ISSET_ID);
-  }
-
-  public void set_nimbus_uptime_secs_isSet(boolean value) {
-    __isset_bit_vector.set(__NIMBUS_UPTIME_SECS_ISSET_ID, value);
-  }
-
   public int get_topologies_size() {
     return (this.topologies == null) ? 0 : this.topologies.size();
   }
@@ -280,6 +259,44 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
     }
   }
 
+  public int get_nimbuses_size() {
+    return (this.nimbuses == null) ? 0 : this.nimbuses.size();
+  }
+
+  public java.util.Iterator<NimbusSummary> get_nimbuses_iterator() {
+    return (this.nimbuses == null) ? null : this.nimbuses.iterator();
+  }
+
+  public void add_to_nimbuses(NimbusSummary elem) {
+    if (this.nimbuses == null) {
+      this.nimbuses = new ArrayList<NimbusSummary>();
+    }
+    this.nimbuses.add(elem);
+  }
+
+  public List<NimbusSummary> get_nimbuses() {
+    return this.nimbuses;
+  }
+
+  public void set_nimbuses(List<NimbusSummary> nimbuses) {
+    this.nimbuses = nimbuses;
+  }
+
+  public void unset_nimbuses() {
+    this.nimbuses = null;
+  }
+
+  /** Returns true if field nimbuses is set (has been assigned a value) and false otherwise */
+  public boolean is_set_nimbuses() {
+    return this.nimbuses != null;
+  }
+
+  public void set_nimbuses_isSet(boolean value) {
+    if (!value) {
+      this.nimbuses = null;
+    }
+  }
+
   public void setFieldValue(_Fields field, Object value) {
     switch (field) {
     case SUPERVISORS:
@@ -290,19 +307,19 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
       }
       break;
 
-    case NIMBUS_UPTIME_SECS:
+    case TOPOLOGIES:
       if (value == null) {
-        unset_nimbus_uptime_secs();
+        unset_topologies();
       } else {
-        set_nimbus_uptime_secs((Integer)value);
+        set_topologies((List<TopologySummary>)value);
       }
       break;
 
-    case TOPOLOGIES:
+    case NIMBUSES:
       if (value == null) {
-        unset_topologies();
+        unset_nimbuses();
       } else {
-        set_topologies((List<TopologySummary>)value);
+        set_nimbuses((List<NimbusSummary>)value);
       }
       break;
 
@@ -314,12 +331,12 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
     case SUPERVISORS:
       return get_supervisors();
 
-    case NIMBUS_UPTIME_SECS:
-      return Integer.valueOf(get_nimbus_uptime_secs());
-
     case TOPOLOGIES:
       return get_topologies();
 
+    case NIMBUSES:
+      return get_nimbuses();
+
     }
     throw new IllegalStateException();
   }
@@ -333,10 +350,10 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
     switch (field) {
     case SUPERVISORS:
       return is_set_supervisors();
-    case NIMBUS_UPTIME_SECS:
-      return is_set_nimbus_uptime_secs();
     case TOPOLOGIES:
       return is_set_topologies();
+    case NIMBUSES:
+      return is_set_nimbuses();
     }
     throw new IllegalStateException();
   }
@@ -363,15 +380,6 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
         return false;
     }
 
-    boolean this_present_nimbus_uptime_secs = true;
-    boolean that_present_nimbus_uptime_secs = true;
-    if (this_present_nimbus_uptime_secs || that_present_nimbus_uptime_secs) {
-      if (!(this_present_nimbus_uptime_secs && that_present_nimbus_uptime_secs))
-        return false;
-      if (this.nimbus_uptime_secs != that.nimbus_uptime_secs)
-        return false;
-    }
-
     boolean this_present_topologies = true && this.is_set_topologies();
     boolean that_present_topologies = true && that.is_set_topologies();
     if (this_present_topologies || that_present_topologies) {
@@ -381,6 +389,15 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
         return false;
     }
 
+    boolean this_present_nimbuses = true && this.is_set_nimbuses();
+    boolean that_present_nimbuses = true && that.is_set_nimbuses();
+    if (this_present_nimbuses || that_present_nimbuses) {
+      if (!(this_present_nimbuses && that_present_nimbuses))
+        return false;
+      if (!this.nimbuses.equals(that.nimbuses))
+        return false;
+    }
+
     return true;
   }
 
@@ -393,16 +410,16 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
     if (present_supervisors)
       builder.append(supervisors);
 
-    boolean present_nimbus_uptime_secs = true;
-    builder.append(present_nimbus_uptime_secs);
-    if (present_nimbus_uptime_secs)
-      builder.append(nimbus_uptime_secs);
-
     boolean present_topologies = true && (is_set_topologies());
     builder.append(present_topologies);
     if (present_topologies)
       builder.append(topologies);
 
+    boolean present_nimbuses = true && (is_set_nimbuses());
+    builder.append(present_nimbuses);
+    if (present_nimbuses)
+      builder.append(nimbuses);
+
     return builder.toHashCode();
   }
 
@@ -424,22 +441,22 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
         return lastComparison;
       }
     }
-    lastComparison = Boolean.valueOf(is_set_nimbus_uptime_secs()).compareTo(typedOther.is_set_nimbus_uptime_secs());
+    lastComparison = Boolean.valueOf(is_set_topologies()).compareTo(typedOther.is_set_topologies());
     if (lastComparison != 0) {
       return lastComparison;
     }
-    if (is_set_nimbus_uptime_secs()) {
-      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.nimbus_uptime_secs, typedOther.nimbus_uptime_secs);
+    if (is_set_topologies()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.topologies, typedOther.topologies);
       if (lastComparison != 0) {
         return lastComparison;
       }
     }
-    lastComparison = Boolean.valueOf(is_set_topologies()).compareTo(typedOther.is_set_topologies());
+    lastComparison = Boolean.valueOf(is_set_nimbuses()).compareTo(typedOther.is_set_nimbuses());
     if (lastComparison != 0) {
       return lastComparison;
     }
-    if (is_set_topologies()) {
-      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.topologies, typedOther.topologies);
+    if (is_set_nimbuses()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.nimbuses, typedOther.nimbuses);
       if (lastComparison != 0) {
         return lastComparison;
       }
@@ -479,14 +496,6 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
           }
           break;
-        case 2: // NIMBUS_UPTIME_SECS
-          if (field.type == org.apache.thrift.protocol.TType.I32) {
-            this.nimbus_uptime_secs = iprot.readI32();
-            set_nimbus_uptime_secs_isSet(true);
-          } else { 
-            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
-          }
-          break;
         case 3: // TOPOLOGIES
           if (field.type == org.apache.thrift.protocol.TType.LIST) {
             {
@@ -505,6 +514,24 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
           }
           break;
+        case 4: // NIMBUSES
+          if (field.type == org.apache.thrift.protocol.TType.LIST) {
+            {
+              org.apache.thrift.protocol.TList _list43 = iprot.readListBegin();
+              this.nimbuses = new ArrayList<NimbusSummary>(_list43.size);
+              for (int _i44 = 0; _i44 < _list43.size; ++_i44)
+              {
+                NimbusSummary _elem45; // required
+                _elem45 = new NimbusSummary();
+                _elem45.read(iprot);
+                this.nimbuses.add(_elem45);
+              }
+              iprot.readListEnd();
+            }
+          } else { 
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
         default:
           org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
       }
@@ -522,24 +549,33 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
       oprot.writeFieldBegin(SUPERVISORS_FIELD_DESC);
       {
         oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, this.supervisors.size()));
-        for (SupervisorSummary _iter43 : this.supervisors)
+        for (SupervisorSummary _iter46 : this.supervisors)
         {
-          _iter43.write(oprot);
+          _iter46.write(oprot);
         }
         oprot.writeListEnd();
       }
       oprot.writeFieldEnd();
     }
-    oprot.writeFieldBegin(NIMBUS_UPTIME_SECS_FIELD_DESC);
-    oprot.writeI32(this.nimbus_uptime_secs);
-    oprot.writeFieldEnd();
     if (this.topologies != null) {
       oprot.writeFieldBegin(TOPOLOGIES_FIELD_DESC);
       {
         oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, this.topologies.size()));
-        for (TopologySummary _iter44 : this.topologies)
+        for (TopologySummary _iter47 : this.topologies)
+        {
+          _iter47.write(oprot);
+        }
+        oprot.writeListEnd();
+      }
+      oprot.writeFieldEnd();
+    }
+    if (this.nimbuses != null) {
+      oprot.writeFieldBegin(NIMBUSES_FIELD_DESC);
+      {
+        oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, this.nimbuses.size()));
+        for (NimbusSummary _iter48 : this.nimbuses)
         {
-          _iter44.write(oprot);
+          _iter48.write(oprot);
         }
         oprot.writeListEnd();
       }
@@ -562,10 +598,6 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
     }
     first = false;
     if (!first) sb.append(", ");
-    sb.append("nimbus_uptime_secs:");
-    sb.append(this.nimbus_uptime_secs);
-    first = false;
-    if (!first) sb.append(", ");
     sb.append("topologies:");
     if (this.topologies == null) {
       sb.append("null");
@@ -573,6 +605,14 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
       sb.append(this.topologies);
     }
     first = false;
+    if (!first) sb.append(", ");
+    sb.append("nimbuses:");
+    if (this.nimbuses == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.nimbuses);
+    }
+    first = false;
     sb.append(")");
     return sb.toString();
   }
@@ -583,14 +623,14 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
       throw new org.apache.thrift.protocol.TProtocolException("Required field 'supervisors' is unset! Struct:" + toString());
     }
 
-    if (!is_set_nimbus_uptime_secs()) {
-      throw new org.apache.thrift.protocol.TProtocolException("Required field 'nimbus_uptime_secs' is unset! Struct:" + toString());
-    }
-
     if (!is_set_topologies()) {
       throw new org.apache.thrift.protocol.TProtocolException("Required field 'topologies' is unset! Struct:" + toString());
     }
 
+    if (!is_set_nimbuses()) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 'nimbuses' is unset! Struct:" + toString());
+    }
+
   }
 
   private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
@@ -603,8 +643,6 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
 
   private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
     try {
-      // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
-      __isset_bit_vector = new BitSet(1);
       read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
     } catch (org.apache.thrift.TException te) {
       throw new java.io.IOException(te);

http://git-wip-us.apache.org/repos/asf/storm/blob/4502bffb/storm-core/src/jvm/backtype/storm/generated/NimbusSummary.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/NimbusSummary.java b/storm-core/src/jvm/backtype/storm/generated/NimbusSummary.java
new file mode 100644
index 0000000..195048a
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/generated/NimbusSummary.java
@@ -0,0 +1,723 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.7.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package backtype.storm.generated;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NimbusSummary implements org.apache.thrift.TBase<NimbusSummary, NimbusSummary._Fields>, java.io.Serializable, Cloneable {
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("NimbusSummary");
+
+  private static final org.apache.thrift.protocol.TField HOST_FIELD_DESC = new org.apache.thrift.protocol.TField("host", org.apache.thrift.protocol.TType.STRING, (short)1);
+  private static final org.apache.thrift.protocol.TField PORT_FIELD_DESC = new org.apache.thrift.protocol.TField("port", org.apache.thrift.protocol.TType.I32, (short)2);
+  private static final org.apache.thrift.protocol.TField UPTIME_SECS_FIELD_DESC = new org.apache.thrift.protocol.TField("uptimeSecs", org.apache.thrift.protocol.TType.I32, (short)3);
+  private static final org.apache.thrift.protocol.TField IS_LEADER_FIELD_DESC = new org.apache.thrift.protocol.TField("isLeader", org.apache.thrift.protocol.TType.BOOL, (short)4);
+  private static final org.apache.thrift.protocol.TField VERSION_FIELD_DESC = new org.apache.thrift.protocol.TField("version", org.apache.thrift.protocol.TType.STRING, (short)5);
+
+  private String host; // required
+  private int port; // required
+  private int uptimeSecs; // required
+  private boolean isLeader; // required
+  private String version; // required
+
+  /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+  public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    HOST((short)1, "host"),
+    PORT((short)2, "port"),
+    UPTIME_SECS((short)3, "uptimeSecs"),
+    IS_LEADER((short)4, "isLeader"),
+    VERSION((short)5, "version");
+
+    private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+    static {
+      for (_Fields field : EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, or null if its not found.
+     */
+    public static _Fields findByThriftId(int fieldId) {
+      switch(fieldId) {
+        case 1: // HOST
+          return HOST;
+        case 2: // PORT
+          return PORT;
+        case 3: // UPTIME_SECS
+          return UPTIME_SECS;
+        case 4: // IS_LEADER
+          return IS_LEADER;
+        case 5: // VERSION
+          return VERSION;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an exception
+     * if it is not found.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if its not found.
+     */
+    public static _Fields findByName(String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final String _fieldName;
+
+    _Fields(short thriftId, String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    public String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  // isset id assignments
+  private static final int __PORT_ISSET_ID = 0;
+  private static final int __UPTIMESECS_ISSET_ID = 1;
+  private static final int __ISLEADER_ISSET_ID = 2;
+  private BitSet __isset_bit_vector = new BitSet(3);
+
+  public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+  static {
+    Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.HOST, new org.apache.thrift.meta_data.FieldMetaData("host", org.apache.thrift.TFieldRequirementType.REQUIRED, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.PORT, new org.apache.thrift.meta_data.FieldMetaData("port", org.apache.thrift.TFieldRequirementType.REQUIRED, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
+    tmpMap.put(_Fields.UPTIME_SECS, new org.apache.thrift.meta_data.FieldMetaData("uptimeSecs", org.apache.thrift.TFieldRequirementType.REQUIRED, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
+    tmpMap.put(_Fields.IS_LEADER, new org.apache.thrift.meta_data.FieldMetaData("isLeader", org.apache.thrift.TFieldRequirementType.REQUIRED, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
+    tmpMap.put(_Fields.VERSION, new org.apache.thrift.meta_data.FieldMetaData("version", org.apache.thrift.TFieldRequirementType.REQUIRED, 
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    metaDataMap = Collections.unmodifiableMap(tmpMap);
+    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(NimbusSummary.class, metaDataMap);
+  }
+
+  public NimbusSummary() {
+  }
+
+  public NimbusSummary(
+    String host,
+    int port,
+    int uptimeSecs,
+    boolean isLeader,
+    String version)
+  {
+    this();
+    this.host = host;
+    this.port = port;
+    set_port_isSet(true);
+    this.uptimeSecs = uptimeSecs;
+    set_uptimeSecs_isSet(true);
+    this.isLeader = isLeader;
+    set_isLeader_isSet(true);
+    this.version = version;
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>.
+   */
+  public NimbusSummary(NimbusSummary other) {
+    __isset_bit_vector.clear();
+    __isset_bit_vector.or(other.__isset_bit_vector);
+    if (other.is_set_host()) {
+      this.host = other.host;
+    }
+    this.port = other.port;
+    this.uptimeSecs = other.uptimeSecs;
+    this.isLeader = other.isLeader;
+    if (other.is_set_version()) {
+      this.version = other.version;
+    }
+  }
+
+  public NimbusSummary deepCopy() {
+    return new NimbusSummary(this);
+  }
+
+  @Override
+  public void clear() {
+    this.host = null;
+    set_port_isSet(false);
+    this.port = 0;
+    set_uptimeSecs_isSet(false);
+    this.uptimeSecs = 0;
+    set_isLeader_isSet(false);
+    this.isLeader = false;
+    this.version = null;
+  }
+
+  public String get_host() {
+    return this.host;
+  }
+
+  public void set_host(String host) {
+    this.host = host;
+  }
+
+  public void unset_host() {
+    this.host = null;
+  }
+
+  /** Returns true if field host is set (has been assigned a value) and false otherwise */
+  public boolean is_set_host() {
+    return this.host != null;
+  }
+
+  public void set_host_isSet(boolean value) {
+    if (!value) {
+      this.host = null;
+    }
+  }
+
+  public int get_port() {
+    return this.port;
+  }
+
+  public void set_port(int port) {
+    this.port = port;
+    set_port_isSet(true);
+  }
+
+  public void unset_port() {
+    __isset_bit_vector.clear(__PORT_ISSET_ID);
+  }
+
+  /** Returns true if field port is set (has been assigned a value) and false otherwise */
+  public boolean is_set_port() {
+    return __isset_bit_vector.get(__PORT_ISSET_ID);
+  }
+
+  public void set_port_isSet(boolean value) {
+    __isset_bit_vector.set(__PORT_ISSET_ID, value);
+  }
+
+  public int get_uptimeSecs() {
+    return this.uptimeSecs;
+  }
+
+  public void set_uptimeSecs(int uptimeSecs) {
+    this.uptimeSecs = uptimeSecs;
+    set_uptimeSecs_isSet(true);
+  }
+
+  public void unset_uptimeSecs() {
+    __isset_bit_vector.clear(__UPTIMESECS_ISSET_ID);
+  }
+
+  /** Returns true if field uptimeSecs is set (has been assigned a value) and false otherwise */
+  public boolean is_set_uptimeSecs() {
+    return __isset_bit_vector.get(__UPTIMESECS_ISSET_ID);
+  }
+
+  public void set_uptimeSecs_isSet(boolean value) {
+    __isset_bit_vector.set(__UPTIMESECS_ISSET_ID, value);
+  }
+
+  public boolean is_isLeader() {
+    return this.isLeader;
+  }
+
+  public void set_isLeader(boolean isLeader) {
+    this.isLeader = isLeader;
+    set_isLeader_isSet(true);
+  }
+
+  public void unset_isLeader() {
+    __isset_bit_vector.clear(__ISLEADER_ISSET_ID);
+  }
+
+  /** Returns true if field isLeader is set (has been assigned a value) and false otherwise */
+  public boolean is_set_isLeader() {
+    return __isset_bit_vector.get(__ISLEADER_ISSET_ID);
+  }
+
+  public void set_isLeader_isSet(boolean value) {
+    __isset_bit_vector.set(__ISLEADER_ISSET_ID, value);
+  }
+
+  public String get_version() {
+    return this.version;
+  }
+
+  public void set_version(String version) {
+    this.version = version;
+  }
+
+  public void unset_version() {
+    this.version = null;
+  }
+
+  /** Returns true if field version is set (has been assigned a value) and false otherwise */
+  public boolean is_set_version() {
+    return this.version != null;
+  }
+
+  public void set_version_isSet(boolean value) {
+    if (!value) {
+      this.version = null;
+    }
+  }
+
+  public void setFieldValue(_Fields field, Object value) {
+    switch (field) {
+    case HOST:
+      if (value == null) {
+        unset_host();
+      } else {
+        set_host((String)value);
+      }
+      break;
+
+    case PORT:
+      if (value == null) {
+        unset_port();
+      } else {
+        set_port((Integer)value);
+      }
+      break;
+
+    case UPTIME_SECS:
+      if (value == null) {
+        unset_uptimeSecs();
+      } else {
+        set_uptimeSecs((Integer)value);
+      }
+      break;
+
+    case IS_LEADER:
+      if (value == null) {
+        unset_isLeader();
+      } else {
+        set_isLeader((Boolean)value);
+      }
+      break;
+
+    case VERSION:
+      if (value == null) {
+        unset_version();
+      } else {
+        set_version((String)value);
+      }
+      break;
+
+    }
+  }
+
+  public Object getFieldValue(_Fields field) {
+    switch (field) {
+    case HOST:
+      return get_host();
+
+    case PORT:
+      return Integer.valueOf(get_port());
+
+    case UPTIME_SECS:
+      return Integer.valueOf(get_uptimeSecs());
+
+    case IS_LEADER:
+      return Boolean.valueOf(is_isLeader());
+
+    case VERSION:
+      return get_version();
+
+    }
+    throw new IllegalStateException();
+  }
+
+  /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new IllegalArgumentException();
+    }
+
+    switch (field) {
+    case HOST:
+      return is_set_host();
+    case PORT:
+      return is_set_port();
+    case UPTIME_SECS:
+      return is_set_uptimeSecs();
+    case IS_LEADER:
+      return is_set_isLeader();
+    case VERSION:
+      return is_set_version();
+    }
+    throw new IllegalStateException();
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof NimbusSummary)
+      return this.equals((NimbusSummary)that);
+    return false;
+  }
+
+  public boolean equals(NimbusSummary that) {
+    if (that == null)
+      return false;
+
+    boolean this_present_host = true && this.is_set_host();
+    boolean that_present_host = true && that.is_set_host();
+    if (this_present_host || that_present_host) {
+      if (!(this_present_host && that_present_host))
+        return false;
+      if (!this.host.equals(that.host))
+        return false;
+    }
+
+    boolean this_present_port = true;
+    boolean that_present_port = true;
+    if (this_present_port || that_present_port) {
+      if (!(this_present_port && that_present_port))
+        return false;
+      if (this.port != that.port)
+        return false;
+    }
+
+    boolean this_present_uptimeSecs = true;
+    boolean that_present_uptimeSecs = true;
+    if (this_present_uptimeSecs || that_present_uptimeSecs) {
+      if (!(this_present_uptimeSecs && that_present_uptimeSecs))
+        return false;
+      if (this.uptimeSecs != that.uptimeSecs)
+        return false;
+    }
+
+    boolean this_present_isLeader = true;
+    boolean that_present_isLeader = true;
+    if (this_present_isLeader || that_present_isLeader) {
+      if (!(this_present_isLeader && that_present_isLeader))
+        return false;
+      if (this.isLeader != that.isLeader)
+        return false;
+    }
+
+    boolean this_present_version = true && this.is_set_version();
+    boolean that_present_version = true && that.is_set_version();
+    if (this_present_version || that_present_version) {
+      if (!(this_present_version && that_present_version))
+        return false;
+      if (!this.version.equals(that.version))
+        return false;
+    }
+
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    HashCodeBuilder builder = new HashCodeBuilder();
+
+    boolean present_host = true && (is_set_host());
+    builder.append(present_host);
+    if (present_host)
+      builder.append(host);
+
+    boolean present_port = true;
+    builder.append(present_port);
+    if (present_port)
+      builder.append(port);
+
+    boolean present_uptimeSecs = true;
+    builder.append(present_uptimeSecs);
+    if (present_uptimeSecs)
+      builder.append(uptimeSecs);
+
+    boolean present_isLeader = true;
+    builder.append(present_isLeader);
+    if (present_isLeader)
+      builder.append(isLeader);
+
+    boolean present_version = true && (is_set_version());
+    builder.append(present_version);
+    if (present_version)
+      builder.append(version);
+
+    return builder.toHashCode();
+  }
+
+  public int compareTo(NimbusSummary other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int lastComparison = 0;
+    NimbusSummary typedOther = (NimbusSummary)other;
+
+    lastComparison = Boolean.valueOf(is_set_host()).compareTo(typedOther.is_set_host());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_host()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.host, typedOther.host);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(is_set_port()).compareTo(typedOther.is_set_port());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_port()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.port, typedOther.port);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(is_set_uptimeSecs()).compareTo(typedOther.is_set_uptimeSecs());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_uptimeSecs()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.uptimeSecs, typedOther.uptimeSecs);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(is_set_isLeader()).compareTo(typedOther.is_set_isLeader());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_isLeader()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.isLeader, typedOther.isLeader);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    lastComparison = Boolean.valueOf(is_set_version()).compareTo(typedOther.is_set_version());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (is_set_version()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.version, typedOther.version);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
+    return 0;
+  }
+
+  public _Fields fieldForId(int fieldId) {
+    return _Fields.findByThriftId(fieldId);
+  }
+
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+    org.apache.thrift.protocol.TField field;
+    iprot.readStructBegin();
+    while (true)
+    {
+      field = iprot.readFieldBegin();
+      if (field.type == org.apache.thrift.protocol.TType.STOP) { 
+        break;
+      }
+      switch (field.id) {
+        case 1: // HOST
+          if (field.type == org.apache.thrift.protocol.TType.STRING) {
+            this.host = iprot.readString();
+          } else { 
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        case 2: // PORT
+          if (field.type == org.apache.thrift.protocol.TType.I32) {
+            this.port = iprot.readI32();
+            set_port_isSet(true);
+          } else { 
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        case 3: // UPTIME_SECS
+          if (field.type == org.apache.thrift.protocol.TType.I32) {
+            this.uptimeSecs = iprot.readI32();
+            set_uptimeSecs_isSet(true);
+          } else { 
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        case 4: // IS_LEADER
+          if (field.type == org.apache.thrift.protocol.TType.BOOL) {
+            this.isLeader = iprot.readBool();
+            set_isLeader_isSet(true);
+          } else { 
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        case 5: // VERSION
+          if (field.type == org.apache.thrift.protocol.TType.STRING) {
+            this.version = iprot.readString();
+          } else { 
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        default:
+          org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+      }
+      iprot.readFieldEnd();
+    }
+    iprot.readStructEnd();
+    validate();
+  }
+
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+    validate();
+
+    oprot.writeStructBegin(STRUCT_DESC);
+    if (this.host != null) {
+      oprot.writeFieldBegin(HOST_FIELD_DESC);
+      oprot.writeString(this.host);
+      oprot.writeFieldEnd();
+    }
+    oprot.writeFieldBegin(PORT_FIELD_DESC);
+    oprot.writeI32(this.port);
+    oprot.writeFieldEnd();
+    oprot.writeFieldBegin(UPTIME_SECS_FIELD_DESC);
+    oprot.writeI32(this.uptimeSecs);
+    oprot.writeFieldEnd();
+    oprot.writeFieldBegin(IS_LEADER_FIELD_DESC);
+    oprot.writeBool(this.isLeader);
+    oprot.writeFieldEnd();
+    if (this.version != null) {
+      oprot.writeFieldBegin(VERSION_FIELD_DESC);
+      oprot.writeString(this.version);
+      oprot.writeFieldEnd();
+    }
+    oprot.writeFieldStop();
+    oprot.writeStructEnd();
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder("NimbusSummary(");
+    boolean first = true;
+
+    sb.append("host:");
+    if (this.host == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.host);
+    }
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("port:");
+    sb.append(this.port);
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("uptimeSecs:");
+    sb.append(this.uptimeSecs);
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("isLeader:");
+    sb.append(this.isLeader);
+    first = false;
+    if (!first) sb.append(", ");
+    sb.append("version:");
+    if (this.version == null) {
+      sb.append("null");
+    } else {
+      sb.append(this.version);
+    }
+    first = false;
+    sb.append(")");
+    return sb.toString();
+  }
+
+  public void validate() throws org.apache.thrift.TException {
+    // check for required fields
+    if (!is_set_host()) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 'host' is unset! Struct:" + toString());
+    }
+
+    if (!is_set_port()) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 'port' is unset! Struct:" + toString());
+    }
+
+    if (!is_set_uptimeSecs()) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 'uptimeSecs' is unset! Struct:" + toString());
+    }
+
+    if (!is_set_isLeader()) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 'isLeader' is unset! Struct:" + toString());
+    }
+
+    if (!is_set_version()) {
+      throw new org.apache.thrift.protocol.TProtocolException("Required field 'version' is unset! Struct:" + toString());
+    }
+
+  }
+
+  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+    try {
+      write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+    try {
+      // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+      __isset_bit_vector = new BitSet(1);
+      read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+    } catch (org.apache.thrift.TException te) {
+      throw new java.io.IOException(te);
+    }
+  }
+
+}
+

http://git-wip-us.apache.org/repos/asf/storm/blob/4502bffb/storm-core/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/ttypes.py b/storm-core/src/py/storm/ttypes.py
index e4fb751..a5e155c 100644
--- a/storm-core/src/py/storm/ttypes.py
+++ b/storm-core/src/py/storm/ttypes.py
@@ -2354,28 +2354,150 @@ class SupervisorSummary:
   def __ne__(self, other):
     return not (self == other)
 
+class NimbusSummary:
+  """
+  Attributes:
+   - host
+   - port
+   - uptimeSecs
+   - isLeader
+   - version
+  """
+
+  thrift_spec = (
+    None, # 0
+    (1, TType.STRING, 'host', None, None, ), # 1
+    (2, TType.I32, 'port', None, None, ), # 2
+    (3, TType.I32, 'uptimeSecs', None, None, ), # 3
+    (4, TType.BOOL, 'isLeader', None, None, ), # 4
+    (5, TType.STRING, 'version', None, None, ), # 5
+  )
+
+  def __hash__(self):
+    return 0 + hash(self.host) + hash(self.port) + hash(self.uptimeSecs) + hash(self.isLeader) + hash(self.version)
+
+  def __init__(self, host=None, port=None, uptimeSecs=None, isLeader=None, version=None,):
+    self.host = host
+    self.port = port
+    self.uptimeSecs = uptimeSecs
+    self.isLeader = isLeader
+    self.version = version
+
+  def read(self, iprot):
+    if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+      fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+      return
+    iprot.readStructBegin()
+    while True:
+      (fname, ftype, fid) = iprot.readFieldBegin()
+      if ftype == TType.STOP:
+        break
+      if fid == 1:
+        if ftype == TType.STRING:
+          self.host = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      elif fid == 2:
+        if ftype == TType.I32:
+          self.port = iprot.readI32();
+        else:
+          iprot.skip(ftype)
+      elif fid == 3:
+        if ftype == TType.I32:
+          self.uptimeSecs = iprot.readI32();
+        else:
+          iprot.skip(ftype)
+      elif fid == 4:
+        if ftype == TType.BOOL:
+          self.isLeader = iprot.readBool();
+        else:
+          iprot.skip(ftype)
+      elif fid == 5:
+        if ftype == TType.STRING:
+          self.version = iprot.readString().decode('utf-8')
+        else:
+          iprot.skip(ftype)
+      else:
+        iprot.skip(ftype)
+      iprot.readFieldEnd()
+    iprot.readStructEnd()
+
+  def write(self, oprot):
+    if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+      oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+      return
+    oprot.writeStructBegin('NimbusSummary')
+    if self.host is not None:
+      oprot.writeFieldBegin('host', TType.STRING, 1)
+      oprot.writeString(self.host.encode('utf-8'))
+      oprot.writeFieldEnd()
+    if self.port is not None:
+      oprot.writeFieldBegin('port', TType.I32, 2)
+      oprot.writeI32(self.port)
+      oprot.writeFieldEnd()
+    if self.uptimeSecs is not None:
+      oprot.writeFieldBegin('uptimeSecs', TType.I32, 3)
+      oprot.writeI32(self.uptimeSecs)
+      oprot.writeFieldEnd()
+    if self.isLeader is not None:
+      oprot.writeFieldBegin('isLeader', TType.BOOL, 4)
+      oprot.writeBool(self.isLeader)
+      oprot.writeFieldEnd()
+    if self.version is not None:
+      oprot.writeFieldBegin('version', TType.STRING, 5)
+      oprot.writeString(self.version.encode('utf-8'))
+      oprot.writeFieldEnd()
+    oprot.writeFieldStop()
+    oprot.writeStructEnd()
+
+  def validate(self):
+    if self.host is None:
+      raise TProtocol.TProtocolException(message='Required field host is unset!')
+    if self.port is None:
+      raise TProtocol.TProtocolException(message='Required field port is unset!')
+    if self.uptimeSecs is None:
+      raise TProtocol.TProtocolException(message='Required field uptimeSecs is unset!')
+    if self.isLeader is None:
+      raise TProtocol.TProtocolException(message='Required field isLeader is unset!')
+    if self.version is None:
+      raise TProtocol.TProtocolException(message='Required field version is unset!')
+    return
+
+
+  def __repr__(self):
+    L = ['%s=%r' % (key, value)
+      for key, value in self.__dict__.iteritems()]
+    return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+  def __eq__(self, other):
+    return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+  def __ne__(self, other):
+    return not (self == other)
+
 class ClusterSummary:
   """
   Attributes:
    - supervisors
-   - nimbus_uptime_secs
    - topologies
+   - nimbuses
   """
 
   thrift_spec = (
     None, # 0
     (1, TType.LIST, 'supervisors', (TType.STRUCT,(SupervisorSummary, SupervisorSummary.thrift_spec)), None, ), # 1
-    (2, TType.I32, 'nimbus_uptime_secs', None, None, ), # 2
+    None, # 2
     (3, TType.LIST, 'topologies', (TType.STRUCT,(TopologySummary, TopologySummary.thrift_spec)), None, ), # 3
+    (4, TType.LIST, 'nimbuses', (TType.STRUCT,(NimbusSummary, NimbusSummary.thrift_spec)), None, ), # 4
   )
 
   def __hash__(self):
-    return 0 + hash(self.supervisors) + hash(self.nimbus_uptime_secs) + hash(self.topologies)
+    return 0 + hash(self.supervisors) + hash(self.topologies) + hash(self.nimbuses)
 
-  def __init__(self, supervisors=None, nimbus_uptime_secs=None, topologies=None,):
+  def __init__(self, supervisors=None, topologies=None, nimbuses=None,):
     self.supervisors = supervisors
-    self.nimbus_uptime_secs = nimbus_uptime_secs
     self.topologies = topologies
+    self.nimbuses = nimbuses
 
   def read(self, iprot):
     if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -2397,11 +2519,6 @@ class ClusterSummary:
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
-      elif fid == 2:
-        if ftype == TType.I32:
-          self.nimbus_uptime_secs = iprot.readI32();
-        else:
-          iprot.skip(ftype)
       elif fid == 3:
         if ftype == TType.LIST:
           self.topologies = []
@@ -2413,6 +2530,17 @@ class ClusterSummary:
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
+      elif fid == 4:
+        if ftype == TType.LIST:
+          self.nimbuses = []
+          (_etype81, _size78) = iprot.readListBegin()
+          for _i82 in xrange(_size78):
+            _elem83 = NimbusSummary()
+            _elem83.read(iprot)
+            self.nimbuses.append(_elem83)
+          iprot.readListEnd()
+        else:
+          iprot.skip(ftype)
       else:
         iprot.skip(ftype)
       iprot.readFieldEnd()
@@ -2426,19 +2554,22 @@ class ClusterSummary:
     if self.supervisors is not None:
       oprot.writeFieldBegin('supervisors', TType.LIST, 1)
       oprot.writeListBegin(TType.STRUCT, len(self.supervisors))
-      for iter78 in self.supervisors:
-        iter78.write(oprot)
+      for iter84 in self.supervisors:
+        iter84.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
-    if self.nimbus_uptime_secs is not None:
-      oprot.writeFieldBegin('nimbus_uptime_secs', TType.I32, 2)
-      oprot.writeI32(self.nimbus_uptime_secs)
-      oprot.writeFieldEnd()
     if self.topologies is not None:
       oprot.writeFieldBegin('topologies', TType.LIST, 3)
       oprot.writeListBegin(TType.STRUCT, len(self.topologies))
-      for iter79 in self.topologies:
-        iter79.write(oprot)
+      for iter85 in self.topologies:
+        iter85.write(oprot)
+      oprot.writeListEnd()
+      oprot.writeFieldEnd()
+    if self.nimbuses is not None:
+      oprot.writeFieldBegin('nimbuses', TType.LIST, 4)
+      oprot.writeListBegin(TType.STRUCT, len(self.nimbuses))
+      for iter86 in self.nimbuses:
+        iter86.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -2447,10 +2578,10 @@ class ClusterSummary:
   def validate(self):
     if self.supervisors is None:
       raise TProtocol.TProtocolException(message='Required field supervisors is unset!')
-    if self.nimbus_uptime_secs is None:
-      raise TProtocol.TProtocolException(message='Required field nimbus_uptime_secs is unset!')
     if self.topologies is None:
       raise TProtocol.TProtocolException(message='Required field topologies is unset!')
+    if self.nimbuses is None:
+      raise TProtocol.TProtocolException(message='Required field nimbuses is unset!')
     return
 
 
@@ -2609,90 +2740,90 @@ class BoltStats:
       if fid == 1:
         if ftype == TType.MAP:
           self.acked = {}
-          (_ktype81, _vtype82, _size80 ) = iprot.readMapBegin() 
-          for _i84 in xrange(_size80):
-            _key85 = iprot.readString().decode('utf-8')
-            _val86 = {}
-            (_ktype88, _vtype89, _size87 ) = iprot.readMapBegin() 
-            for _i91 in xrange(_size87):
-              _key92 = GlobalStreamId()
-              _key92.read(iprot)
-              _val93 = iprot.readI64();
-              _val86[_key92] = _val93
+          (_ktype88, _vtype89, _size87 ) = iprot.readMapBegin() 
+          for _i91 in xrange(_size87):
+            _key92 = iprot.readString().decode('utf-8')
+            _val93 = {}
+            (_ktype95, _vtype96, _size94 ) = iprot.readMapBegin() 
+            for _i98 in xrange(_size94):
+              _key99 = GlobalStreamId()
+              _key99.read(iprot)
+              _val100 = iprot.readI64();
+              _val93[_key99] = _val100
             iprot.readMapEnd()
-            self.acked[_key85] = _val86
+            self.acked[_key92] = _val93
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
       elif fid == 2:
         if ftype == TType.MAP:
           self.failed = {}
-          (_ktype95, _vtype96, _size94 ) = iprot.readMapBegin() 
-          for _i98 in xrange(_size94):
-            _key99 = iprot.readString().decode('utf-8')
-            _val100 = {}
-            (_ktype102, _vtype103, _size101 ) = iprot.readMapBegin() 
-            for _i105 in xrange(_size101):
-              _key106 = GlobalStreamId()
-              _key106.read(iprot)
-              _val107 = iprot.readI64();
-              _val100[_key106] = _val107
+          (_ktype102, _vtype103, _size101 ) = iprot.readMapBegin() 
+          for _i105 in xrange(_size101):
+            _key106 = iprot.readString().decode('utf-8')
+            _val107 = {}
+            (_ktype109, _vtype110, _size108 ) = iprot.readMapBegin() 
+            for _i112 in xrange(_size108):
+              _key113 = GlobalStreamId()
+              _key113.read(iprot)
+              _val114 = iprot.readI64();
+              _val107[_key113] = _val114
             iprot.readMapEnd()
-            self.failed[_key99] = _val100
+            self.failed[_key106] = _val107
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
       elif fid == 3:
         if ftype == TType.MAP:
           self.process_ms_avg = {}
-          (_ktype109, _vtype110, _size108 ) = iprot.readMapBegin() 
-          for _i112 in xrange(_size108):
-            _key113 = iprot.readString().decode('utf-8')
-            _val114 = {}
-            (_ktype116, _vtype117, _size115 ) = iprot.readMapBegin() 
-            for _i119 in xrange(_size115):
-              _key120 = GlobalStreamId()
-              _key120.read(iprot)
-              _val121 = iprot.readDouble();
-              _val114[_key120] = _val121
+          (_ktype116, _vtype117, _size115 ) = iprot.readMapBegin() 
+          for _i119 in xrange(_size115):
+            _key120 = iprot.readString().decode('utf-8')
+            _val121 = {}
+            (_ktype123, _vtype124, _size122 ) = iprot.readMapBegin() 
+            for _i126 in xrange(_size122):
+              _key127 = GlobalStreamId()
+              _key127.read(iprot)
+              _val128 = iprot.readDouble();
+              _val121[_key127] = _val128
             iprot.readMapEnd()
-            self.process_ms_avg[_key113] = _val114
+            self.process_ms_avg[_key120] = _val121
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
       elif fid == 4:
         if ftype == TType.MAP:
           self.executed = {}
-          (_ktype123, _vtype124, _size122 ) = iprot.readMapBegin() 
-          for _i126 in xrange(_size122):
-            _key127 = iprot.readString().decode('utf-8')
-            _val128 = {}
-            (_ktype130, _vtype131, _size129 ) = iprot.readMapBegin() 
-            for _i133 in xrange(_size129):
-              _key134 = GlobalStreamId()
-              _key134.read(iprot)
-              _val135 = iprot.readI64();
-              _val128[_key134] = _val135
+          (_ktype130, _vtype131, _size129 ) = iprot.readMapBegin() 
+          for _i133 in xrange(_size129):
+            _key134 = iprot.readString().decode('utf-8')
+            _val135 = {}
+            (_ktype137, _vtype138, _size136 ) = iprot.readMapBegin() 
+            for _i140 in xrange(_size136):
+              _key141 = GlobalStreamId()
+              _key141.read(iprot)
+              _val142 = iprot.readI64();
+              _val135[_key141] = _val142
             iprot.readMapEnd()
-            self.executed[_key127] = _val128
+            self.executed[_key134] = _val135
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
       elif fid == 5:
         if ftype == TType.MAP:
           self.execute_ms_avg = {}
-          (_ktype137, _vtype138, _size136 ) = iprot.readMapBegin() 
-          for _i140 in xrange(_size136):
-            _key141 = iprot.readString().decode('utf-8')
-            _val142 = {}
-            (_ktype144, _vtype145, _size143 ) = iprot.readMapBegin() 
-            for _i147 in xrange(_size143):
-              _key148 = GlobalStreamId()
-              _key148.read(iprot)
-              _val149 = iprot.readDouble();
-              _val142[_key148] = _val149
+          (_ktype144, _vtype145, _size143 ) = iprot.readMapBegin() 
+          for _i147 in xrange(_size143):
+            _key148 = iprot.readString().decode('utf-8')
+            _val149 = {}
+            (_ktype151, _vtype152, _size150 ) = iprot.readMapBegin() 
+            for _i154 in xrange(_size150):
+              _key155 = GlobalStreamId()
+              _key155.read(iprot)
+              _val156 = iprot.readDouble();
+              _val149[_key155] = _val156
             iprot.readMapEnd()
-            self.execute_ms_avg[_key141] = _val142
+            self.execute_ms_avg[_key148] = _val149
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
@@ -2709,60 +2840,60 @@ class BoltStats:
     if self.acked is not None:
       oprot.writeFieldBegin('acked', TType.MAP, 1)
       oprot.writeMapBegin(TType.STRING, TType.MAP, len(self.acked))
-      for kiter150,viter151 in self.acked.items():
-        oprot.writeString(kiter150.encode('utf-8'))
-        oprot.writeMapBegin(TType.STRUCT, TType.I64, len(viter151))
-        for kiter152,viter153 in viter151.items():
-          kiter152.write(oprot)
-          oprot.writeI64(viter153)
+      for kiter157,viter158 in self.acked.items():
+        oprot.writeString(kiter157.encode('utf-8'))
+        oprot.writeMapBegin(TType.STRUCT, TType.I64, len(viter158))
+        for kiter159,viter160 in viter158.items():
+          kiter159.write(oprot)
+          oprot.writeI64(viter160)
         oprot.writeMapEnd()
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     if self.failed is not None:
       oprot.writeFieldBegin('failed', TType.MAP, 2)
       oprot.writeMapBegin(TType.STRING, TType.MAP, len(self.failed))
-      for kiter154,viter155 in self.failed.items():
-        oprot.writeString(kiter154.encode('utf-8'))
-        oprot.writeMapBegin(TType.STRUCT, TType.I64, len(viter155))
-        for kiter156,viter157 in viter155.items():
-          kiter156.write(oprot)
-          oprot.writeI64(viter157)
+      for kiter161,viter162 in self.failed.items():
+        oprot.writeString(kiter161.encode('utf-8'))
+        oprot.writeMapBegin(TType.STRUCT, TType.I64, len(viter162))
+        for kiter163,viter164 in viter162.items():
+          kiter163.write(oprot)
+          oprot.writeI64(viter164)
         oprot.writeMapEnd()
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     if self.process_ms_avg is not None:
       oprot.writeFieldBegin('process_ms_avg', TType.MAP, 3)
       oprot.writeMapBegin(TType.STRING, TType.MAP, len(self.process_ms_avg))
-      for kiter158,viter159 in self.process_ms_avg.items():
-        oprot.writeString(kiter158.encode('utf-8'))
-        oprot.writeMapBegin(TType.STRUCT, TType.DOUBLE, len(viter159))
-        for kiter160,viter161 in viter159.items():
-          kiter160.write(oprot)
-          oprot.writeDouble(viter161)
+      for kiter165,viter166 in self.process_ms_avg.items():
+        oprot.writeString(kiter165.encode('utf-8'))
+        oprot.writeMapBegin(TType.STRUCT, TType.DOUBLE, len(viter166))
+        for kiter167,viter168 in viter166.items():
+          kiter167.write(oprot)
+          oprot.writeDouble(viter168)
         oprot.writeMapEnd()
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     if self.executed is not None:
       oprot.writeFieldBegin('executed', TType.MAP, 4)
       oprot.writeMapBegin(TType.STRING, TType.MAP, len(self.executed))
-      for kiter162,viter163 in self.executed.items():
-        oprot.writeString(kiter162.encode('utf-8'))
-        oprot.writeMapBegin(TType.STRUCT, TType.I64, len(viter163))
-        for kiter164,viter165 in viter163.items():
-          kiter164.write(oprot)
-          oprot.writeI64(viter165)
+      for kiter169,viter170 in self.executed.items():
+        oprot.writeString(kiter169.encode('utf-8'))
+        oprot.writeMapBegin(TType.STRUCT, TType.I64, len(viter170))
+        for kiter171,viter172 in viter170.items():
+          kiter171.write(oprot)
+          oprot.writeI64(viter172)
         oprot.writeMapEnd()
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     if self.execute_ms_avg is not None:
       oprot.writeFieldBegin('execute_ms_avg', TType.MAP, 5)
       oprot.writeMapBegin(TType.STRING, TType.MAP, len(self.execute_ms_avg))
-      for kiter166,viter167 in self.execute_ms_avg.items():
-        oprot.writeString(kiter166.encode('utf-8'))
-        oprot.writeMapBegin(TType.STRUCT, TType.DOUBLE, len(viter167))
-        for kiter168,viter169 in viter167.items():
-          kiter168.write(oprot)
-          oprot.writeDouble(viter169)
+      for kiter173,viter174 in self.execute_ms_avg.items():
+        oprot.writeString(kiter173.encode('utf-8'))
+        oprot.writeMapBegin(TType.STRUCT, TType.DOUBLE, len(viter174))
+        for kiter175,viter176 in viter174.items():
+          kiter175.write(oprot)
+          oprot.writeDouble(viter176)
         oprot.writeMapEnd()
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
@@ -2829,51 +2960,51 @@ class SpoutStats:
       if fid == 1:
         if ftype == TType.MAP:
           self.acked = {}
-          (_ktype171, _vtype172, _size170 ) = iprot.readMapBegin() 
-          for _i174 in xrange(_size170):
-            _key175 = iprot.readString().decode('utf-8')
-            _val176 = {}
-            (_ktype178, _vtype179, _size177 ) = iprot.readMapBegin() 
-            for _i181 in xrange(_size177):
-              _key182 = iprot.readString().decode('utf-8')
-              _val183 = iprot.readI64();
-              _val176[_key182] = _val183
+          (_ktype178, _vtype179, _size177 ) = iprot.readMapBegin() 
+          for _i181 in xrange(_size177):
+            _key182 = iprot.readString().decode('utf-8')
+            _val183 = {}
+            (_ktype185, _vtype186, _size184 ) = iprot.readMapBegin() 
+            for _i188 in xrange(_size184):
+              _key189 = iprot.readString().decode('utf-8')
+              _val190 = iprot.readI64();
+              _val183[_key189] = _val190
             iprot.readMapEnd()
-            self.acked[_key175] = _val176
+            self.acked[_key182] = _val183
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
       elif fid == 2:
         if ftype == TType.MAP:
           self.failed = {}
-          (_ktype185, _vtype186, _size184 ) = iprot.readMapBegin() 
-          for _i188 in xrange(_size184):
-            _key189 = iprot.readString().decode('utf-8')
-            _val190 = {}
-            (_ktype192, _vtype193, _size191 ) = iprot.readMapBegin() 
-            for _i195 in xrange(_size191):
-              _key196 = iprot.readString().decode('utf-8')
-              _val197 = iprot.readI64();
-              _val190[_key196] = _val197
+          (_ktype192, _vtype193, _size191 ) = iprot.readMapBegin() 
+          for _i195 in xrange(_size191):
+            _key196 = iprot.readString().decode('utf-8')
+            _val197 = {}
+            (_ktype199, _vtype200, _size198 ) = iprot.readMapBegin() 
+            for _i202 in xrange(_size198):
+              _key203 = iprot.readString().decode('utf-8')
+              _val204 = iprot.readI64();
+              _val197[_key203] = _val204
             iprot.readMapEnd()
-            self.failed[_key189] = _val190
+            self.failed[_key196] = _val197
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
       elif fid == 3:
         if ftype == TType.MAP:
           self.complete_ms_avg = {}
-          (_ktype199, _vtype200, _size198 ) = iprot.readMapBegin() 
-          for _i202 in xrange(_size198):
-            _key203 = iprot.readString().decode('utf-8')
-            _val204 = {}
-            (_ktype206, _vtype207, _size205 ) = iprot.readMapBegin() 
-            for _i209 in xrange(_size205):
-              _key210 = iprot.readString().decode('utf-8')
-              _val211 = iprot.readDouble();
-              _val204[_key210] = _val211
+          (_ktype206, _vtype207, _size205 ) = iprot.readMapBegin() 
+          for _i209 in xrange(_size205):
+            _key210 = iprot.readString().decode('utf-8')
+            _val211 = {}
+            (_ktype213, _vtype214, _size212 ) = iprot.readMapBegin() 
+            for _i216 in xrange(_size212):
+              _key217 = iprot.readString().decode('utf-8')
+              _val218 = iprot.readDouble();
+              _val211[_key217] = _val218
             iprot.readMapEnd()
-            self.complete_ms_avg[_key203] = _val204
+            self.complete_ms_avg[_key210] = _val211
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
@@ -2890,36 +3021,36 @@ class SpoutStats:
     if self.acked is not None:
       oprot.writeFieldBegin('acked', TType.MAP, 1)
       oprot.writeMapBegin(TType.STRING, TType.MAP, len(self.acked))
-      for kiter212,viter213 in self.acked.items():
-        oprot.writeString(kiter212.encode('utf-8'))
-        oprot.writeMapBegin(TType.STRING, TType.I64, len(viter213))
-        for kiter214,viter215 in viter213.items():
-          oprot.writeString(kiter214.encode('utf-8'))
-          oprot.writeI64(viter215)
+      for kiter219,viter220 in self.acked.items():
+        oprot.writeString(kiter219.encode('utf-8'))
+        oprot.writeMapBegin(TType.STRING, TType.I64, len(viter220))
+        for kiter221,viter222 in viter220.items():
+          oprot.writeString(kiter221.encode('utf-8'))
+          oprot.writeI64(viter222)
         oprot.writeMapEnd()
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     if self.failed is not None:
       oprot.writeFieldBegin('failed', TType.MAP, 2)
       oprot.writeMapBegin(TType.STRING, TType.MAP, len(self.failed))
-      for kiter216,viter217 in self.failed.items():
-        oprot.writeString(kiter216.encode('utf-8'))
-        oprot.writeMapBegin(TType.STRING, TType.I64, len(viter217))
-        for kiter218,viter219 in viter217.items():
-          oprot.writeString(kiter218.encode('utf-8'))
-          oprot.writeI64(viter219)
+      for kiter223,viter224 in self.failed.items():
+        oprot.writeString(kiter223.encode('utf-8'))
+        oprot.writeMapBegin(TType.STRING, TType.I64, len(viter224))
+        for kiter225,viter226 in viter224.items():
+          oprot.writeString(kiter225.encode('utf-8'))
+          oprot.writeI64(viter226)
         oprot.writeMapEnd()
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     if self.complete_ms_avg is not None:
       oprot.writeFieldBegin('complete_ms_avg', TType.MAP, 3)
       oprot.writeMapBegin(TType.STRING, TType.MAP, len(self.complete_ms_avg))
-      for kiter220,viter221 in self.complete_ms_avg.items():
-        oprot.writeString(kiter220.encode('utf-8'))
-        oprot.writeMapBegin(TType.STRING, TType.DOUBLE, len(viter221))
-        for kiter222,viter223 in viter221.items():
-          oprot.writeString(kiter222.encode('utf-8'))
-          oprot.writeDouble(viter223)
+      for kiter227,viter228 in self.complete_ms_avg.items():
+        oprot.writeString(kiter227.encode('utf-8'))
+        oprot.writeMapBegin(TType.STRING, TType.DOUBLE, len(viter228))
+        for kiter229,viter230 in viter228.items():
+          oprot.writeString(kiter229.encode('utf-8'))
+          oprot.writeDouble(viter230)
         oprot.writeMapEnd()
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
@@ -3059,34 +3190,34 @@ class ExecutorStats:
       if fid == 1:
         if ftype == TType.MAP:
           self.emitted = {}
-          (_ktype225, _vtype226, _size224 ) = iprot.readMapBegin() 
-          for _i228 in xrange(_size224):
-            _key229 = iprot.readString().decode('utf-8')
-            _val230 = {}
-            (_ktype232, _vtype233, _size231 ) = iprot.readMapBegin() 
-            for _i235 in xrange(_size231):
-              _key236 = iprot.readString().decode('utf-8')
-              _val237 = iprot.readI64();
-              _val230[_key236] = _val237
+          (_ktype232, _vtype233, _size231 ) = iprot.readMapBegin() 
+          for _i235 in xrange(_size231):
+            _key236 = iprot.readString().decode('utf-8')
+            _val237 = {}
+            (_ktype239, _vtype240, _size238 ) = iprot.readMapBegin() 
+            for _i242 in xrange(_size238):
+              _key243 = iprot.readString().decode('utf-8')
+              _val244 = iprot.readI64();
+              _val237[_key243] = _val244
             iprot.readMapEnd()
-            self.emitted[_key229] = _val230
+            self.emitted[_key236] = _val237
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
       elif fid == 2:
         if ftype == TType.MAP:
           self.transferred = {}
-          (_ktype239, _vtype240, _size238 ) = iprot.readMapBegin() 
-          for _i242 in xrange(_size238):
-            _key243 = iprot.readString().decode('utf-8')
-            _val244 = {}
-            (_ktype246, _vtype247, _size245 ) = iprot.readMapBegin() 
-            for _i249 in xrange(_size245):
-              _key250 = iprot.readString().decode('utf-8')
-              _val251 = iprot.readI64();
-              _val244[_key250] = _val251
+          (_ktype246, _vtype247, _size245 ) = iprot.readMapBegin() 
+          for _i249 in xrange(_size245):
+            _key250 = iprot.readString().decode('utf-8')
+            _val251 = {}
+            (_ktype253, _vtype254, _size252 ) = iprot.readMapBegin() 
+            for _i256 in xrange(_size252):
+              _key257 = iprot.readString().decode('utf-8')
+              _val258 = iprot.readI64();
+              _val251[_key257] = _val258
             iprot.readMapEnd()
-            self.transferred[_key243] = _val244
+            self.transferred[_key250] = _val251
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
@@ -3109,24 +3240,24 @@ class ExecutorStats:
     if self.emitted is not None:
       oprot.writeFieldBegin('emitted', TType.MAP, 1)
       oprot.writeMapBegin(TType.STRING, TType.MAP, len(self.emitted))
-      for kiter252,viter253 in self.emitted.items():
-        oprot.writeString(kiter252.encode('utf-8'))
-        oprot.writeMapBegin(TType.STRING, TType.I64, len(viter253))
-        for kiter254,viter255 in viter253.items():
-          oprot.writeString(kiter254.encode('utf-8'))
-          oprot.writeI64(viter255)
+      for kiter259,viter260 in self.emitted.items():
+        oprot.writeString(kiter259.encode('utf-8'))
+        oprot.writeMapBegin(TType.STRING, TType.I64, len(viter260))
+        for kiter261,viter262 in viter260.items():
+          oprot.writeString(kiter261.encode('utf-8'))
+          oprot.writeI64(viter262)
         oprot.writeMapEnd()
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     if self.transferred is not None:
       oprot.writeFieldBegin('transferred', TType.MAP, 2)
       oprot.writeMapBegin(TType.STRING, TType.MAP, len(self.transferred))
-      for kiter256,viter257 in self.transferred.items():
-        oprot.writeString(kiter256.encode('utf-8'))
-        oprot.writeMapBegin(TType.STRING, TType.I64, len(viter257))
-        for kiter258,viter259 in viter257.items():
-          oprot.writeString(kiter258.encode('utf-8'))
-          oprot.writeI64(viter259)
+      for kiter263,viter264 in self.transferred.items():
+        oprot.writeString(kiter263.encode('utf-8'))
+        oprot.writeMapBegin(TType.STRING, TType.I64, len(viter264))
+        for kiter265,viter266 in viter264.items():
+          oprot.writeString(kiter265.encode('utf-8'))
+          oprot.writeI64(viter266)
         oprot.writeMapEnd()
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
@@ -3947,11 +4078,11 @@ class TopologyInfo:
       elif fid == 4:
         if ftype == TType.LIST:
           self.executors = []
-          (_etype263, _size260) = iprot.readListBegin()
-          for _i264 in xrange(_size260):
-            _elem265 = ExecutorSummary()
-            _elem265.read(iprot)
-            self.executors.append(_elem265)
+          (_etype270, _size267) = iprot.readListBegin()
+          for _i271 in xrange(_size267):
+            _elem272 = ExecutorSummary()
+            _elem272.read(iprot)
+            self.executors.append(_elem272)
           iprot.readListEnd()
         else:
           iprot.skip(ftype)
@@ -3963,17 +4094,17 @@ class TopologyInfo:
       elif fid == 6:
         if ftype == TType.MAP:
           self.errors = {}
-          (_ktype267, _vtype268, _size266 ) = iprot.readMapBegin() 
-          for _i270 in xrange(_size266):
-            _key271 = iprot.readString().decode('utf-8')
-            _val272 = []
-            (_etype276, _size273) = iprot.readListBegin()
-            for _i277 in xrange(_size273):
-              _elem278 = ErrorInfo()
-              _elem278.read(iprot)
-              _val272.append(_elem278)
+          (_ktype274, _vtype275, _size273 ) = iprot.readMapBegin() 
+          for _i277 in xrange(_size273):
+            _key278 = iprot.readString().decode('utf-8')
+            _val279 = []
+            (_etype283, _size280) = iprot.readListBegin()
+            for _i284 in xrange(_size280):
+              _elem285 = ErrorInfo()
+              _elem285.read(iprot)
+              _val279.append(_elem285)
             iprot.readListEnd()
-            self.errors[_key271] = _val272
+            self.errors[_key278] = _val279
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
@@ -4017,8 +4148,8 @@ class TopologyInfo:
     if self.executors is not None:
       oprot.writeFieldBegin('executors', TType.LIST, 4)
       oprot.writeListBegin(TType.STRUCT, len(self.executors))
-      for iter279 in self.executors:
-        iter279.write(oprot)
+      for iter286 in self.executors:
+        iter286.write(oprot)
       oprot.writeListEnd()
       oprot.writeFieldEnd()
     if self.status is not None:
@@ -4028,11 +4159,11 @@ class TopologyInfo:
     if self.errors is not None:
       oprot.writeFieldBegin('errors', TType.MAP, 6)
       oprot.writeMapBegin(TType.STRING, TType.LIST, len(self.errors))
-      for kiter280,viter281 in self.errors.items():
-        oprot.writeString(kiter280.encode('utf-8'))
-        oprot.writeListBegin(TType.STRUCT, len(viter281))
-        for iter282 in viter281:
-          iter282.write(oprot)
+      for kiter287,viter288 in self.errors.items():
+        oprot.writeString(kiter287.encode('utf-8'))
+        oprot.writeListBegin(TType.STRUCT, len(viter288))
+        for iter289 in viter288:
+          iter289.write(oprot)
         oprot.writeListEnd()
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
@@ -4186,11 +4317,11 @@ class RebalanceOptions:
       elif fid == 3:
         if ftype == TType.MAP:
           self.num_executors = {}
-          (_ktype284, _vtype285, _size283 ) = iprot.readMapBegin() 
-          for _i287 in xrange(_size283):
-            _key288 = iprot.readString().decode('utf-8')
-            _val289 = iprot.readI32();
-            self.num_executors[_key288] = _val289
+          (_ktype291, _vtype292, _size290 ) = iprot.readMapBegin() 
+          for _i294 in xrange(_size290):
+            _key295 = iprot.readString().decode('utf-8')
+            _val296 = iprot.readI32();
+            self.num_executors[_key295] = _val296
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
@@ -4215,9 +4346,9 @@ class RebalanceOptions:
     if self.num_executors is not None:
       oprot.writeFieldBegin('num_executors', TType.MAP, 3)
       oprot.writeMapBegin(TType.STRING, TType.I32, len(self.num_executors))
-      for kiter290,viter291 in self.num_executors.items():
-        oprot.writeString(kiter290.encode('utf-8'))
-        oprot.writeI32(viter291)
+      for kiter297,viter298 in self.num_executors.items():
+        oprot.writeString(kiter297.encode('utf-8'))
+        oprot.writeI32(viter298)
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()
@@ -4267,11 +4398,11 @@ class Credentials:
       if fid == 1:
         if ftype == TType.MAP:
           self.creds = {}
-          (_ktype293, _vtype294, _size292 ) = iprot.readMapBegin() 
-          for _i296 in xrange(_size292):
-            _key297 = iprot.readString().decode('utf-8')
-            _val298 = iprot.readString().decode('utf-8')
-            self.creds[_key297] = _val298
+          (_ktype300, _vtype301, _size299 ) = iprot.readMapBegin() 
+          for _i303 in xrange(_size299):
+            _key304 = iprot.readString().decode('utf-8')
+            _val305 = iprot.readString().decode('utf-8')
+            self.creds[_key304] = _val305
           iprot.readMapEnd()
         else:
           iprot.skip(ftype)
@@ -4288,9 +4419,9 @@ class Credentials:
     if self.creds is not None:
       oprot.writeFieldBegin('creds', TType.MAP, 1)
       oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.creds))
-      for kiter299,viter300 in self.creds.items():
-        oprot.writeString(kiter299.encode('utf-8'))
-        oprot.writeString(viter300.encode('utf-8'))
+      for kiter306,viter307 in self.creds.items():
+        oprot.writeString(kiter306.encode('utf-8'))
+        oprot.writeString(viter307.encode('utf-8'))
       oprot.writeMapEnd()
       oprot.writeFieldEnd()
     oprot.writeFieldStop()