You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2015/03/20 22:01:40 UTC
[35/50] [abbrv] storm git commit: Adding nimbus summary info to
zookeeper.
Adding nimbus summary info to zookeeper.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4502bffb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4502bffb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4502bffb
Branch: refs/heads/0.11.x-branch
Commit: 4502bffbe3f9b4cd3674a56afbda1bb115cec239
Parents: 1b6491f
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Thu Feb 12 11:27:50 2015 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Thu Feb 12 11:27:50 2015 -0800
----------------------------------------------------------------------
storm-core/src/clj/backtype/storm/cluster.clj | 24 +-
storm-core/src/clj/backtype/storm/config.clj | 10 +
.../src/clj/backtype/storm/daemon/nimbus.clj | 27 +-
storm-core/src/clj/backtype/storm/ui/core.clj | 28 +-
.../storm/generated/ClusterSummary.java | 232 +++---
.../backtype/storm/generated/NimbusSummary.java | 723 +++++++++++++++++++
storm-core/src/py/storm/ttypes.py | 577 +++++++++------
storm-core/src/storm.thrift | 10 +-
.../public/templates/index-page-template.html | 26 +-
9 files changed, 1297 insertions(+), 360 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/4502bffb/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj
index 2c58510..3bf6628 100644
--- a/storm-core/src/clj/backtype/storm/cluster.clj
+++ b/storm-core/src/clj/backtype/storm/cluster.clj
@@ -146,6 +146,12 @@
(code-distributor [this callback])
;returns list of nimbusinfos under /stormroot/code-distributor/storm-id
(code-distributor-info [this storm-id])
+
+ ;returns list of nimbus summaries stored under /stormroot/nimbuses/<nimbus-ids> -> <data>
+ (nimbuses [this])
+ ;adds the NimbusSummary to /stormroot/nimbuses/nimbus-id
+ (add-nimbus-host! [this nimbus-id nimbus-summary])
+
(active-storms [this])
(storm-base [this storm-id callback])
(get-worker-heartbeat [this storm-id node port])
@@ -180,14 +186,17 @@
(def WORKERBEATS-ROOT "workerbeats")
(def ERRORS-ROOT "errors")
(def CODE-DISTRIBUTOR-ROOT "code-distributor")
+(def NIMBUSES-ROOT "nimbuses")
(def CREDENTIALS-ROOT "credentials")
+
(def ASSIGNMENTS-SUBTREE (str "/" ASSIGNMENTS-ROOT))
(def STORMS-SUBTREE (str "/" STORMS-ROOT))
(def SUPERVISORS-SUBTREE (str "/" SUPERVISORS-ROOT))
(def WORKERBEATS-SUBTREE (str "/" WORKERBEATS-ROOT))
(def ERRORS-SUBTREE (str "/" ERRORS-ROOT))
(def CODE-DISTRIBUTOR-SUBTREE (str "/" CODE-DISTRIBUTOR-ROOT))
+(def NIMBUSES-SUBTREE (str "/" NIMBUSES-ROOT))
(def CREDENTIALS-SUBTREE (str "/" CREDENTIALS-ROOT))
(defn supervisor-path
@@ -202,6 +211,10 @@
[id]
(str CODE-DISTRIBUTOR-SUBTREE "/" id))
+(defn nimbus-path
+ [id]
+ (str NIMBUSES-SUBTREE "/" id))
+
(defn storm-path
[id]
(str STORMS-SUBTREE "/" id))
@@ -292,7 +305,7 @@
CREDENTIALS-ROOT (issue-map-callback! credentials-callback (first args))
;; this should never happen
(exit-process! 30 "Unknown callback for subtree " subtree args)))))]
- (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE CODE-DISTRIBUTOR-SUBTREE]]
+ (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE CODE-DISTRIBUTOR-SUBTREE NIMBUSES-SUBTREE]]
(mkdirs cluster-state p acls))
(reify
StormClusterState
@@ -330,6 +343,15 @@
(reset! code-distributor-callback callback))
(get-children cluster-state CODE-DISTRIBUTOR-SUBTREE (not-nil? callback)))
+ (nimbuses
+ [this]
+ (map #(maybe-deserialize (get-data cluster-state (nimbus-path %1) false))
+ (get-children cluster-state NIMBUSES-SUBTREE false)))
+
+ (add-nimbus-host!
+ [this nimbus-id nimbus-summary]
+ (set-ephemeral-node cluster-state (nimbus-path nimbus-id) (Utils/serialize nimbus-summary) acls))
+
(code-distributor-info
[this storm-id]
(map (fn [nimbus-info] (NimbusInfo/parse nimbus-info)) (get-children cluster-state (code-distributor-path storm-id) false)))
http://git-wip-us.apache.org/repos/asf/storm/blob/4502bffb/storm-core/src/clj/backtype/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/config.clj b/storm-core/src/clj/backtype/storm/config.clj
index a6b160d..f3c70e5 100644
--- a/storm-core/src/clj/backtype/storm/config.clj
+++ b/storm-core/src/clj/backtype/storm/config.clj
@@ -282,3 +282,13 @@
(defn ^LocalState worker-state
[conf id]
(LocalState. (worker-heartbeats-root conf id)))
+
+(defn read-storm-version
+ "Returns a string containing the Storm version or 'Unknown'."
+ []
+ (let [storm-home (System/getProperty "storm.home")
+ release-path (format "%s/RELEASE" storm-home)
+ release-file (File. release-path)]
+ (if (and (.exists release-file) (.isFile release-file))
+ (str/trim (slurp release-path))
+ "Unknown")))
http://git-wip-us.apache.org/repos/asf/storm/blob/4502bffb/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index e354fab..52ee708 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -15,7 +15,8 @@
;; limitations under the License.
(ns backtype.storm.daemon.nimbus
(:import [java.nio ByteBuffer]
- [java.util Collections])
+ [java.util Collections]
+ [backtype.storm.generated NimbusSummary])
(:import [java.io FileNotFoundException])
(:import [java.net InetAddress])
(:import [java.nio.channels Channels WritableByteChannel])
@@ -104,6 +105,7 @@
:id->sched-status (atom {})
:cred-renewers (AuthUtils/GetCredentialRenewers conf)
:nimbus-autocred-plugins (AuthUtils/getNimbusAutoCredPlugins conf)
+ :nimbuses-cache (atom {}) ;;TODO need to figure out how to keep the cache up to date, one more thread
}))
(defn inbox [nimbus]
@@ -1030,6 +1032,17 @@
(let [nimbus (nimbus-data conf inimbus)
principal-to-local (AuthUtils/GetPrincipalToLocalPlugin conf)]
(.prepare ^backtype.storm.nimbus.ITopologyValidator (:validator nimbus) conf)
+
+ ;add to nimbuses
+ (.add-nimbus-host! (:storm-cluster-state nimbus)
+ (.toHostPortString (:nimbus-host-port-info nimbus))
+ {
+ :host (.getHost (:nimbus-host-port-info nimbus))
+ :port (.getPort (:nimbus-host-port-info nimbus))
+ :start-time-secs (current-time-secs)
+ :version (read-storm-version)
+ })
+
(.addToLeaderLockQueue (:leader-elector nimbus))
(cleanup-corrupt-topologies! nimbus)
;register call back for code-distributor
@@ -1287,8 +1300,14 @@
(count (:used-ports info))
id )
))
- nimbus-uptime ((:uptime nimbus))
bases (topology-bases storm-cluster-state)
+ nimbuses (.nimbuses storm-cluster-state)
+ nimbuses (map #(NimbusSummary. (:host %1) (:port %1) (time-delta (:start-time-secs %1))
+ (let [leader (.getLeader (:leader-elector nimbus))]
+ (and (= (.getHost leader) (:host %1)) (= (.getPort leader) (:port %1))))
+ (:version %1))
+ nimbuses
+ )
topology-summaries (dofor [[id base] bases :when base]
(let [assignment (.assignment-info storm-cluster-state id nil)
topo-summ (TopologySummary. id
@@ -1312,8 +1331,8 @@
topo-summ
))]
(ClusterSummary. supervisor-summaries
- nimbus-uptime
- topology-summaries)
+ topology-summaries
+ nimbuses)
))
(^TopologyInfo getTopologyInfo [this ^String storm-id]
http://git-wip-us.apache.org/repos/asf/storm/blob/4502bffb/storm-core/src/clj/backtype/storm/ui/core.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj
index 3c7f578..94b0311 100644
--- a/storm-core/src/clj/backtype/storm/ui/core.clj
+++ b/storm-core/src/clj/backtype/storm/ui/core.clj
@@ -78,16 +78,6 @@
(map #(.get_stats ^ExecutorSummary %))
(filter not-nil?)))
-(defn read-storm-version
- "Returns a string containing the Storm version or 'Unknown'."
- []
- (let [storm-home (System/getProperty "storm.home")
- release-path (format "%s/RELEASE" storm-home)
- release-file (File. release-path)]
- (if (and (.exists release-file) (.isFile release-file))
- (trim (slurp release-path))
- "Unknown")))
-
(defn component-type
"Returns the component type (either :bolt or :spout) for a given
topology and component id. Returns nil if not found."
@@ -520,7 +510,6 @@
(reduce +))]
{"user" user
"stormVersion" (read-storm-version)
- "nimbusUptime" (pretty-uptime-sec (.get_nimbus_uptime_secs summ))
"supervisors" (count sups)
"slotsTotal" total-slots
"slotsUsed" used-slots
@@ -530,18 +519,19 @@
(defn nimbus-summary
([]
- (let [leader-elector (zk-leader-elector *STORM-CONF*)
- nimbus-hosts (.getAllNimbuses leader-elector)
- no-op (.close leader-elector)]
- (nimbus-summary nimbus-hosts)))
+ (with-nimbus nimbus
+ (nimbus-summary
+ (.get_nimbuses (.getClusterInfo ^Nimbus$Client nimbus)))))
([nimbuses]
{"nimbuses"
(for [^NimbusInfo n nimbuses]
{
- "host" (.getHost n)
- "port" (.getPort n)
- "nimbusLogLink" (nimbus-log-link (.getHost n) (.getPort n))
- "isLeader" (.isLeader n)})}))
+ "host" (.get_host n)
+ "port" (.get_port n)
+ "nimbusLogLink" (nimbus-log-link (.get_host n) (.get_port n))
+ "isLeader" (.is_isLeader n)
+ "version" (.get_version n)
+ "nimbusUpTime" (pretty-uptime-sec (.get_uptimeSecs n))})}))
(defn supervisor-summary
([]
http://git-wip-us.apache.org/repos/asf/storm/blob/4502bffb/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java b/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java
index a2623ab..7e32c72 100644
--- a/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java
+++ b/storm-core/src/jvm/backtype/storm/generated/ClusterSummary.java
@@ -42,18 +42,18 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("ClusterSummary");
private static final org.apache.thrift.protocol.TField SUPERVISORS_FIELD_DESC = new org.apache.thrift.protocol.TField("supervisors", org.apache.thrift.protocol.TType.LIST, (short)1);
- private static final org.apache.thrift.protocol.TField NIMBUS_UPTIME_SECS_FIELD_DESC = new org.apache.thrift.protocol.TField("nimbus_uptime_secs", org.apache.thrift.protocol.TType.I32, (short)2);
private static final org.apache.thrift.protocol.TField TOPOLOGIES_FIELD_DESC = new org.apache.thrift.protocol.TField("topologies", org.apache.thrift.protocol.TType.LIST, (short)3);
+ private static final org.apache.thrift.protocol.TField NIMBUSES_FIELD_DESC = new org.apache.thrift.protocol.TField("nimbuses", org.apache.thrift.protocol.TType.LIST, (short)4);
private List<SupervisorSummary> supervisors; // required
- private int nimbus_uptime_secs; // required
private List<TopologySummary> topologies; // required
+ private List<NimbusSummary> nimbuses; // required
/** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
public enum _Fields implements org.apache.thrift.TFieldIdEnum {
SUPERVISORS((short)1, "supervisors"),
- NIMBUS_UPTIME_SECS((short)2, "nimbus_uptime_secs"),
- TOPOLOGIES((short)3, "topologies");
+ TOPOLOGIES((short)3, "topologies"),
+ NIMBUSES((short)4, "nimbuses");
private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
@@ -70,10 +70,10 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
switch(fieldId) {
case 1: // SUPERVISORS
return SUPERVISORS;
- case 2: // NIMBUS_UPTIME_SECS
- return NIMBUS_UPTIME_SECS;
case 3: // TOPOLOGIES
return TOPOLOGIES;
+ case 4: // NIMBUSES
+ return NIMBUSES;
default:
return null;
}
@@ -114,8 +114,6 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
}
// isset id assignments
- private static final int __NIMBUS_UPTIME_SECS_ISSET_ID = 0;
- private BitSet __isset_bit_vector = new BitSet(1);
public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
static {
@@ -123,11 +121,12 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
tmpMap.put(_Fields.SUPERVISORS, new org.apache.thrift.meta_data.FieldMetaData("supervisors", org.apache.thrift.TFieldRequirementType.REQUIRED,
new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, SupervisorSummary.class))));
- tmpMap.put(_Fields.NIMBUS_UPTIME_SECS, new org.apache.thrift.meta_data.FieldMetaData("nimbus_uptime_secs", org.apache.thrift.TFieldRequirementType.REQUIRED,
- new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
tmpMap.put(_Fields.TOPOLOGIES, new org.apache.thrift.meta_data.FieldMetaData("topologies", org.apache.thrift.TFieldRequirementType.REQUIRED,
new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, TopologySummary.class))));
+ tmpMap.put(_Fields.NIMBUSES, new org.apache.thrift.meta_data.FieldMetaData("nimbuses", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
+ new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, NimbusSummary.class))));
metaDataMap = Collections.unmodifiableMap(tmpMap);
org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(ClusterSummary.class, metaDataMap);
}
@@ -137,22 +136,19 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
public ClusterSummary(
List<SupervisorSummary> supervisors,
- int nimbus_uptime_secs,
- List<TopologySummary> topologies)
+ List<TopologySummary> topologies,
+ List<NimbusSummary> nimbuses)
{
this();
this.supervisors = supervisors;
- this.nimbus_uptime_secs = nimbus_uptime_secs;
- set_nimbus_uptime_secs_isSet(true);
this.topologies = topologies;
+ this.nimbuses = nimbuses;
}
/**
* Performs a deep copy on <i>other</i>.
*/
public ClusterSummary(ClusterSummary other) {
- __isset_bit_vector.clear();
- __isset_bit_vector.or(other.__isset_bit_vector);
if (other.is_set_supervisors()) {
List<SupervisorSummary> __this__supervisors = new ArrayList<SupervisorSummary>();
for (SupervisorSummary other_element : other.supervisors) {
@@ -160,7 +156,6 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
}
this.supervisors = __this__supervisors;
}
- this.nimbus_uptime_secs = other.nimbus_uptime_secs;
if (other.is_set_topologies()) {
List<TopologySummary> __this__topologies = new ArrayList<TopologySummary>();
for (TopologySummary other_element : other.topologies) {
@@ -168,6 +163,13 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
}
this.topologies = __this__topologies;
}
+ if (other.is_set_nimbuses()) {
+ List<NimbusSummary> __this__nimbuses = new ArrayList<NimbusSummary>();
+ for (NimbusSummary other_element : other.nimbuses) {
+ __this__nimbuses.add(new NimbusSummary(other_element));
+ }
+ this.nimbuses = __this__nimbuses;
+ }
}
public ClusterSummary deepCopy() {
@@ -177,9 +179,8 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
@Override
public void clear() {
this.supervisors = null;
- set_nimbus_uptime_secs_isSet(false);
- this.nimbus_uptime_secs = 0;
this.topologies = null;
+ this.nimbuses = null;
}
public int get_supervisors_size() {
@@ -220,28 +221,6 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
}
}
- public int get_nimbus_uptime_secs() {
- return this.nimbus_uptime_secs;
- }
-
- public void set_nimbus_uptime_secs(int nimbus_uptime_secs) {
- this.nimbus_uptime_secs = nimbus_uptime_secs;
- set_nimbus_uptime_secs_isSet(true);
- }
-
- public void unset_nimbus_uptime_secs() {
- __isset_bit_vector.clear(__NIMBUS_UPTIME_SECS_ISSET_ID);
- }
-
- /** Returns true if field nimbus_uptime_secs is set (has been assigned a value) and false otherwise */
- public boolean is_set_nimbus_uptime_secs() {
- return __isset_bit_vector.get(__NIMBUS_UPTIME_SECS_ISSET_ID);
- }
-
- public void set_nimbus_uptime_secs_isSet(boolean value) {
- __isset_bit_vector.set(__NIMBUS_UPTIME_SECS_ISSET_ID, value);
- }
-
public int get_topologies_size() {
return (this.topologies == null) ? 0 : this.topologies.size();
}
@@ -280,6 +259,44 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
}
}
+ public int get_nimbuses_size() {
+ return (this.nimbuses == null) ? 0 : this.nimbuses.size();
+ }
+
+ public java.util.Iterator<NimbusSummary> get_nimbuses_iterator() {
+ return (this.nimbuses == null) ? null : this.nimbuses.iterator();
+ }
+
+ public void add_to_nimbuses(NimbusSummary elem) {
+ if (this.nimbuses == null) {
+ this.nimbuses = new ArrayList<NimbusSummary>();
+ }
+ this.nimbuses.add(elem);
+ }
+
+ public List<NimbusSummary> get_nimbuses() {
+ return this.nimbuses;
+ }
+
+ public void set_nimbuses(List<NimbusSummary> nimbuses) {
+ this.nimbuses = nimbuses;
+ }
+
+ public void unset_nimbuses() {
+ this.nimbuses = null;
+ }
+
+ /** Returns true if field nimbuses is set (has been assigned a value) and false otherwise */
+ public boolean is_set_nimbuses() {
+ return this.nimbuses != null;
+ }
+
+ public void set_nimbuses_isSet(boolean value) {
+ if (!value) {
+ this.nimbuses = null;
+ }
+ }
+
public void setFieldValue(_Fields field, Object value) {
switch (field) {
case SUPERVISORS:
@@ -290,19 +307,19 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
}
break;
- case NIMBUS_UPTIME_SECS:
+ case TOPOLOGIES:
if (value == null) {
- unset_nimbus_uptime_secs();
+ unset_topologies();
} else {
- set_nimbus_uptime_secs((Integer)value);
+ set_topologies((List<TopologySummary>)value);
}
break;
- case TOPOLOGIES:
+ case NIMBUSES:
if (value == null) {
- unset_topologies();
+ unset_nimbuses();
} else {
- set_topologies((List<TopologySummary>)value);
+ set_nimbuses((List<NimbusSummary>)value);
}
break;
@@ -314,12 +331,12 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
case SUPERVISORS:
return get_supervisors();
- case NIMBUS_UPTIME_SECS:
- return Integer.valueOf(get_nimbus_uptime_secs());
-
case TOPOLOGIES:
return get_topologies();
+ case NIMBUSES:
+ return get_nimbuses();
+
}
throw new IllegalStateException();
}
@@ -333,10 +350,10 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
switch (field) {
case SUPERVISORS:
return is_set_supervisors();
- case NIMBUS_UPTIME_SECS:
- return is_set_nimbus_uptime_secs();
case TOPOLOGIES:
return is_set_topologies();
+ case NIMBUSES:
+ return is_set_nimbuses();
}
throw new IllegalStateException();
}
@@ -363,15 +380,6 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
return false;
}
- boolean this_present_nimbus_uptime_secs = true;
- boolean that_present_nimbus_uptime_secs = true;
- if (this_present_nimbus_uptime_secs || that_present_nimbus_uptime_secs) {
- if (!(this_present_nimbus_uptime_secs && that_present_nimbus_uptime_secs))
- return false;
- if (this.nimbus_uptime_secs != that.nimbus_uptime_secs)
- return false;
- }
-
boolean this_present_topologies = true && this.is_set_topologies();
boolean that_present_topologies = true && that.is_set_topologies();
if (this_present_topologies || that_present_topologies) {
@@ -381,6 +389,15 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
return false;
}
+ boolean this_present_nimbuses = true && this.is_set_nimbuses();
+ boolean that_present_nimbuses = true && that.is_set_nimbuses();
+ if (this_present_nimbuses || that_present_nimbuses) {
+ if (!(this_present_nimbuses && that_present_nimbuses))
+ return false;
+ if (!this.nimbuses.equals(that.nimbuses))
+ return false;
+ }
+
return true;
}
@@ -393,16 +410,16 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
if (present_supervisors)
builder.append(supervisors);
- boolean present_nimbus_uptime_secs = true;
- builder.append(present_nimbus_uptime_secs);
- if (present_nimbus_uptime_secs)
- builder.append(nimbus_uptime_secs);
-
boolean present_topologies = true && (is_set_topologies());
builder.append(present_topologies);
if (present_topologies)
builder.append(topologies);
+ boolean present_nimbuses = true && (is_set_nimbuses());
+ builder.append(present_nimbuses);
+ if (present_nimbuses)
+ builder.append(nimbuses);
+
return builder.toHashCode();
}
@@ -424,22 +441,22 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
return lastComparison;
}
}
- lastComparison = Boolean.valueOf(is_set_nimbus_uptime_secs()).compareTo(typedOther.is_set_nimbus_uptime_secs());
+ lastComparison = Boolean.valueOf(is_set_topologies()).compareTo(typedOther.is_set_topologies());
if (lastComparison != 0) {
return lastComparison;
}
- if (is_set_nimbus_uptime_secs()) {
- lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.nimbus_uptime_secs, typedOther.nimbus_uptime_secs);
+ if (is_set_topologies()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.topologies, typedOther.topologies);
if (lastComparison != 0) {
return lastComparison;
}
}
- lastComparison = Boolean.valueOf(is_set_topologies()).compareTo(typedOther.is_set_topologies());
+ lastComparison = Boolean.valueOf(is_set_nimbuses()).compareTo(typedOther.is_set_nimbuses());
if (lastComparison != 0) {
return lastComparison;
}
- if (is_set_topologies()) {
- lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.topologies, typedOther.topologies);
+ if (is_set_nimbuses()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.nimbuses, typedOther.nimbuses);
if (lastComparison != 0) {
return lastComparison;
}
@@ -479,14 +496,6 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
}
break;
- case 2: // NIMBUS_UPTIME_SECS
- if (field.type == org.apache.thrift.protocol.TType.I32) {
- this.nimbus_uptime_secs = iprot.readI32();
- set_nimbus_uptime_secs_isSet(true);
- } else {
- org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
- }
- break;
case 3: // TOPOLOGIES
if (field.type == org.apache.thrift.protocol.TType.LIST) {
{
@@ -505,6 +514,24 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
}
break;
+ case 4: // NIMBUSES
+ if (field.type == org.apache.thrift.protocol.TType.LIST) {
+ {
+ org.apache.thrift.protocol.TList _list43 = iprot.readListBegin();
+ this.nimbuses = new ArrayList<NimbusSummary>(_list43.size);
+ for (int _i44 = 0; _i44 < _list43.size; ++_i44)
+ {
+ NimbusSummary _elem45; // required
+ _elem45 = new NimbusSummary();
+ _elem45.read(iprot);
+ this.nimbuses.add(_elem45);
+ }
+ iprot.readListEnd();
+ }
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
default:
org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
}
@@ -522,24 +549,33 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
oprot.writeFieldBegin(SUPERVISORS_FIELD_DESC);
{
oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, this.supervisors.size()));
- for (SupervisorSummary _iter43 : this.supervisors)
+ for (SupervisorSummary _iter46 : this.supervisors)
{
- _iter43.write(oprot);
+ _iter46.write(oprot);
}
oprot.writeListEnd();
}
oprot.writeFieldEnd();
}
- oprot.writeFieldBegin(NIMBUS_UPTIME_SECS_FIELD_DESC);
- oprot.writeI32(this.nimbus_uptime_secs);
- oprot.writeFieldEnd();
if (this.topologies != null) {
oprot.writeFieldBegin(TOPOLOGIES_FIELD_DESC);
{
oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, this.topologies.size()));
- for (TopologySummary _iter44 : this.topologies)
+ for (TopologySummary _iter47 : this.topologies)
+ {
+ _iter47.write(oprot);
+ }
+ oprot.writeListEnd();
+ }
+ oprot.writeFieldEnd();
+ }
+ if (this.nimbuses != null) {
+ oprot.writeFieldBegin(NIMBUSES_FIELD_DESC);
+ {
+ oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRUCT, this.nimbuses.size()));
+ for (NimbusSummary _iter48 : this.nimbuses)
{
- _iter44.write(oprot);
+ _iter48.write(oprot);
}
oprot.writeListEnd();
}
@@ -562,10 +598,6 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
}
first = false;
if (!first) sb.append(", ");
- sb.append("nimbus_uptime_secs:");
- sb.append(this.nimbus_uptime_secs);
- first = false;
- if (!first) sb.append(", ");
sb.append("topologies:");
if (this.topologies == null) {
sb.append("null");
@@ -573,6 +605,14 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
sb.append(this.topologies);
}
first = false;
+ if (!first) sb.append(", ");
+ sb.append("nimbuses:");
+ if (this.nimbuses == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.nimbuses);
+ }
+ first = false;
sb.append(")");
return sb.toString();
}
@@ -583,14 +623,14 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
throw new org.apache.thrift.protocol.TProtocolException("Required field 'supervisors' is unset! Struct:" + toString());
}
- if (!is_set_nimbus_uptime_secs()) {
- throw new org.apache.thrift.protocol.TProtocolException("Required field 'nimbus_uptime_secs' is unset! Struct:" + toString());
- }
-
if (!is_set_topologies()) {
throw new org.apache.thrift.protocol.TProtocolException("Required field 'topologies' is unset! Struct:" + toString());
}
+ if (!is_set_nimbuses()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'nimbuses' is unset! Struct:" + toString());
+ }
+
}
private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
@@ -603,8 +643,6 @@ public class ClusterSummary implements org.apache.thrift.TBase<ClusterSummary, C
private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
try {
- // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
- __isset_bit_vector = new BitSet(1);
read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
} catch (org.apache.thrift.TException te) {
throw new java.io.IOException(te);
http://git-wip-us.apache.org/repos/asf/storm/blob/4502bffb/storm-core/src/jvm/backtype/storm/generated/NimbusSummary.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/NimbusSummary.java b/storm-core/src/jvm/backtype/storm/generated/NimbusSummary.java
new file mode 100644
index 0000000..195048a
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/generated/NimbusSummary.java
@@ -0,0 +1,723 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.7.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package backtype.storm.generated;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NimbusSummary implements org.apache.thrift.TBase<NimbusSummary, NimbusSummary._Fields>, java.io.Serializable, Cloneable {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("NimbusSummary");
+
+ private static final org.apache.thrift.protocol.TField HOST_FIELD_DESC = new org.apache.thrift.protocol.TField("host", org.apache.thrift.protocol.TType.STRING, (short)1);
+ private static final org.apache.thrift.protocol.TField PORT_FIELD_DESC = new org.apache.thrift.protocol.TField("port", org.apache.thrift.protocol.TType.I32, (short)2);
+ private static final org.apache.thrift.protocol.TField UPTIME_SECS_FIELD_DESC = new org.apache.thrift.protocol.TField("uptimeSecs", org.apache.thrift.protocol.TType.I32, (short)3);
+ private static final org.apache.thrift.protocol.TField IS_LEADER_FIELD_DESC = new org.apache.thrift.protocol.TField("isLeader", org.apache.thrift.protocol.TType.BOOL, (short)4);
+ private static final org.apache.thrift.protocol.TField VERSION_FIELD_DESC = new org.apache.thrift.protocol.TField("version", org.apache.thrift.protocol.TType.STRING, (short)5);
+
+ private String host; // required
+ private int port; // required
+ private int uptimeSecs; // required
+ private boolean isLeader; // required
+ private String version; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ HOST((short)1, "host"),
+ PORT((short)2, "port"),
+ UPTIME_SECS((short)3, "uptimeSecs"),
+ IS_LEADER((short)4, "isLeader"),
+ VERSION((short)5, "version");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // HOST
+ return HOST;
+ case 2: // PORT
+ return PORT;
+ case 3: // UPTIME_SECS
+ return UPTIME_SECS;
+ case 4: // IS_LEADER
+ return IS_LEADER;
+ case 5: // VERSION
+ return VERSION;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+ private static final int __PORT_ISSET_ID = 0;
+ private static final int __UPTIMESECS_ISSET_ID = 1;
+ private static final int __ISLEADER_ISSET_ID = 2;
+ private BitSet __isset_bit_vector = new BitSet(3);
+
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.HOST, new org.apache.thrift.meta_data.FieldMetaData("host", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.PORT, new org.apache.thrift.meta_data.FieldMetaData("port", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
+ tmpMap.put(_Fields.UPTIME_SECS, new org.apache.thrift.meta_data.FieldMetaData("uptimeSecs", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32)));
+ tmpMap.put(_Fields.IS_LEADER, new org.apache.thrift.meta_data.FieldMetaData("isLeader", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
+ tmpMap.put(_Fields.VERSION, new org.apache.thrift.meta_data.FieldMetaData("version", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(NimbusSummary.class, metaDataMap);
+ }
+
+ public NimbusSummary() {
+ }
+
+ public NimbusSummary(
+ String host,
+ int port,
+ int uptimeSecs,
+ boolean isLeader,
+ String version)
+ {
+ this();
+ this.host = host;
+ this.port = port;
+ set_port_isSet(true);
+ this.uptimeSecs = uptimeSecs;
+ set_uptimeSecs_isSet(true);
+ this.isLeader = isLeader;
+ set_isLeader_isSet(true);
+ this.version = version;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public NimbusSummary(NimbusSummary other) {
+ __isset_bit_vector.clear();
+ __isset_bit_vector.or(other.__isset_bit_vector);
+ if (other.is_set_host()) {
+ this.host = other.host;
+ }
+ this.port = other.port;
+ this.uptimeSecs = other.uptimeSecs;
+ this.isLeader = other.isLeader;
+ if (other.is_set_version()) {
+ this.version = other.version;
+ }
+ }
+
+ public NimbusSummary deepCopy() {
+ return new NimbusSummary(this);
+ }
+
+ @Override
+ public void clear() {
+ this.host = null;
+ set_port_isSet(false);
+ this.port = 0;
+ set_uptimeSecs_isSet(false);
+ this.uptimeSecs = 0;
+ set_isLeader_isSet(false);
+ this.isLeader = false;
+ this.version = null;
+ }
+
+ public String get_host() {
+ return this.host;
+ }
+
+ public void set_host(String host) {
+ this.host = host;
+ }
+
+ public void unset_host() {
+ this.host = null;
+ }
+
+ /** Returns true if field host is set (has been assigned a value) and false otherwise */
+ public boolean is_set_host() {
+ return this.host != null;
+ }
+
+ public void set_host_isSet(boolean value) {
+ if (!value) {
+ this.host = null;
+ }
+ }
+
+ public int get_port() {
+ return this.port;
+ }
+
+ public void set_port(int port) {
+ this.port = port;
+ set_port_isSet(true);
+ }
+
+ public void unset_port() {
+ __isset_bit_vector.clear(__PORT_ISSET_ID);
+ }
+
+ /** Returns true if field port is set (has been assigned a value) and false otherwise */
+ public boolean is_set_port() {
+ return __isset_bit_vector.get(__PORT_ISSET_ID);
+ }
+
+ public void set_port_isSet(boolean value) {
+ __isset_bit_vector.set(__PORT_ISSET_ID, value);
+ }
+
+ public int get_uptimeSecs() {
+ return this.uptimeSecs;
+ }
+
+ public void set_uptimeSecs(int uptimeSecs) {
+ this.uptimeSecs = uptimeSecs;
+ set_uptimeSecs_isSet(true);
+ }
+
+ public void unset_uptimeSecs() {
+ __isset_bit_vector.clear(__UPTIMESECS_ISSET_ID);
+ }
+
+ /** Returns true if field uptimeSecs is set (has been assigned a value) and false otherwise */
+ public boolean is_set_uptimeSecs() {
+ return __isset_bit_vector.get(__UPTIMESECS_ISSET_ID);
+ }
+
+ public void set_uptimeSecs_isSet(boolean value) {
+ __isset_bit_vector.set(__UPTIMESECS_ISSET_ID, value);
+ }
+
+ public boolean is_isLeader() {
+ return this.isLeader;
+ }
+
+ public void set_isLeader(boolean isLeader) {
+ this.isLeader = isLeader;
+ set_isLeader_isSet(true);
+ }
+
+ public void unset_isLeader() {
+ __isset_bit_vector.clear(__ISLEADER_ISSET_ID);
+ }
+
+ /** Returns true if field isLeader is set (has been assigned a value) and false otherwise */
+ public boolean is_set_isLeader() {
+ return __isset_bit_vector.get(__ISLEADER_ISSET_ID);
+ }
+
+ public void set_isLeader_isSet(boolean value) {
+ __isset_bit_vector.set(__ISLEADER_ISSET_ID, value);
+ }
+
+ public String get_version() {
+ return this.version;
+ }
+
+ public void set_version(String version) {
+ this.version = version;
+ }
+
+ public void unset_version() {
+ this.version = null;
+ }
+
+ /** Returns true if field version is set (has been assigned a value) and false otherwise */
+ public boolean is_set_version() {
+ return this.version != null;
+ }
+
+ public void set_version_isSet(boolean value) {
+ if (!value) {
+ this.version = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case HOST:
+ if (value == null) {
+ unset_host();
+ } else {
+ set_host((String)value);
+ }
+ break;
+
+ case PORT:
+ if (value == null) {
+ unset_port();
+ } else {
+ set_port((Integer)value);
+ }
+ break;
+
+ case UPTIME_SECS:
+ if (value == null) {
+ unset_uptimeSecs();
+ } else {
+ set_uptimeSecs((Integer)value);
+ }
+ break;
+
+ case IS_LEADER:
+ if (value == null) {
+ unset_isLeader();
+ } else {
+ set_isLeader((Boolean)value);
+ }
+ break;
+
+ case VERSION:
+ if (value == null) {
+ unset_version();
+ } else {
+ set_version((String)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case HOST:
+ return get_host();
+
+ case PORT:
+ return Integer.valueOf(get_port());
+
+ case UPTIME_SECS:
+ return Integer.valueOf(get_uptimeSecs());
+
+ case IS_LEADER:
+ return Boolean.valueOf(is_isLeader());
+
+ case VERSION:
+ return get_version();
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case HOST:
+ return is_set_host();
+ case PORT:
+ return is_set_port();
+ case UPTIME_SECS:
+ return is_set_uptimeSecs();
+ case IS_LEADER:
+ return is_set_isLeader();
+ case VERSION:
+ return is_set_version();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof NimbusSummary)
+ return this.equals((NimbusSummary)that);
+ return false;
+ }
+
+ public boolean equals(NimbusSummary that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_host = true && this.is_set_host();
+ boolean that_present_host = true && that.is_set_host();
+ if (this_present_host || that_present_host) {
+ if (!(this_present_host && that_present_host))
+ return false;
+ if (!this.host.equals(that.host))
+ return false;
+ }
+
+ boolean this_present_port = true;
+ boolean that_present_port = true;
+ if (this_present_port || that_present_port) {
+ if (!(this_present_port && that_present_port))
+ return false;
+ if (this.port != that.port)
+ return false;
+ }
+
+ boolean this_present_uptimeSecs = true;
+ boolean that_present_uptimeSecs = true;
+ if (this_present_uptimeSecs || that_present_uptimeSecs) {
+ if (!(this_present_uptimeSecs && that_present_uptimeSecs))
+ return false;
+ if (this.uptimeSecs != that.uptimeSecs)
+ return false;
+ }
+
+ boolean this_present_isLeader = true;
+ boolean that_present_isLeader = true;
+ if (this_present_isLeader || that_present_isLeader) {
+ if (!(this_present_isLeader && that_present_isLeader))
+ return false;
+ if (this.isLeader != that.isLeader)
+ return false;
+ }
+
+ boolean this_present_version = true && this.is_set_version();
+ boolean that_present_version = true && that.is_set_version();
+ if (this_present_version || that_present_version) {
+ if (!(this_present_version && that_present_version))
+ return false;
+ if (!this.version.equals(that.version))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ HashCodeBuilder builder = new HashCodeBuilder();
+
+ boolean present_host = true && (is_set_host());
+ builder.append(present_host);
+ if (present_host)
+ builder.append(host);
+
+ boolean present_port = true;
+ builder.append(present_port);
+ if (present_port)
+ builder.append(port);
+
+ boolean present_uptimeSecs = true;
+ builder.append(present_uptimeSecs);
+ if (present_uptimeSecs)
+ builder.append(uptimeSecs);
+
+ boolean present_isLeader = true;
+ builder.append(present_isLeader);
+ if (present_isLeader)
+ builder.append(isLeader);
+
+ boolean present_version = true && (is_set_version());
+ builder.append(present_version);
+ if (present_version)
+ builder.append(version);
+
+ return builder.toHashCode();
+ }
+
+ public int compareTo(NimbusSummary other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+ NimbusSummary typedOther = (NimbusSummary)other;
+
+ lastComparison = Boolean.valueOf(is_set_host()).compareTo(typedOther.is_set_host());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_host()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.host, typedOther.host);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(is_set_port()).compareTo(typedOther.is_set_port());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_port()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.port, typedOther.port);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(is_set_uptimeSecs()).compareTo(typedOther.is_set_uptimeSecs());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_uptimeSecs()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.uptimeSecs, typedOther.uptimeSecs);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(is_set_isLeader()).compareTo(typedOther.is_set_isLeader());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_isLeader()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.isLeader, typedOther.isLeader);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(is_set_version()).compareTo(typedOther.is_set_version());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_version()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.version, typedOther.version);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (field.id) {
+ case 1: // HOST
+ if (field.type == org.apache.thrift.protocol.TType.STRING) {
+ this.host = iprot.readString();
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case 2: // PORT
+ if (field.type == org.apache.thrift.protocol.TType.I32) {
+ this.port = iprot.readI32();
+ set_port_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case 3: // UPTIME_SECS
+ if (field.type == org.apache.thrift.protocol.TType.I32) {
+ this.uptimeSecs = iprot.readI32();
+ set_uptimeSecs_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case 4: // IS_LEADER
+ if (field.type == org.apache.thrift.protocol.TType.BOOL) {
+ this.isLeader = iprot.readBool();
+ set_isLeader_isSet(true);
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case 5: // VERSION
+ if (field.type == org.apache.thrift.protocol.TType.STRING) {
+ this.version = iprot.readString();
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (this.host != null) {
+ oprot.writeFieldBegin(HOST_FIELD_DESC);
+ oprot.writeString(this.host);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldBegin(PORT_FIELD_DESC);
+ oprot.writeI32(this.port);
+ oprot.writeFieldEnd();
+ oprot.writeFieldBegin(UPTIME_SECS_FIELD_DESC);
+ oprot.writeI32(this.uptimeSecs);
+ oprot.writeFieldEnd();
+ oprot.writeFieldBegin(IS_LEADER_FIELD_DESC);
+ oprot.writeBool(this.isLeader);
+ oprot.writeFieldEnd();
+ if (this.version != null) {
+ oprot.writeFieldBegin(VERSION_FIELD_DESC);
+ oprot.writeString(this.version);
+ oprot.writeFieldEnd();
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("NimbusSummary(");
+ boolean first = true;
+
+ sb.append("host:");
+ if (this.host == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.host);
+ }
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("port:");
+ sb.append(this.port);
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("uptimeSecs:");
+ sb.append(this.uptimeSecs);
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("isLeader:");
+ sb.append(this.isLeader);
+ first = false;
+ if (!first) sb.append(", ");
+ sb.append("version:");
+ if (this.version == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.version);
+ }
+ first = false;
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ if (!is_set_host()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'host' is unset! Struct:" + toString());
+ }
+
+ if (!is_set_port()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'port' is unset! Struct:" + toString());
+ }
+
+ if (!is_set_uptimeSecs()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'uptimeSecs' is unset! Struct:" + toString());
+ }
+
+ if (!is_set_isLeader()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'isLeader' is unset! Struct:" + toString());
+ }
+
+ if (!is_set_version()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'version' is unset! Struct:" + toString());
+ }
+
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ // it doesn't seem like you should have to do this, but java serialization is wacky, and doesn't call the default constructor.
+ __isset_bit_vector = new BitSet(1);
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/storm/blob/4502bffb/storm-core/src/py/storm/ttypes.py
----------------------------------------------------------------------
diff --git a/storm-core/src/py/storm/ttypes.py b/storm-core/src/py/storm/ttypes.py
index e4fb751..a5e155c 100644
--- a/storm-core/src/py/storm/ttypes.py
+++ b/storm-core/src/py/storm/ttypes.py
@@ -2354,28 +2354,150 @@ class SupervisorSummary:
def __ne__(self, other):
return not (self == other)
+class NimbusSummary:
+ """
+ Attributes:
+ - host
+ - port
+ - uptimeSecs
+ - isLeader
+ - version
+ """
+
+ thrift_spec = (
+ None, # 0
+ (1, TType.STRING, 'host', None, None, ), # 1
+ (2, TType.I32, 'port', None, None, ), # 2
+ (3, TType.I32, 'uptimeSecs', None, None, ), # 3
+ (4, TType.BOOL, 'isLeader', None, None, ), # 4
+ (5, TType.STRING, 'version', None, None, ), # 5
+ )
+
+ def __hash__(self):
+ return 0 + hash(self.host) + hash(self.port) + hash(self.uptimeSecs) + hash(self.isLeader) + hash(self.version)
+
+ def __init__(self, host=None, port=None, uptimeSecs=None, isLeader=None, version=None,):
+ self.host = host
+ self.port = port
+ self.uptimeSecs = uptimeSecs
+ self.isLeader = isLeader
+ self.version = version
+
+ def read(self, iprot):
+ if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
+ fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec))
+ return
+ iprot.readStructBegin()
+ while True:
+ (fname, ftype, fid) = iprot.readFieldBegin()
+ if ftype == TType.STOP:
+ break
+ if fid == 1:
+ if ftype == TType.STRING:
+ self.host = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
+ elif fid == 2:
+ if ftype == TType.I32:
+ self.port = iprot.readI32();
+ else:
+ iprot.skip(ftype)
+ elif fid == 3:
+ if ftype == TType.I32:
+ self.uptimeSecs = iprot.readI32();
+ else:
+ iprot.skip(ftype)
+ elif fid == 4:
+ if ftype == TType.BOOL:
+ self.isLeader = iprot.readBool();
+ else:
+ iprot.skip(ftype)
+ elif fid == 5:
+ if ftype == TType.STRING:
+ self.version = iprot.readString().decode('utf-8')
+ else:
+ iprot.skip(ftype)
+ else:
+ iprot.skip(ftype)
+ iprot.readFieldEnd()
+ iprot.readStructEnd()
+
+ def write(self, oprot):
+ if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None:
+ oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec)))
+ return
+ oprot.writeStructBegin('NimbusSummary')
+ if self.host is not None:
+ oprot.writeFieldBegin('host', TType.STRING, 1)
+ oprot.writeString(self.host.encode('utf-8'))
+ oprot.writeFieldEnd()
+ if self.port is not None:
+ oprot.writeFieldBegin('port', TType.I32, 2)
+ oprot.writeI32(self.port)
+ oprot.writeFieldEnd()
+ if self.uptimeSecs is not None:
+ oprot.writeFieldBegin('uptimeSecs', TType.I32, 3)
+ oprot.writeI32(self.uptimeSecs)
+ oprot.writeFieldEnd()
+ if self.isLeader is not None:
+ oprot.writeFieldBegin('isLeader', TType.BOOL, 4)
+ oprot.writeBool(self.isLeader)
+ oprot.writeFieldEnd()
+ if self.version is not None:
+ oprot.writeFieldBegin('version', TType.STRING, 5)
+ oprot.writeString(self.version.encode('utf-8'))
+ oprot.writeFieldEnd()
+ oprot.writeFieldStop()
+ oprot.writeStructEnd()
+
+ def validate(self):
+ if self.host is None:
+ raise TProtocol.TProtocolException(message='Required field host is unset!')
+ if self.port is None:
+ raise TProtocol.TProtocolException(message='Required field port is unset!')
+ if self.uptimeSecs is None:
+ raise TProtocol.TProtocolException(message='Required field uptimeSecs is unset!')
+ if self.isLeader is None:
+ raise TProtocol.TProtocolException(message='Required field isLeader is unset!')
+ if self.version is None:
+ raise TProtocol.TProtocolException(message='Required field version is unset!')
+ return
+
+
+ def __repr__(self):
+ L = ['%s=%r' % (key, value)
+ for key, value in self.__dict__.iteritems()]
+ return '%s(%s)' % (self.__class__.__name__, ', '.join(L))
+
+ def __eq__(self, other):
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
+
+ def __ne__(self, other):
+ return not (self == other)
+
class ClusterSummary:
"""
Attributes:
- supervisors
- - nimbus_uptime_secs
- topologies
+ - nimbuses
"""
thrift_spec = (
None, # 0
(1, TType.LIST, 'supervisors', (TType.STRUCT,(SupervisorSummary, SupervisorSummary.thrift_spec)), None, ), # 1
- (2, TType.I32, 'nimbus_uptime_secs', None, None, ), # 2
+ None, # 2
(3, TType.LIST, 'topologies', (TType.STRUCT,(TopologySummary, TopologySummary.thrift_spec)), None, ), # 3
+ (4, TType.LIST, 'nimbuses', (TType.STRUCT,(NimbusSummary, NimbusSummary.thrift_spec)), None, ), # 4
)
def __hash__(self):
- return 0 + hash(self.supervisors) + hash(self.nimbus_uptime_secs) + hash(self.topologies)
+ return 0 + hash(self.supervisors) + hash(self.topologies) + hash(self.nimbuses)
- def __init__(self, supervisors=None, nimbus_uptime_secs=None, topologies=None,):
+ def __init__(self, supervisors=None, topologies=None, nimbuses=None,):
self.supervisors = supervisors
- self.nimbus_uptime_secs = nimbus_uptime_secs
self.topologies = topologies
+ self.nimbuses = nimbuses
def read(self, iprot):
if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None:
@@ -2397,11 +2519,6 @@ class ClusterSummary:
iprot.readListEnd()
else:
iprot.skip(ftype)
- elif fid == 2:
- if ftype == TType.I32:
- self.nimbus_uptime_secs = iprot.readI32();
- else:
- iprot.skip(ftype)
elif fid == 3:
if ftype == TType.LIST:
self.topologies = []
@@ -2413,6 +2530,17 @@ class ClusterSummary:
iprot.readListEnd()
else:
iprot.skip(ftype)
+ elif fid == 4:
+ if ftype == TType.LIST:
+ self.nimbuses = []
+ (_etype81, _size78) = iprot.readListBegin()
+ for _i82 in xrange(_size78):
+ _elem83 = NimbusSummary()
+ _elem83.read(iprot)
+ self.nimbuses.append(_elem83)
+ iprot.readListEnd()
+ else:
+ iprot.skip(ftype)
else:
iprot.skip(ftype)
iprot.readFieldEnd()
@@ -2426,19 +2554,22 @@ class ClusterSummary:
if self.supervisors is not None:
oprot.writeFieldBegin('supervisors', TType.LIST, 1)
oprot.writeListBegin(TType.STRUCT, len(self.supervisors))
- for iter78 in self.supervisors:
- iter78.write(oprot)
+ for iter84 in self.supervisors:
+ iter84.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
- if self.nimbus_uptime_secs is not None:
- oprot.writeFieldBegin('nimbus_uptime_secs', TType.I32, 2)
- oprot.writeI32(self.nimbus_uptime_secs)
- oprot.writeFieldEnd()
if self.topologies is not None:
oprot.writeFieldBegin('topologies', TType.LIST, 3)
oprot.writeListBegin(TType.STRUCT, len(self.topologies))
- for iter79 in self.topologies:
- iter79.write(oprot)
+ for iter85 in self.topologies:
+ iter85.write(oprot)
+ oprot.writeListEnd()
+ oprot.writeFieldEnd()
+ if self.nimbuses is not None:
+ oprot.writeFieldBegin('nimbuses', TType.LIST, 4)
+ oprot.writeListBegin(TType.STRUCT, len(self.nimbuses))
+ for iter86 in self.nimbuses:
+ iter86.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -2447,10 +2578,10 @@ class ClusterSummary:
def validate(self):
if self.supervisors is None:
raise TProtocol.TProtocolException(message='Required field supervisors is unset!')
- if self.nimbus_uptime_secs is None:
- raise TProtocol.TProtocolException(message='Required field nimbus_uptime_secs is unset!')
if self.topologies is None:
raise TProtocol.TProtocolException(message='Required field topologies is unset!')
+ if self.nimbuses is None:
+ raise TProtocol.TProtocolException(message='Required field nimbuses is unset!')
return
@@ -2609,90 +2740,90 @@ class BoltStats:
if fid == 1:
if ftype == TType.MAP:
self.acked = {}
- (_ktype81, _vtype82, _size80 ) = iprot.readMapBegin()
- for _i84 in xrange(_size80):
- _key85 = iprot.readString().decode('utf-8')
- _val86 = {}
- (_ktype88, _vtype89, _size87 ) = iprot.readMapBegin()
- for _i91 in xrange(_size87):
- _key92 = GlobalStreamId()
- _key92.read(iprot)
- _val93 = iprot.readI64();
- _val86[_key92] = _val93
+ (_ktype88, _vtype89, _size87 ) = iprot.readMapBegin()
+ for _i91 in xrange(_size87):
+ _key92 = iprot.readString().decode('utf-8')
+ _val93 = {}
+ (_ktype95, _vtype96, _size94 ) = iprot.readMapBegin()
+ for _i98 in xrange(_size94):
+ _key99 = GlobalStreamId()
+ _key99.read(iprot)
+ _val100 = iprot.readI64();
+ _val93[_key99] = _val100
iprot.readMapEnd()
- self.acked[_key85] = _val86
+ self.acked[_key92] = _val93
iprot.readMapEnd()
else:
iprot.skip(ftype)
elif fid == 2:
if ftype == TType.MAP:
self.failed = {}
- (_ktype95, _vtype96, _size94 ) = iprot.readMapBegin()
- for _i98 in xrange(_size94):
- _key99 = iprot.readString().decode('utf-8')
- _val100 = {}
- (_ktype102, _vtype103, _size101 ) = iprot.readMapBegin()
- for _i105 in xrange(_size101):
- _key106 = GlobalStreamId()
- _key106.read(iprot)
- _val107 = iprot.readI64();
- _val100[_key106] = _val107
+ (_ktype102, _vtype103, _size101 ) = iprot.readMapBegin()
+ for _i105 in xrange(_size101):
+ _key106 = iprot.readString().decode('utf-8')
+ _val107 = {}
+ (_ktype109, _vtype110, _size108 ) = iprot.readMapBegin()
+ for _i112 in xrange(_size108):
+ _key113 = GlobalStreamId()
+ _key113.read(iprot)
+ _val114 = iprot.readI64();
+ _val107[_key113] = _val114
iprot.readMapEnd()
- self.failed[_key99] = _val100
+ self.failed[_key106] = _val107
iprot.readMapEnd()
else:
iprot.skip(ftype)
elif fid == 3:
if ftype == TType.MAP:
self.process_ms_avg = {}
- (_ktype109, _vtype110, _size108 ) = iprot.readMapBegin()
- for _i112 in xrange(_size108):
- _key113 = iprot.readString().decode('utf-8')
- _val114 = {}
- (_ktype116, _vtype117, _size115 ) = iprot.readMapBegin()
- for _i119 in xrange(_size115):
- _key120 = GlobalStreamId()
- _key120.read(iprot)
- _val121 = iprot.readDouble();
- _val114[_key120] = _val121
+ (_ktype116, _vtype117, _size115 ) = iprot.readMapBegin()
+ for _i119 in xrange(_size115):
+ _key120 = iprot.readString().decode('utf-8')
+ _val121 = {}
+ (_ktype123, _vtype124, _size122 ) = iprot.readMapBegin()
+ for _i126 in xrange(_size122):
+ _key127 = GlobalStreamId()
+ _key127.read(iprot)
+ _val128 = iprot.readDouble();
+ _val121[_key127] = _val128
iprot.readMapEnd()
- self.process_ms_avg[_key113] = _val114
+ self.process_ms_avg[_key120] = _val121
iprot.readMapEnd()
else:
iprot.skip(ftype)
elif fid == 4:
if ftype == TType.MAP:
self.executed = {}
- (_ktype123, _vtype124, _size122 ) = iprot.readMapBegin()
- for _i126 in xrange(_size122):
- _key127 = iprot.readString().decode('utf-8')
- _val128 = {}
- (_ktype130, _vtype131, _size129 ) = iprot.readMapBegin()
- for _i133 in xrange(_size129):
- _key134 = GlobalStreamId()
- _key134.read(iprot)
- _val135 = iprot.readI64();
- _val128[_key134] = _val135
+ (_ktype130, _vtype131, _size129 ) = iprot.readMapBegin()
+ for _i133 in xrange(_size129):
+ _key134 = iprot.readString().decode('utf-8')
+ _val135 = {}
+ (_ktype137, _vtype138, _size136 ) = iprot.readMapBegin()
+ for _i140 in xrange(_size136):
+ _key141 = GlobalStreamId()
+ _key141.read(iprot)
+ _val142 = iprot.readI64();
+ _val135[_key141] = _val142
iprot.readMapEnd()
- self.executed[_key127] = _val128
+ self.executed[_key134] = _val135
iprot.readMapEnd()
else:
iprot.skip(ftype)
elif fid == 5:
if ftype == TType.MAP:
self.execute_ms_avg = {}
- (_ktype137, _vtype138, _size136 ) = iprot.readMapBegin()
- for _i140 in xrange(_size136):
- _key141 = iprot.readString().decode('utf-8')
- _val142 = {}
- (_ktype144, _vtype145, _size143 ) = iprot.readMapBegin()
- for _i147 in xrange(_size143):
- _key148 = GlobalStreamId()
- _key148.read(iprot)
- _val149 = iprot.readDouble();
- _val142[_key148] = _val149
+ (_ktype144, _vtype145, _size143 ) = iprot.readMapBegin()
+ for _i147 in xrange(_size143):
+ _key148 = iprot.readString().decode('utf-8')
+ _val149 = {}
+ (_ktype151, _vtype152, _size150 ) = iprot.readMapBegin()
+ for _i154 in xrange(_size150):
+ _key155 = GlobalStreamId()
+ _key155.read(iprot)
+ _val156 = iprot.readDouble();
+ _val149[_key155] = _val156
iprot.readMapEnd()
- self.execute_ms_avg[_key141] = _val142
+ self.execute_ms_avg[_key148] = _val149
iprot.readMapEnd()
else:
iprot.skip(ftype)
@@ -2709,60 +2840,60 @@ class BoltStats:
if self.acked is not None:
oprot.writeFieldBegin('acked', TType.MAP, 1)
oprot.writeMapBegin(TType.STRING, TType.MAP, len(self.acked))
- for kiter150,viter151 in self.acked.items():
- oprot.writeString(kiter150.encode('utf-8'))
- oprot.writeMapBegin(TType.STRUCT, TType.I64, len(viter151))
- for kiter152,viter153 in viter151.items():
- kiter152.write(oprot)
- oprot.writeI64(viter153)
+ for kiter157,viter158 in self.acked.items():
+ oprot.writeString(kiter157.encode('utf-8'))
+ oprot.writeMapBegin(TType.STRUCT, TType.I64, len(viter158))
+ for kiter159,viter160 in viter158.items():
+ kiter159.write(oprot)
+ oprot.writeI64(viter160)
oprot.writeMapEnd()
oprot.writeMapEnd()
oprot.writeFieldEnd()
if self.failed is not None:
oprot.writeFieldBegin('failed', TType.MAP, 2)
oprot.writeMapBegin(TType.STRING, TType.MAP, len(self.failed))
- for kiter154,viter155 in self.failed.items():
- oprot.writeString(kiter154.encode('utf-8'))
- oprot.writeMapBegin(TType.STRUCT, TType.I64, len(viter155))
- for kiter156,viter157 in viter155.items():
- kiter156.write(oprot)
- oprot.writeI64(viter157)
+ for kiter161,viter162 in self.failed.items():
+ oprot.writeString(kiter161.encode('utf-8'))
+ oprot.writeMapBegin(TType.STRUCT, TType.I64, len(viter162))
+ for kiter163,viter164 in viter162.items():
+ kiter163.write(oprot)
+ oprot.writeI64(viter164)
oprot.writeMapEnd()
oprot.writeMapEnd()
oprot.writeFieldEnd()
if self.process_ms_avg is not None:
oprot.writeFieldBegin('process_ms_avg', TType.MAP, 3)
oprot.writeMapBegin(TType.STRING, TType.MAP, len(self.process_ms_avg))
- for kiter158,viter159 in self.process_ms_avg.items():
- oprot.writeString(kiter158.encode('utf-8'))
- oprot.writeMapBegin(TType.STRUCT, TType.DOUBLE, len(viter159))
- for kiter160,viter161 in viter159.items():
- kiter160.write(oprot)
- oprot.writeDouble(viter161)
+ for kiter165,viter166 in self.process_ms_avg.items():
+ oprot.writeString(kiter165.encode('utf-8'))
+ oprot.writeMapBegin(TType.STRUCT, TType.DOUBLE, len(viter166))
+ for kiter167,viter168 in viter166.items():
+ kiter167.write(oprot)
+ oprot.writeDouble(viter168)
oprot.writeMapEnd()
oprot.writeMapEnd()
oprot.writeFieldEnd()
if self.executed is not None:
oprot.writeFieldBegin('executed', TType.MAP, 4)
oprot.writeMapBegin(TType.STRING, TType.MAP, len(self.executed))
- for kiter162,viter163 in self.executed.items():
- oprot.writeString(kiter162.encode('utf-8'))
- oprot.writeMapBegin(TType.STRUCT, TType.I64, len(viter163))
- for kiter164,viter165 in viter163.items():
- kiter164.write(oprot)
- oprot.writeI64(viter165)
+ for kiter169,viter170 in self.executed.items():
+ oprot.writeString(kiter169.encode('utf-8'))
+ oprot.writeMapBegin(TType.STRUCT, TType.I64, len(viter170))
+ for kiter171,viter172 in viter170.items():
+ kiter171.write(oprot)
+ oprot.writeI64(viter172)
oprot.writeMapEnd()
oprot.writeMapEnd()
oprot.writeFieldEnd()
if self.execute_ms_avg is not None:
oprot.writeFieldBegin('execute_ms_avg', TType.MAP, 5)
oprot.writeMapBegin(TType.STRING, TType.MAP, len(self.execute_ms_avg))
- for kiter166,viter167 in self.execute_ms_avg.items():
- oprot.writeString(kiter166.encode('utf-8'))
- oprot.writeMapBegin(TType.STRUCT, TType.DOUBLE, len(viter167))
- for kiter168,viter169 in viter167.items():
- kiter168.write(oprot)
- oprot.writeDouble(viter169)
+ for kiter173,viter174 in self.execute_ms_avg.items():
+ oprot.writeString(kiter173.encode('utf-8'))
+ oprot.writeMapBegin(TType.STRUCT, TType.DOUBLE, len(viter174))
+ for kiter175,viter176 in viter174.items():
+ kiter175.write(oprot)
+ oprot.writeDouble(viter176)
oprot.writeMapEnd()
oprot.writeMapEnd()
oprot.writeFieldEnd()
@@ -2829,51 +2960,51 @@ class SpoutStats:
if fid == 1:
if ftype == TType.MAP:
self.acked = {}
- (_ktype171, _vtype172, _size170 ) = iprot.readMapBegin()
- for _i174 in xrange(_size170):
- _key175 = iprot.readString().decode('utf-8')
- _val176 = {}
- (_ktype178, _vtype179, _size177 ) = iprot.readMapBegin()
- for _i181 in xrange(_size177):
- _key182 = iprot.readString().decode('utf-8')
- _val183 = iprot.readI64();
- _val176[_key182] = _val183
+ (_ktype178, _vtype179, _size177 ) = iprot.readMapBegin()
+ for _i181 in xrange(_size177):
+ _key182 = iprot.readString().decode('utf-8')
+ _val183 = {}
+ (_ktype185, _vtype186, _size184 ) = iprot.readMapBegin()
+ for _i188 in xrange(_size184):
+ _key189 = iprot.readString().decode('utf-8')
+ _val190 = iprot.readI64();
+ _val183[_key189] = _val190
iprot.readMapEnd()
- self.acked[_key175] = _val176
+ self.acked[_key182] = _val183
iprot.readMapEnd()
else:
iprot.skip(ftype)
elif fid == 2:
if ftype == TType.MAP:
self.failed = {}
- (_ktype185, _vtype186, _size184 ) = iprot.readMapBegin()
- for _i188 in xrange(_size184):
- _key189 = iprot.readString().decode('utf-8')
- _val190 = {}
- (_ktype192, _vtype193, _size191 ) = iprot.readMapBegin()
- for _i195 in xrange(_size191):
- _key196 = iprot.readString().decode('utf-8')
- _val197 = iprot.readI64();
- _val190[_key196] = _val197
+ (_ktype192, _vtype193, _size191 ) = iprot.readMapBegin()
+ for _i195 in xrange(_size191):
+ _key196 = iprot.readString().decode('utf-8')
+ _val197 = {}
+ (_ktype199, _vtype200, _size198 ) = iprot.readMapBegin()
+ for _i202 in xrange(_size198):
+ _key203 = iprot.readString().decode('utf-8')
+ _val204 = iprot.readI64();
+ _val197[_key203] = _val204
iprot.readMapEnd()
- self.failed[_key189] = _val190
+ self.failed[_key196] = _val197
iprot.readMapEnd()
else:
iprot.skip(ftype)
elif fid == 3:
if ftype == TType.MAP:
self.complete_ms_avg = {}
- (_ktype199, _vtype200, _size198 ) = iprot.readMapBegin()
- for _i202 in xrange(_size198):
- _key203 = iprot.readString().decode('utf-8')
- _val204 = {}
- (_ktype206, _vtype207, _size205 ) = iprot.readMapBegin()
- for _i209 in xrange(_size205):
- _key210 = iprot.readString().decode('utf-8')
- _val211 = iprot.readDouble();
- _val204[_key210] = _val211
+ (_ktype206, _vtype207, _size205 ) = iprot.readMapBegin()
+ for _i209 in xrange(_size205):
+ _key210 = iprot.readString().decode('utf-8')
+ _val211 = {}
+ (_ktype213, _vtype214, _size212 ) = iprot.readMapBegin()
+ for _i216 in xrange(_size212):
+ _key217 = iprot.readString().decode('utf-8')
+ _val218 = iprot.readDouble();
+ _val211[_key217] = _val218
iprot.readMapEnd()
- self.complete_ms_avg[_key203] = _val204
+ self.complete_ms_avg[_key210] = _val211
iprot.readMapEnd()
else:
iprot.skip(ftype)
@@ -2890,36 +3021,36 @@ class SpoutStats:
if self.acked is not None:
oprot.writeFieldBegin('acked', TType.MAP, 1)
oprot.writeMapBegin(TType.STRING, TType.MAP, len(self.acked))
- for kiter212,viter213 in self.acked.items():
- oprot.writeString(kiter212.encode('utf-8'))
- oprot.writeMapBegin(TType.STRING, TType.I64, len(viter213))
- for kiter214,viter215 in viter213.items():
- oprot.writeString(kiter214.encode('utf-8'))
- oprot.writeI64(viter215)
+ for kiter219,viter220 in self.acked.items():
+ oprot.writeString(kiter219.encode('utf-8'))
+ oprot.writeMapBegin(TType.STRING, TType.I64, len(viter220))
+ for kiter221,viter222 in viter220.items():
+ oprot.writeString(kiter221.encode('utf-8'))
+ oprot.writeI64(viter222)
oprot.writeMapEnd()
oprot.writeMapEnd()
oprot.writeFieldEnd()
if self.failed is not None:
oprot.writeFieldBegin('failed', TType.MAP, 2)
oprot.writeMapBegin(TType.STRING, TType.MAP, len(self.failed))
- for kiter216,viter217 in self.failed.items():
- oprot.writeString(kiter216.encode('utf-8'))
- oprot.writeMapBegin(TType.STRING, TType.I64, len(viter217))
- for kiter218,viter219 in viter217.items():
- oprot.writeString(kiter218.encode('utf-8'))
- oprot.writeI64(viter219)
+ for kiter223,viter224 in self.failed.items():
+ oprot.writeString(kiter223.encode('utf-8'))
+ oprot.writeMapBegin(TType.STRING, TType.I64, len(viter224))
+ for kiter225,viter226 in viter224.items():
+ oprot.writeString(kiter225.encode('utf-8'))
+ oprot.writeI64(viter226)
oprot.writeMapEnd()
oprot.writeMapEnd()
oprot.writeFieldEnd()
if self.complete_ms_avg is not None:
oprot.writeFieldBegin('complete_ms_avg', TType.MAP, 3)
oprot.writeMapBegin(TType.STRING, TType.MAP, len(self.complete_ms_avg))
- for kiter220,viter221 in self.complete_ms_avg.items():
- oprot.writeString(kiter220.encode('utf-8'))
- oprot.writeMapBegin(TType.STRING, TType.DOUBLE, len(viter221))
- for kiter222,viter223 in viter221.items():
- oprot.writeString(kiter222.encode('utf-8'))
- oprot.writeDouble(viter223)
+ for kiter227,viter228 in self.complete_ms_avg.items():
+ oprot.writeString(kiter227.encode('utf-8'))
+ oprot.writeMapBegin(TType.STRING, TType.DOUBLE, len(viter228))
+ for kiter229,viter230 in viter228.items():
+ oprot.writeString(kiter229.encode('utf-8'))
+ oprot.writeDouble(viter230)
oprot.writeMapEnd()
oprot.writeMapEnd()
oprot.writeFieldEnd()
@@ -3059,34 +3190,34 @@ class ExecutorStats:
if fid == 1:
if ftype == TType.MAP:
self.emitted = {}
- (_ktype225, _vtype226, _size224 ) = iprot.readMapBegin()
- for _i228 in xrange(_size224):
- _key229 = iprot.readString().decode('utf-8')
- _val230 = {}
- (_ktype232, _vtype233, _size231 ) = iprot.readMapBegin()
- for _i235 in xrange(_size231):
- _key236 = iprot.readString().decode('utf-8')
- _val237 = iprot.readI64();
- _val230[_key236] = _val237
+ (_ktype232, _vtype233, _size231 ) = iprot.readMapBegin()
+ for _i235 in xrange(_size231):
+ _key236 = iprot.readString().decode('utf-8')
+ _val237 = {}
+ (_ktype239, _vtype240, _size238 ) = iprot.readMapBegin()
+ for _i242 in xrange(_size238):
+ _key243 = iprot.readString().decode('utf-8')
+ _val244 = iprot.readI64();
+ _val237[_key243] = _val244
iprot.readMapEnd()
- self.emitted[_key229] = _val230
+ self.emitted[_key236] = _val237
iprot.readMapEnd()
else:
iprot.skip(ftype)
elif fid == 2:
if ftype == TType.MAP:
self.transferred = {}
- (_ktype239, _vtype240, _size238 ) = iprot.readMapBegin()
- for _i242 in xrange(_size238):
- _key243 = iprot.readString().decode('utf-8')
- _val244 = {}
- (_ktype246, _vtype247, _size245 ) = iprot.readMapBegin()
- for _i249 in xrange(_size245):
- _key250 = iprot.readString().decode('utf-8')
- _val251 = iprot.readI64();
- _val244[_key250] = _val251
+ (_ktype246, _vtype247, _size245 ) = iprot.readMapBegin()
+ for _i249 in xrange(_size245):
+ _key250 = iprot.readString().decode('utf-8')
+ _val251 = {}
+ (_ktype253, _vtype254, _size252 ) = iprot.readMapBegin()
+ for _i256 in xrange(_size252):
+ _key257 = iprot.readString().decode('utf-8')
+ _val258 = iprot.readI64();
+ _val251[_key257] = _val258
iprot.readMapEnd()
- self.transferred[_key243] = _val244
+ self.transferred[_key250] = _val251
iprot.readMapEnd()
else:
iprot.skip(ftype)
@@ -3109,24 +3240,24 @@ class ExecutorStats:
if self.emitted is not None:
oprot.writeFieldBegin('emitted', TType.MAP, 1)
oprot.writeMapBegin(TType.STRING, TType.MAP, len(self.emitted))
- for kiter252,viter253 in self.emitted.items():
- oprot.writeString(kiter252.encode('utf-8'))
- oprot.writeMapBegin(TType.STRING, TType.I64, len(viter253))
- for kiter254,viter255 in viter253.items():
- oprot.writeString(kiter254.encode('utf-8'))
- oprot.writeI64(viter255)
+ for kiter259,viter260 in self.emitted.items():
+ oprot.writeString(kiter259.encode('utf-8'))
+ oprot.writeMapBegin(TType.STRING, TType.I64, len(viter260))
+ for kiter261,viter262 in viter260.items():
+ oprot.writeString(kiter261.encode('utf-8'))
+ oprot.writeI64(viter262)
oprot.writeMapEnd()
oprot.writeMapEnd()
oprot.writeFieldEnd()
if self.transferred is not None:
oprot.writeFieldBegin('transferred', TType.MAP, 2)
oprot.writeMapBegin(TType.STRING, TType.MAP, len(self.transferred))
- for kiter256,viter257 in self.transferred.items():
- oprot.writeString(kiter256.encode('utf-8'))
- oprot.writeMapBegin(TType.STRING, TType.I64, len(viter257))
- for kiter258,viter259 in viter257.items():
- oprot.writeString(kiter258.encode('utf-8'))
- oprot.writeI64(viter259)
+ for kiter263,viter264 in self.transferred.items():
+ oprot.writeString(kiter263.encode('utf-8'))
+ oprot.writeMapBegin(TType.STRING, TType.I64, len(viter264))
+ for kiter265,viter266 in viter264.items():
+ oprot.writeString(kiter265.encode('utf-8'))
+ oprot.writeI64(viter266)
oprot.writeMapEnd()
oprot.writeMapEnd()
oprot.writeFieldEnd()
@@ -3947,11 +4078,11 @@ class TopologyInfo:
elif fid == 4:
if ftype == TType.LIST:
self.executors = []
- (_etype263, _size260) = iprot.readListBegin()
- for _i264 in xrange(_size260):
- _elem265 = ExecutorSummary()
- _elem265.read(iprot)
- self.executors.append(_elem265)
+ (_etype270, _size267) = iprot.readListBegin()
+ for _i271 in xrange(_size267):
+ _elem272 = ExecutorSummary()
+ _elem272.read(iprot)
+ self.executors.append(_elem272)
iprot.readListEnd()
else:
iprot.skip(ftype)
@@ -3963,17 +4094,17 @@ class TopologyInfo:
elif fid == 6:
if ftype == TType.MAP:
self.errors = {}
- (_ktype267, _vtype268, _size266 ) = iprot.readMapBegin()
- for _i270 in xrange(_size266):
- _key271 = iprot.readString().decode('utf-8')
- _val272 = []
- (_etype276, _size273) = iprot.readListBegin()
- for _i277 in xrange(_size273):
- _elem278 = ErrorInfo()
- _elem278.read(iprot)
- _val272.append(_elem278)
+ (_ktype274, _vtype275, _size273 ) = iprot.readMapBegin()
+ for _i277 in xrange(_size273):
+ _key278 = iprot.readString().decode('utf-8')
+ _val279 = []
+ (_etype283, _size280) = iprot.readListBegin()
+ for _i284 in xrange(_size280):
+ _elem285 = ErrorInfo()
+ _elem285.read(iprot)
+ _val279.append(_elem285)
iprot.readListEnd()
- self.errors[_key271] = _val272
+ self.errors[_key278] = _val279
iprot.readMapEnd()
else:
iprot.skip(ftype)
@@ -4017,8 +4148,8 @@ class TopologyInfo:
if self.executors is not None:
oprot.writeFieldBegin('executors', TType.LIST, 4)
oprot.writeListBegin(TType.STRUCT, len(self.executors))
- for iter279 in self.executors:
- iter279.write(oprot)
+ for iter286 in self.executors:
+ iter286.write(oprot)
oprot.writeListEnd()
oprot.writeFieldEnd()
if self.status is not None:
@@ -4028,11 +4159,11 @@ class TopologyInfo:
if self.errors is not None:
oprot.writeFieldBegin('errors', TType.MAP, 6)
oprot.writeMapBegin(TType.STRING, TType.LIST, len(self.errors))
- for kiter280,viter281 in self.errors.items():
- oprot.writeString(kiter280.encode('utf-8'))
- oprot.writeListBegin(TType.STRUCT, len(viter281))
- for iter282 in viter281:
- iter282.write(oprot)
+ for kiter287,viter288 in self.errors.items():
+ oprot.writeString(kiter287.encode('utf-8'))
+ oprot.writeListBegin(TType.STRUCT, len(viter288))
+ for iter289 in viter288:
+ iter289.write(oprot)
oprot.writeListEnd()
oprot.writeMapEnd()
oprot.writeFieldEnd()
@@ -4186,11 +4317,11 @@ class RebalanceOptions:
elif fid == 3:
if ftype == TType.MAP:
self.num_executors = {}
- (_ktype284, _vtype285, _size283 ) = iprot.readMapBegin()
- for _i287 in xrange(_size283):
- _key288 = iprot.readString().decode('utf-8')
- _val289 = iprot.readI32();
- self.num_executors[_key288] = _val289
+ (_ktype291, _vtype292, _size290 ) = iprot.readMapBegin()
+ for _i294 in xrange(_size290):
+ _key295 = iprot.readString().decode('utf-8')
+ _val296 = iprot.readI32();
+ self.num_executors[_key295] = _val296
iprot.readMapEnd()
else:
iprot.skip(ftype)
@@ -4215,9 +4346,9 @@ class RebalanceOptions:
if self.num_executors is not None:
oprot.writeFieldBegin('num_executors', TType.MAP, 3)
oprot.writeMapBegin(TType.STRING, TType.I32, len(self.num_executors))
- for kiter290,viter291 in self.num_executors.items():
- oprot.writeString(kiter290.encode('utf-8'))
- oprot.writeI32(viter291)
+ for kiter297,viter298 in self.num_executors.items():
+ oprot.writeString(kiter297.encode('utf-8'))
+ oprot.writeI32(viter298)
oprot.writeMapEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()
@@ -4267,11 +4398,11 @@ class Credentials:
if fid == 1:
if ftype == TType.MAP:
self.creds = {}
- (_ktype293, _vtype294, _size292 ) = iprot.readMapBegin()
- for _i296 in xrange(_size292):
- _key297 = iprot.readString().decode('utf-8')
- _val298 = iprot.readString().decode('utf-8')
- self.creds[_key297] = _val298
+ (_ktype300, _vtype301, _size299 ) = iprot.readMapBegin()
+ for _i303 in xrange(_size299):
+ _key304 = iprot.readString().decode('utf-8')
+ _val305 = iprot.readString().decode('utf-8')
+ self.creds[_key304] = _val305
iprot.readMapEnd()
else:
iprot.skip(ftype)
@@ -4288,9 +4419,9 @@ class Credentials:
if self.creds is not None:
oprot.writeFieldBegin('creds', TType.MAP, 1)
oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.creds))
- for kiter299,viter300 in self.creds.items():
- oprot.writeString(kiter299.encode('utf-8'))
- oprot.writeString(viter300.encode('utf-8'))
+ for kiter306,viter307 in self.creds.items():
+ oprot.writeString(kiter306.encode('utf-8'))
+ oprot.writeString(viter307.encode('utf-8'))
oprot.writeMapEnd()
oprot.writeFieldEnd()
oprot.writeFieldStop()