You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/03/18 20:39:11 UTC
[04/16] storm git commit: STORM-634: Converting SupervisorInfo,
Assignment, StormBase, TopologyStatus, ZKWorkerHeartbeat, ErrorInfo,
Credentials to thrift and defaulting the serialization delegate to thrift
serialization. Added class as a param to serializatio
STORM-634: Converting SupervisorInfo,Assignment,StormBase,TopologyStatus,ZKWorkerHeartbeat,ErrorInfo,Credentials to thrift and defaulting the serialization delegate to thrift serialization. Added class as a param to serialization delegate interface.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/63900643
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/63900643
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/63900643
Branch: refs/heads/master
Commit: 639006432c658226bd33dc2ae607f121f3dc02bb
Parents: a115c9d
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Thu Jan 29 13:46:01 2015 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Tue Feb 3 11:52:15 2015 -0800
----------------------------------------------------------------------
conf/defaults.yaml | 2 +-
storm-core/src/clj/backtype/storm/bootstrap.clj | 2 +-
storm-core/src/clj/backtype/storm/cluster.clj | 47 +-
storm-core/src/clj/backtype/storm/config.clj | 7 +-
storm-core/src/clj/backtype/storm/converter.clj | 200 ++++
.../src/clj/backtype/storm/daemon/common.clj | 10 +-
.../src/clj/backtype/storm/daemon/executor.clj | 6 +-
.../src/clj/backtype/storm/daemon/nimbus.clj | 105 +-
.../src/clj/backtype/storm/daemon/worker.clj | 2 +-
storm-core/src/clj/backtype/storm/stats.clj | 78 +-
storm-core/src/clj/backtype/storm/thrift.clj | 9 +-
.../storm/coordination/BatchBoltExecutor.java | 2 +-
.../backtype/storm/generated/Assignment.java | 817 +++++++++++++
.../backtype/storm/generated/ExecutorStats.java | 125 +-
.../jvm/backtype/storm/generated/NodeInfo.java | 479 ++++++++
.../jvm/backtype/storm/generated/StormBase.java | 1078 ++++++++++++++++++
.../storm/generated/SupervisorInfo.java | 1030 +++++++++++++++++
.../storm/generated/TopologyActionOptions.java | 335 ++++++
.../storm/generated/TopologyStatus.java | 67 ++
.../storm/generated/ZKWorkerHeartbeat.java | 586 ++++++++++
.../DefaultSerializationDelegate.java | 10 +-
.../GzipBridgeSerializationDelegate.java | 6 +-
.../GzipSerializationDelegate.java | 10 +-
.../serialization/SerializationDelegate.java | 2 +-
.../jvm/backtype/storm/serialization/Test.java | 17 +
.../ThriftSerializationDelegate.java | 52 +
.../ThriftSerializationDelegateBridge.java | 51 +
.../jvm/backtype/storm/utils/LocalState.java | 2 +-
.../src/jvm/backtype/storm/utils/Utils.java | 56 +-
.../src/jvm/storm/trident/TridentTopology.java | 4 +-
storm-core/src/py/storm/ttypes.py | 796 ++++++++++++-
storm-core/src/storm.thrift | 51 +
.../test/clj/backtype/storm/cluster_test.clj | 25 +-
.../test/clj/backtype/storm/nimbus_test.clj | 6 +-
.../GzipBridgeSerializationDelegateTest.java | 6 +-
.../ThriftBridgeSerializationDelegateTest.java | 79 ++
36 files changed, 5957 insertions(+), 203 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 35d20ff..141e1d3 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -47,7 +47,7 @@ storm.auth.simple-white-list.users: []
storm.auth.simple-acl.users: []
storm.auth.simple-acl.users.commands: []
storm.auth.simple-acl.admins: []
-storm.meta.serialization.delegate: "backtype.storm.serialization.DefaultSerializationDelegate"
+storm.meta.serialization.delegate: "backtype.storm.serialization.ThriftSerializationDelegateBridge"
### nimbus.* configs are for the master
nimbus.host: "localhost"
http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/src/clj/backtype/storm/bootstrap.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/bootstrap.clj b/storm-core/src/clj/backtype/storm/bootstrap.clj
index c1063cf..bc08e41 100644
--- a/storm-core/src/clj/backtype/storm/bootstrap.clj
+++ b/storm-core/src/clj/backtype/storm/bootstrap.clj
@@ -57,7 +57,7 @@
KillOptions SubmitOptions RebalanceOptions JavaObject JavaObjectArg
TopologyInitialStatus AuthorizationException]))
(import (quote [backtype.storm.daemon.common StormBase Assignment
- SupervisorInfo WorkerHeartbeat]))
+ SupervisorInfo]))
(import (quote [backtype.storm.grouping CustomStreamGrouping]))
(import (quote [java.io File FileOutputStream FileInputStream]))
(import (quote [java.util Collection List Random Map HashMap Collections ArrayList LinkedList]))
http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj
index 8ead710..15bf8a3 100644
--- a/storm-core/src/clj/backtype/storm/cluster.clj
+++ b/storm-core/src/clj/backtype/storm/cluster.clj
@@ -15,12 +15,14 @@
;; limitations under the License.
(ns backtype.storm.cluster
- (:import [org.apache.zookeeper.data Stat ACL Id])
+ (:import [org.apache.zookeeper.data Stat ACL Id]
+ [backtype.storm.generated SupervisorInfo Assignment StormBase ZKWorkerHeartbeat ErrorInfo Credentials]
+ [java.io Serializable])
(:import [org.apache.zookeeper KeeperException KeeperException$NoNodeException ZooDefs ZooDefs$Ids ZooDefs$Perms])
(:import [backtype.storm.utils Utils])
(:import [java.security MessageDigest])
(:import [org.apache.zookeeper.server.auth DigestAuthenticationProvider])
- (:use [backtype.storm util log config])
+ (:use [backtype.storm util log config converter])
(:require [backtype.storm [zookeeper :as zk]])
(:require [backtype.storm.daemon [common :as common]]))
@@ -228,9 +230,9 @@
(cb id))))
(defn- maybe-deserialize
- [ser]
+ [ser clazz]
(when ser
- (Utils/deserialize ser)))
+ (Utils/deserialize ser clazz)))
(defstruct TaskError :error :time-secs :host :port)
@@ -292,7 +294,7 @@
[this storm-id callback]
(when callback
(swap! assignment-info-callback assoc storm-id callback))
- (maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback))))
+ (clojurify-assignment (maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback)) Assignment)))
(assignment-info-with-version
[this storm-id callback]
@@ -300,7 +302,7 @@
(swap! assignment-info-with-version-callback assoc storm-id callback))
(let [{data :data version :version}
(get-data-with-version cluster-state (assignment-path storm-id) (not-nil? callback))]
- {:data (maybe-deserialize data)
+ {:data (clojurify-assignment (maybe-deserialize data Assignment))
:version version}))
(assignment-version
@@ -325,7 +327,9 @@
[this storm-id node port]
(-> cluster-state
(get-data (workerbeat-path storm-id node port) false)
- maybe-deserialize))
+ (maybe-deserialize ZKWorkerHeartbeat)
+ clojurify-zk-worker-hb))
+
(executor-beats
[this storm-id executor->node+port]
@@ -348,11 +352,12 @@
(supervisor-info
[this supervisor-id]
- (maybe-deserialize (get-data cluster-state (supervisor-path supervisor-id) false)))
+ (clojurify-supervisor-info (maybe-deserialize (get-data cluster-state (supervisor-path supervisor-id) false) SupervisorInfo)))
(worker-heartbeat!
[this storm-id node port info]
- (set-data cluster-state (workerbeat-path storm-id node port) (Utils/serialize info) acls))
+ (let [thrift-worker-hb (thriftify-zk-worker-hb info)]
+ (set-data cluster-state (workerbeat-path storm-id node port) (Utils/serialize thrift-worker-hb) acls)))
(remove-worker-heartbeat!
[this storm-id node port]
@@ -378,11 +383,13 @@
(supervisor-heartbeat!
[this supervisor-id info]
- (set-ephemeral-node cluster-state (supervisor-path supervisor-id) (Utils/serialize info) acls))
+ (let [thrift-supervisor-info (thriftify-supervisor-info info)]
+ (set-ephemeral-node cluster-state (supervisor-path supervisor-id) (Utils/serialize thrift-supervisor-info) acls)))
(activate-storm!
[this storm-id storm-base]
- (set-data cluster-state (storm-path storm-id) (Utils/serialize storm-base) acls))
+ (let [thrift-storm-base (thriftify-storm-base storm-base)]
+ (set-data cluster-state (storm-path storm-id) (Utils/serialize thrift-storm-base) acls)))
(update-storm!
[this storm-id new-elems]
@@ -392,6 +399,7 @@
(set-data cluster-state (storm-path storm-id)
(-> base
(merge new-elems)
+ thriftify-storm-base
Utils/serialize)
acls)))
@@ -399,7 +407,7 @@
[this storm-id callback]
(when callback
(swap! storm-base-callback assoc storm-id callback))
- (maybe-deserialize (get-data cluster-state (storm-path storm-id) (not-nil? callback))))
+ (clojurify-storm-base (maybe-deserialize (get-data cluster-state (storm-path storm-id) (not-nil? callback)) StormBase)))
(remove-storm-base!
[this storm-id]
@@ -407,7 +415,8 @@
(set-assignment!
[this storm-id info]
- (set-data cluster-state (assignment-path storm-id) (Utils/serialize info) acls))
+ (let [thrift-assignment (thriftify-assignment info)]
+ (set-data cluster-state (assignment-path storm-id) (Utils/serialize thrift-assignment) acls)))
(remove-storm!
[this storm-id]
@@ -418,19 +427,20 @@
(set-credentials!
[this storm-id creds topo-conf]
(let [topo-acls (mk-topo-only-acls topo-conf)
- path (credentials-path storm-id)]
- (set-data cluster-state path (Utils/serialize creds) topo-acls)))
+ path (credentials-path storm-id)
+ thriftified-creds (thriftify-credentials creds)]
+ (set-data cluster-state path (Utils/serialize thriftified-creds) topo-acls)))
(credentials
[this storm-id callback]
(when callback
(swap! credentials-callback assoc storm-id callback))
- (maybe-deserialize (get-data cluster-state (credentials-path storm-id) (not-nil? callback))))
+ (clojurify-crdentials (maybe-deserialize (get-data cluster-state (credentials-path storm-id) (not-nil? callback)) Credentials)))
(report-error
[this storm-id component-id node port error]
(let [path (error-path storm-id component-id)
- data {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port}
+ data (thriftify-error {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port})
_ (mkdirs cluster-state path acls)
_ (create-sequential cluster-state (str path "/e") (Utils/serialize data) acls)
to-kill (->> (get-children cluster-state path false)
@@ -446,7 +456,8 @@
errors (if (exists-node? cluster-state path false)
(dofor [c (get-children cluster-state path false)]
(let [data (-> (get-data cluster-state (str path "/" c) false)
- maybe-deserialize)]
+ (maybe-deserialize ErrorInfo)
+ clojurify-error)]
(when data
(struct TaskError (:error data) (:time-secs data) (:host data) (:port data))
)))
http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/src/clj/backtype/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/config.clj b/storm-core/src/clj/backtype/storm/config.clj
index 98b1da2..d09b31b 100644
--- a/storm-core/src/clj/backtype/storm/config.clj
+++ b/storm-core/src/clj/backtype/storm/config.clj
@@ -15,7 +15,8 @@
;; limitations under the License.
(ns backtype.storm.config
- (:import [java.io FileReader File IOException])
+ (:import [java.io FileReader File IOException]
+ [backtype.storm.generated StormTopology])
(:import [backtype.storm Config ConfigValidation$FieldValidator])
(:import [backtype.storm.utils Utils LocalState])
(:import [org.apache.commons.io FileUtils])
@@ -211,14 +212,14 @@
(let [stormroot (supervisor-stormdist-root conf storm-id)
conf-path (supervisor-stormconf-path stormroot)
topology-path (supervisor-stormcode-path stormroot)]
- (merge conf (Utils/deserialize (FileUtils/readFileToByteArray (File. conf-path))))
+ (merge conf (Utils/deserialize (FileUtils/readFileToByteArray (File. conf-path)) java.util.Map))
))
(defn read-supervisor-topology
[conf storm-id]
(let [stormroot (supervisor-stormdist-root conf storm-id)
topology-path (supervisor-stormcode-path stormroot)]
- (Utils/deserialize (FileUtils/readFileToByteArray (File. topology-path)))
+ (Utils/deserialize (FileUtils/readFileToByteArray (File. topology-path)) StormTopology)
))
(defn worker-user-root [conf]
http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/src/clj/backtype/storm/converter.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/converter.clj b/storm-core/src/clj/backtype/storm/converter.clj
new file mode 100644
index 0000000..6a9f4a6
--- /dev/null
+++ b/storm-core/src/clj/backtype/storm/converter.clj
@@ -0,0 +1,200 @@
+(ns backtype.storm.converter
+ (:import [backtype.storm.generated SupervisorInfo NodeInfo Assignment
+ StormBase TopologyStatus ZKWorkerHeartbeat ExecutorInfo ErrorInfo Credentials RebalanceOptions KillOptions TopologyActionOptions])
+ (:use [backtype.storm util stats log])
+ (:require [backtype.storm.daemon [common :as common]]))
+
+(defn thriftify-supervisor-info [supervisor-info]
+ (doto (SupervisorInfo.)
+ (.set_time_secs (long (:time-secs supervisor-info)))
+ (.set_hostname (:hostname supervisor-info))
+ (.set_assignment_id (:assignment-id supervisor-info))
+ (.set_used_ports (map long (:used-ports supervisor-info)))
+ (.set_meta (map long (:meta supervisor-info)))
+ (.set_scheduler_meta (:scheduler-meta supervisor-info))
+ (.set_uptime_secs (long (:uptime-secs supervisor-info)))))
+
+(defn clojurify-supervisor-info [^SupervisorInfo supervisor-info]
+ (if supervisor-info
+ (backtype.storm.daemon.common.SupervisorInfo.
+ (.get_time_secs supervisor-info)
+ (.get_hostname supervisor-info)
+ (.get_assignment_id supervisor-info)
+ (if (.get_used_ports supervisor-info) (into [] (.get_used_ports supervisor-info)))
+ (if (.get_meta supervisor-info) (into [] (.get_meta supervisor-info)))
+ (if (.get_scheduler_meta supervisor-info) (into {} (.get_scheduler_meta supervisor-info)))
+ (.get_uptime_secs supervisor-info))))
+
+(defn thriftify-assignment [assignment]
+ (doto (Assignment.)
+ (.set_master_code_dir (:master-code-dir assignment))
+ (.set_node_host (:node->host assignment))
+ (.set_executor_node_port (map-val
+ (fn [node+port]
+ (NodeInfo. (first node+port) (set (map long (rest node+port)))))
+ (map-key #(map long %)
+ (:executor->node+port assignment))))
+ (.set_executor_start_time_secs
+ (map-val
+ long
+ (map-key #(map long %)
+ (:executor->start-time-secs assignment))))))
+
+(defn clojurify-executor->node_port [executor->node_port]
+ (into {}
+ (map-val
+ (fn [nodeInfo]
+ (concat [(.get_node nodeInfo)] (.get_port nodeInfo))) ;nodeInfo should be converted to [node,port1,port2..]
+ (map-key
+ (fn [list-of-executors]
+ (into [] list-of-executors)) ; list of executors must be coverted to clojure vector to ensure it is sortable.
+ executor->node_port))))
+
+(defn clojurify-assignment [^Assignment assignment]
+ (if assignment
+ (backtype.storm.daemon.common.Assignment.
+ (.get_master_code_dir assignment)
+ (into {} (.get_node_host assignment))
+ (clojurify-executor->node_port (into {} (.get_executor_node_port assignment)))
+ (map-key (fn [executor] (into [] executor))
+ (into {} (.get_executor_start_time_secs assignment))))))
+
+(defn convert-to-symbol-from-status [status]
+ (condp = status
+ TopologyStatus/ACTIVE {:type :active}
+ TopologyStatus/INACTIVE {:type :inactive}
+ TopologyStatus/REBALANCING {:type :rebalancing}
+ TopologyStatus/KILLED {:type :killed}
+ nil))
+
+(defn- convert-to-status-from-symbol [status]
+ (if status
+ (condp = (:type status)
+ :active TopologyStatus/ACTIVE
+ :inactive TopologyStatus/INACTIVE
+ :rebalancing TopologyStatus/REBALANCING
+ :killed TopologyStatus/KILLED
+ nil)))
+
+(defn clojurify-rebalance-options [^RebalanceOptions rebalance-options]
+ (-> {:action :rebalance}
+ (assoc-non-nil :delay-secs (if (.is_set_wait_secs rebalance-options) (.get_wait_secs rebalance-options)))
+ (assoc-non-nil :num-workers (if (.is_set_num_workers rebalance-options) (.get_num_workers rebalance-options)))
+ (assoc-non-nil :component->executors (if (.is_set_num_executors rebalance-options) (into {} (.get_num_executors rebalance-options))))))
+
+(defn thriftify-rebalance-options [rebalance-options]
+ (if rebalance-options
+ (let [thrift-rebalance-options (RebalanceOptions.)]
+ (if (:delay-secs rebalance-options)
+ (.set_wait_secs thrift-rebalance-options (int (:delay-secs rebalance-options))))
+ (if (:num-workers rebalance-options)
+ (.set_num_workers thrift-rebalance-options (int (:num-workers rebalance-options))))
+ (if (:component->executors rebalance-options)
+ (.set_num_executors thrift-rebalance-options (map-val int (:component->executors rebalance-options))))
+ thrift-rebalance-options)))
+
+(defn clojurify-kill-options [^KillOptions kill-options]
+ (-> {:action :kill}
+ (assoc-non-nil :delay-secs (if (.is_set_wait_secs kill-options) (.get_wait_secs kill-options)))))
+
+(defn thriftify-kill-options [kill-options]
+ (if kill-options
+ (let [thrift-kill-options (KillOptions.)]
+ (if (:delay-secs kill-options)
+ (.set_wait_secs thrift-kill-options (int (:delay-secs kill-options))))
+ thrift-kill-options)))
+
+(defn thriftify-topology-action-options [storm-base]
+ (if (:topology-action-options storm-base)
+ (let [ topology-action-options (:topology-action-options storm-base)
+ action (:action topology-action-options)
+ thrift-topology-action-options (TopologyActionOptions.)]
+ (if (= action :kill)
+ (.set_kill_options thrift-topology-action-options (thriftify-kill-options topology-action-options)))
+ (if (= action :rebalance)
+ (.set_rebalance_options thrift-topology-action-options (thriftify-rebalance-options topology-action-options)))
+ thrift-topology-action-options)))
+
+(defn clojurify-topology-action-options [^TopologyActionOptions topology-action-options]
+ (if (and topology-action-options (.is_set_kill_options topology-action-options))
+ (clojurify-kill-options (.get_kill_options topology-action-options)))
+ (if (and topology-action-options (.is_set_rebalance_options topology-action-options))
+ (clojurify-rebalance-options (.get_rebalance_options topology-action-options))))
+
+(defn thriftify-storm-base [storm-base]
+ (doto (StormBase.)
+ (.set_name (:storm-name storm-base))
+ (.set_launch_time_secs (int (:launch-time-secs storm-base)))
+ (.set_status (convert-to-status-from-symbol (:status storm-base)))
+ (.set_num_workers (int (:num-workers storm-base)))
+ (.set_component_executors (map-val int (:component->executors storm-base)))
+ (.set_owner (:owner storm-base))
+ (.set_topology_action_options (thriftify-topology-action-options storm-base))
+ (.set_prev_status (convert-to-status-from-symbol (:prev-status storm-base)))))
+
+(defn clojurify-storm-base [^StormBase storm-base]
+ (if storm-base
+ (backtype.storm.daemon.common.StormBase.
+ (.get_name storm-base)
+ (.get_launch_time_secs storm-base)
+ (convert-to-symbol-from-status (.get_status storm-base))
+ (.get_num_workers storm-base)
+ (into {} (.get_component_executors storm-base))
+ (.get_owner storm-base)
+ (clojurify-topology-action-options (.get_topology_action_options storm-base))
+ (convert-to-symbol-from-status (.get_prev_status storm-base)))))
+
+(defn thriftify-stats [stats]
+ (if stats
+ (map-val thriftify-executor-stats
+ (map-key #(ExecutorInfo. (int (first %1)) (int (last %1)))
+ stats))
+ {}))
+
+(defn clojurify-stats [stats]
+ (if stats
+ (map-val clojurify-executor-stats
+ (map-key (fn [x] (list (.get_task_start x) (.get_task_end x)))
+ stats))
+ {}))
+
+(defn clojurify-zk-worker-hb [^ZKWorkerHeartbeat worker-hb]
+ (if worker-hb
+ {:storm-id (.get_storm_id worker-hb)
+ :executor-stats (clojurify-stats (into {} (.get_executor_stats worker-hb)))
+ :uptime (time-delta (.get_time_secs worker-hb))
+ :time-secs (.get_time_secs worker-hb)
+ }
+ {}))
+
+(defn thriftify-zk-worker-hb [worker-hb]
+ (doto (ZKWorkerHeartbeat.)
+ (.set_storm_id (:storm-id worker-hb))
+ (.set_executor_stats (thriftify-stats (:executor-stats worker-hb)))
+ (.set_time_secs (:time-secs worker-hb))))
+
+(defn clojurify-error [^ErrorInfo error]
+ (if error
+ {
+ :error (.get_error error)
+ :time-secs (.get_error_time_secs error)
+ :host (.get_host error)
+ :port (.get_port error)
+ }
+ ))
+
+(defn thriftify-error [error]
+ (doto (ErrorInfo. (:error error) (:time-secs error))
+ (.set_host (:host error))
+ (.set_port (:port error))))
+
+(defn thriftify-credentials [credentials]
+ (doto (Credentials.)
+ (.set_creds (if credentials credentials {}))))
+
+(defn clojurify-crdentials [^Credentials credentials]
+ (if credentials
+ (into {} (.get_creds credentials))
+ nil
+ ))
+
http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/src/clj/backtype/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/common.clj b/storm-core/src/clj/backtype/storm/daemon/common.clj
index f091dfb..c33609d 100644
--- a/storm-core/src/clj/backtype/storm/daemon/common.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/common.clj
@@ -51,7 +51,7 @@
;; component->executors is a map from spout/bolt id to number of executors for that component
-(defrecord StormBase [storm-name launch-time-secs status num-workers component->executors owner])
+(defrecord StormBase [storm-name launch-time-secs status num-workers component->executors owner topology-action-options prev-status])
(defrecord SupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs])
@@ -65,9 +65,11 @@
(def LS-LOCAL-ASSIGNMENTS "local-assignments")
(def LS-APPROVED-WORKERS "approved-workers")
-
-
-(defrecord WorkerHeartbeat [time-secs storm-id executors port])
+(defn mk-local-worker-heartbeat [time-secs storm-id executors port]
+ {:time-secs time-secs
+ :storm-id storm-id
+ :executors executors
+ :port port})
(defrecord ExecutorStats [^long processed
^long acked
http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index a6e606d..a1ecc4a 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -16,7 +16,9 @@
(ns backtype.storm.daemon.executor
(:use [backtype.storm.daemon common])
(:use [backtype.storm bootstrap])
- (:import [backtype.storm ICredentialsListener])
+ (:import [backtype.storm ICredentialsListener]
+ [backtype.storm.generated Grouping]
+ [java.io Serializable])
(:import [backtype.storm.hooks ITaskHook])
(:import [backtype.storm.tuple Tuple])
(:import [backtype.storm.spout ISpoutWaitStrategy])
@@ -86,7 +88,7 @@
(let [grouping (thrift/instantiate-java-object (.get_custom_object thrift-grouping))]
(mk-custom-grouper grouping context component-id stream-id target-tasks))
:custom-serialized
- (let [grouping (Utils/deserialize (.get_custom_serialized thrift-grouping))]
+ (let [grouping (Utils/deserialize (.get_custom_serialized thrift-grouping) Serializable)]
(mk-custom-grouper grouping context component-id stream-id target-tasks))
:direct
:direct
http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index b2cb96a..77f59ad 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -15,7 +15,8 @@
;; limitations under the License.
(ns backtype.storm.daemon.nimbus
(:import [java.nio ByteBuffer]
- [java.util Collections])
+ [java.util Collections]
+ [backtype.storm.generated StormTopology])
(:import [java.io FileNotFoundException])
(:import [java.nio.channels Channels WritableByteChannel])
(:import [backtype.storm.security.auth ThriftServer ThriftConnectionType ReqContext AuthUtils])
@@ -99,15 +100,7 @@
(Utils/deserialize
(FileUtils/readFileToByteArray
(File. (master-stormconf-path stormroot))
- )))))
-
-(defn set-topology-status! [nimbus storm-id status]
- (let [storm-cluster-state (:storm-cluster-state nimbus)]
- (.update-storm! storm-cluster-state
- storm-id
- {:status status})
- (log-message "Updated " storm-id " with status " status)
- ))
+ ) java.util.Map))))
(declare delay-event)
(declare mk-assignments)
@@ -122,8 +115,9 @@
storm-id
delay
:remove)
- {:type :killed
- :kill-time-secs delay})
+ {
+ :status {:type :killed}
+ :topology-action-options {:delay-secs delay :action :kill}})
))
(defn rebalance-transition [nimbus storm-id status]
@@ -136,24 +130,24 @@
storm-id
delay
:do-rebalance)
- {:type :rebalancing
- :delay-secs delay
- :old-status status
- :num-workers num-workers
- :executor-overrides executor-overrides
+ {:status {:type :rebalancing}
+ :prev-status status
+ :topology-action-options (-> {:delay-secs delay :action :rebalance}
+ (assoc-non-nil :num-workers num-workers)
+ (assoc-non-nil :component->executors executor-overrides))
})))
-(defn do-rebalance [nimbus storm-id status]
- (.update-storm! (:storm-cluster-state nimbus)
- storm-id
- (assoc-non-nil
- {:component->executors (:executor-overrides status)}
- :num-workers
- (:num-workers status)))
+(defn do-rebalance [nimbus storm-id status storm-base]
+ (let [rebalance-options (:topology-action-options storm-base)]
+ (.update-storm! (:storm-cluster-state nimbus)
+ storm-id
+ (-> {:topology-action-options nil}
+ (assoc-non-nil :component->executors (:component->executors rebalance-options))
+ (assoc-non-nil :num-workers (:num-workers rebalance-options)))))
(mk-assignments nimbus :scratch-topology-id storm-id))
-(defn state-transitions [nimbus storm-id status]
- {:active {:inactivate :inactive
+(defn state-transitions [nimbus storm-id status storm-base]
+ {:active {:inactivate :inactive
:activate nil
:rebalance (rebalance-transition nimbus storm-id status)
:kill (kill-transition nimbus storm-id)
@@ -165,7 +159,7 @@
}
:killed {:startup (fn [] (delay-event nimbus
storm-id
- (:kill-time-secs status)
+ (:delay-secs storm-base)
:remove)
nil)
:kill (kill-transition nimbus storm-id)
@@ -177,18 +171,15 @@
}
:rebalancing {:startup (fn [] (delay-event nimbus
storm-id
- (:delay-secs status)
+ (:delay-secs storm-base)
:do-rebalance)
nil)
:kill (kill-transition nimbus storm-id)
:do-rebalance (fn []
- (do-rebalance nimbus storm-id status)
- (:old-status status))
+ (do-rebalance nimbus storm-id status storm-base)
+ (:type (:prev-status storm-base)))
}})
-(defn topology-status [nimbus storm-id]
- (-> nimbus :storm-cluster-state (.storm-base storm-id nil) :status))
-
(defn transition!
([nimbus storm-id event]
(transition! nimbus storm-id event false))
@@ -196,7 +187,8 @@
(locking (:submit-lock nimbus)
(let [system-events #{:startup}
[event & event-args] (if (keyword? event) [event] event)
- status (topology-status nimbus storm-id)]
+ storm-base (-> nimbus :storm-cluster-state (.storm-base storm-id nil))
+ status (:status storm-base)]
;; handles the case where event was scheduled but topology has been removed
(if-not status
(log-message "Cannot apply event " event " to " storm-id " because topology no longer exists")
@@ -212,19 +204,20 @@
(log-message msg))
nil))
)))
- transition (-> (state-transitions nimbus storm-id status)
+ transition (-> (state-transitions nimbus storm-id status storm-base)
(get (:type status))
(get-event event))
transition (if (or (nil? transition)
(keyword? transition))
(fn [] transition)
transition)
- new-status (apply transition event-args)
- new-status (if (keyword? new-status)
- {:type new-status}
- new-status)]
- (when new-status
- (set-topology-status! nimbus storm-id new-status)))))
+ storm-base-updates (apply transition event-args)
+ storm-base-updates (if (keyword? storm-base-updates) ;if it's just a symbol, that just indicates new status.
+ {:status {:type storm-base-updates}}
+ storm-base-updates)]
+
+ (when storm-base-updates
+ (.update-storm! (:storm-cluster-state nimbus) storm-id storm-base-updates)))))
)))
(defn transition-name! [nimbus storm-name event & args]
@@ -296,7 +289,7 @@
[nimbus topologies missing-assignment-topologies]
(let [storm-cluster-state (:storm-cluster-state nimbus)
^INimbus inimbus (:inimbus nimbus)
-
+
supervisor-infos (all-supervisor-info storm-cluster-state nil)
supervisor-details (dofor [[id info] supervisor-infos]
@@ -327,7 +320,7 @@
(Utils/deserialize
(FileUtils/readFileToByteArray
(File. (master-stormcode-path stormroot))
- ))))
+ ) StormTopology)))
(declare compute-executor->component)
@@ -468,7 +461,7 @@
all-executors
(set (alive-executors nimbus topology-details all-executors assignment)))]]
{tid alive-executors})))
-
+
(defn- compute-supervisor->dead-ports [nimbus existing-assignments topology->executors topology->alive-executors]
(let [dead-slots (into [] (for [[tid assignment] existing-assignments
:let [all-executors (topology->executors tid)
@@ -514,7 +507,7 @@
((fn [ports] (map int ports))))
supervisor-details (SupervisorDetails. sid hostname scheduler-meta all-ports)]]
{sid supervisor-details}))]
- (merge all-supervisor-details
+ (merge all-supervisor-details
(into {}
(for [[sid ports] nonexistent-supervisor-slots]
[sid (SupervisorDetails. sid nil ports)]))
@@ -576,7 +569,7 @@
topology->scheduler-assignment (compute-topology->scheduler-assignment nimbus
existing-assignments
topology->alive-executors)
-
+
missing-assignment-topologies (->> topologies
.getTopologies
(map (memfn getId))
@@ -594,7 +587,7 @@
all-scheduling-slots (->> (all-scheduling-slots nimbus topologies missing-assignment-topologies)
(map (fn [[node-id port]] {node-id #{port}}))
(apply merge-with set/union))
-
+
supervisors (read-all-supervisor-details nimbus all-scheduling-slots supervisor->dead-ports)
cluster (Cluster. (:inimbus nimbus) supervisors topology->scheduler-assignment)
@@ -659,7 +652,7 @@
(defnk mk-assignments [nimbus :scratch-topology-id nil]
(let [conf (:conf nimbus)
storm-cluster-state (:storm-cluster-state nimbus)
- ^INimbus inimbus (:inimbus nimbus)
+ ^INimbus inimbus (:inimbus nimbus)
;; read all the topologies
topology-ids (.active-storms storm-cluster-state)
topologies (into {} (for [tid topology-ids]
@@ -679,13 +672,13 @@
existing-assignments
topologies
scratch-topology-id)
-
+
topology->executor->node+port (merge (into {} (for [id assigned-topology-ids] {id nil})) topology->executor->node+port)
-
+
now-secs (current-time-secs)
-
+
basic-supervisor-details-map (basic-supervisor-details-map storm-cluster-state)
-
+
;; construct the final Assignments by adding start-times etc into it
new-assignments (into {} (for [[topology-id executor->node+port] topology->executor->node+port
:let [existing-assignment (get existing-assignments topology-id)
@@ -723,14 +716,14 @@
(->> new-assignments
(map (fn [[topology-id assignment]]
(let [existing-assignment (get existing-assignments topology-id)]
- [topology-id (map to-worker-slot (newly-added-slots existing-assignment assignment))]
+ [topology-id (map to-worker-slot (newly-added-slots existing-assignment assignment))]
)))
(into {})
(.assignSlots inimbus topologies))
))
(defn- start-storm [nimbus storm-name storm-id topology-initial-status]
- {:pre [(#{:active :inactive} topology-initial-status)]}
+ {:pre [(#{:active :inactive} topology-initial-status)]}
(let [storm-cluster-state (:storm-cluster-state nimbus)
conf (:conf nimbus)
storm-conf (read-storm-conf conf storm-id)
@@ -744,7 +737,9 @@
{:type topology-initial-status}
(storm-conf TOPOLOGY-WORKERS)
num-executors
- (storm-conf TOPOLOGY-SUBMITTER-USER)))))
+ (storm-conf TOPOLOGY-SUBMITTER-USER)
+ nil
+ nil))))
;; Master:
;; job submit:
http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index e717ce4..b13e8a7 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -61,7 +61,7 @@
(defn do-heartbeat [worker]
(let [conf (:conf worker)
- hb (WorkerHeartbeat.
+ hb (mk-local-worker-heartbeat
(current-time-secs)
(:storm-id worker)
(:executors worker)
http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/src/clj/backtype/storm/stats.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/stats.clj b/storm-core/src/clj/backtype/storm/stats.clj
index b872c6f..db8930c 100644
--- a/storm-core/src/clj/backtype/storm/stats.clj
+++ b/storm-core/src/clj/backtype/storm/stats.clj
@@ -19,7 +19,7 @@
NotAliveException AlreadyAliveException InvalidTopologyException GlobalStreamId
ClusterSummary TopologyInfo TopologySummary ExecutorSummary ExecutorStats ExecutorSpecificStats
SpoutStats BoltStats ErrorInfo SupervisorSummary])
- (:use [backtype.storm util])
+ (:use [backtype.storm util log])
(:use [clojure.math.numeric-tower :only [ceil]]))
;;TODO: consider replacing this with some sort of RRD
@@ -301,42 +301,76 @@
(value-bolt-stats! stats))
(defmulti thriftify-specific-stats :type)
+(defmulti clojurify-specific-stats class-selector)
(defn window-set-converter
- ([stats key-fn]
- ;; make the first key a string,
- (into {}
- (for [[k v] stats]
- [(str k)
- (into {} (for [[k2 v2] v]
- [(key-fn k2) v2]))])))
- ([stats]
- (window-set-converter stats identity)))
+ ([stats key-fn first-key-fun]
+ (into {}
+ (for [[k v] stats]
+ ;apply the first-key-fun only to first key.
+ [(first-key-fun k)
+ (into {} (for [[k2 v2] v]
+ [(key-fn k2) v2]))])))
+ ([stats first-key-fun]
+ (window-set-converter stats identity first-key-fun)))
(defn to-global-stream-id
[[component stream]]
(GlobalStreamId. component stream))
+(defn from-global-stream-id [global-stream-id]
+ [(.get_componentId global-stream-id) (.get_streamId global-stream-id)])
+
+(defmethod clojurify-specific-stats BoltStats [^BoltStats stats]
+ [(window-set-converter (.get_acked stats) from-global-stream-id symbol)
+ (window-set-converter (.get_failed stats) from-global-stream-id symbol)
+ (window-set-converter (.get_process_ms_avg stats) from-global-stream-id symbol)
+ (window-set-converter (.get_executed stats) from-global-stream-id symbol)
+ (window-set-converter (.get_execute_ms_avg stats) from-global-stream-id symbol)])
+
+(defmethod clojurify-specific-stats SpoutStats [^SpoutStats stats]
+ [(window-set-converter (.get_acked stats) symbol)
+ (window-set-converter (.get_failed stats) symbol)
+ (window-set-converter (.get_complete_ms_avg stats) symbol)])
+
+
+(defn clojurify-executor-stats
+ [^ExecutorStats stats]
+ (let [ specific-stats (.get_specific stats)
+ is_bolt? (.is_set_bolt specific-stats)
+ specific-stats (if is_bolt? (.get_bolt specific-stats) (.get_spout specific-stats))
+ specific-stats (clojurify-specific-stats specific-stats)
+ common-stats (CommonStats. (window-set-converter (.get_emitted stats) symbol) (window-set-converter (.get_transferred stats) symbol) (.get_rate stats))]
+ (if is_bolt?
+ ; worker heart beat does not store the BoltExecutorStats or SpoutExecutorStats , instead it stores the result returned by render-stats!
+ ; which flattens the BoltExecutorStats/SpoutExecutorStats by extracting values from all atoms and merging all values inside :common to top
+ ;level map we are pretty much doing the same here.
+ (dissoc (merge common-stats {:type :bolt} (apply ->BoltExecutorStats (into [nil] specific-stats))) :common)
+ (dissoc (merge common-stats {:type :spout} (apply ->SpoutExecutorStats (into [nil] specific-stats))) :common)
+ )))
+
(defmethod thriftify-specific-stats :bolt
[stats]
(ExecutorSpecificStats/bolt
(BoltStats.
- (window-set-converter (:acked stats) to-global-stream-id)
- (window-set-converter (:failed stats) to-global-stream-id)
- (window-set-converter (:process-latencies stats) to-global-stream-id)
- (window-set-converter (:executed stats) to-global-stream-id)
- (window-set-converter (:execute-latencies stats) to-global-stream-id))))
+ (window-set-converter (:acked stats) to-global-stream-id str)
+ (window-set-converter (:failed stats) to-global-stream-id str)
+ (window-set-converter (:process-latencies stats) to-global-stream-id str)
+ (window-set-converter (:executed stats) to-global-stream-id str)
+ (window-set-converter (:execute-latencies stats) to-global-stream-id str))))
(defmethod thriftify-specific-stats :spout
[stats]
(ExecutorSpecificStats/spout
- (SpoutStats. (window-set-converter (:acked stats))
- (window-set-converter (:failed stats))
- (window-set-converter (:complete-latencies stats)))))
+ (SpoutStats. (window-set-converter (:acked stats) str)
+ (window-set-converter (:failed stats) str)
+ (window-set-converter (:complete-latencies stats) str))))
(defn thriftify-executor-stats
[stats]
- (let [specific-stats (thriftify-specific-stats stats)]
- (ExecutorStats. (window-set-converter (:emitted stats))
- (window-set-converter (:transferred stats))
- specific-stats)))
+ (let [specific-stats (thriftify-specific-stats stats)
+ rate (:rate stats)]
+ (ExecutorStats. (window-set-converter (:emitted stats) str)
+ (window-set-converter (:transferred stats) str)
+ specific-stats
+ rate)))
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/src/clj/backtype/storm/thrift.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/thrift.clj b/storm-core/src/clj/backtype/storm/thrift.clj
index ce0a5ff..5bc1150 100644
--- a/storm-core/src/clj/backtype/storm/thrift.clj
+++ b/storm-core/src/clj/backtype/storm/thrift.clj
@@ -15,12 +15,14 @@
;; limitations under the License.
(ns backtype.storm.thrift
- (:import [java.util HashMap])
+ (:import [java.util HashMap]
+ [java.io Serializable]
+ [backtype.storm.generated NodeInfo Assignment])
(:import [backtype.storm.generated JavaObject Grouping Nimbus StormTopology
StormTopology$_Fields Bolt Nimbus$Client Nimbus$Iface
ComponentCommon Grouping$_Fields SpoutSpec NullStruct StreamInfo
GlobalStreamId ComponentObject ComponentObject$_Fields
- ShellComponent])
+ ShellComponent SupervisorInfo])
(:import [backtype.storm.utils Utils NimbusClient])
(:import [backtype.storm Constants])
(:import [backtype.storm.grouping CustomStreamGrouping])
@@ -155,7 +157,7 @@
[^ComponentObject obj]
(when (not= (.getSetField obj) ComponentObject$_Fields/SERIALIZED_JAVA)
(throw (RuntimeException. "Cannot deserialize non-java-serialized object")))
- (Utils/deserialize (.get_serialized_java obj)))
+ (Utils/deserialize (.get_serialized_java obj) Serializable))
(defn serialize-component-object
[obj]
@@ -271,3 +273,4 @@
(def SPOUT-FIELDS
[StormTopology$_Fields/SPOUTS
StormTopology$_Fields/STATE_SPOUTS])
+
http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java b/storm-core/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java
index b9b97e9..3d3208b 100644
--- a/storm-core/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java
+++ b/storm-core/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java
@@ -103,6 +103,6 @@ public class BatchBoltExecutor implements IRichBolt, FinishedCallback, TimeoutCa
}
private IBatchBolt newTransactionalBolt() {
- return (IBatchBolt) Utils.deserialize(_boltSer);
+ return Utils.deserialize(_boltSer, IBatchBolt.class);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/src/jvm/backtype/storm/generated/Assignment.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/Assignment.java b/storm-core/src/jvm/backtype/storm/generated/Assignment.java
new file mode 100644
index 0000000..b6ade94
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/generated/Assignment.java
@@ -0,0 +1,817 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.7.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package backtype.storm.generated;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Assignment implements org.apache.thrift.TBase<Assignment, Assignment._Fields>, java.io.Serializable, Cloneable {
+ private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("Assignment");
+
+ private static final org.apache.thrift.protocol.TField MASTER_CODE_DIR_FIELD_DESC = new org.apache.thrift.protocol.TField("master_code_dir", org.apache.thrift.protocol.TType.STRING, (short)1);
+ private static final org.apache.thrift.protocol.TField NODE_HOST_FIELD_DESC = new org.apache.thrift.protocol.TField("node_host", org.apache.thrift.protocol.TType.MAP, (short)2);
+ private static final org.apache.thrift.protocol.TField EXECUTOR_NODE_PORT_FIELD_DESC = new org.apache.thrift.protocol.TField("executor_node_port", org.apache.thrift.protocol.TType.MAP, (short)3);
+ private static final org.apache.thrift.protocol.TField EXECUTOR_START_TIME_SECS_FIELD_DESC = new org.apache.thrift.protocol.TField("executor_start_time_secs", org.apache.thrift.protocol.TType.MAP, (short)4);
+
+ private String master_code_dir; // required
+ private Map<String,String> node_host; // required
+ private Map<List<Long>,NodeInfo> executor_node_port; // required
+ private Map<List<Long>,Long> executor_start_time_secs; // required
+
+ /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+ public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+ MASTER_CODE_DIR((short)1, "master_code_dir"),
+ NODE_HOST((short)2, "node_host"),
+ EXECUTOR_NODE_PORT((short)3, "executor_node_port"),
+ EXECUTOR_START_TIME_SECS((short)4, "executor_start_time_secs");
+
+ private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+ static {
+ for (_Fields field : EnumSet.allOf(_Fields.class)) {
+ byName.put(field.getFieldName(), field);
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, or null if its not found.
+ */
+ public static _Fields findByThriftId(int fieldId) {
+ switch(fieldId) {
+ case 1: // MASTER_CODE_DIR
+ return MASTER_CODE_DIR;
+ case 2: // NODE_HOST
+ return NODE_HOST;
+ case 3: // EXECUTOR_NODE_PORT
+ return EXECUTOR_NODE_PORT;
+ case 4: // EXECUTOR_START_TIME_SECS
+ return EXECUTOR_START_TIME_SECS;
+ default:
+ return null;
+ }
+ }
+
+ /**
+ * Find the _Fields constant that matches fieldId, throwing an exception
+ * if it is not found.
+ */
+ public static _Fields findByThriftIdOrThrow(int fieldId) {
+ _Fields fields = findByThriftId(fieldId);
+ if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+ return fields;
+ }
+
+ /**
+ * Find the _Fields constant that matches name, or null if its not found.
+ */
+ public static _Fields findByName(String name) {
+ return byName.get(name);
+ }
+
+ private final short _thriftId;
+ private final String _fieldName;
+
+ _Fields(short thriftId, String fieldName) {
+ _thriftId = thriftId;
+ _fieldName = fieldName;
+ }
+
+ public short getThriftFieldId() {
+ return _thriftId;
+ }
+
+ public String getFieldName() {
+ return _fieldName;
+ }
+ }
+
+ // isset id assignments
+
+ public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+ static {
+ Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+ tmpMap.put(_Fields.MASTER_CODE_DIR, new org.apache.thrift.meta_data.FieldMetaData("master_code_dir", org.apache.thrift.TFieldRequirementType.REQUIRED,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+ tmpMap.put(_Fields.NODE_HOST, new org.apache.thrift.meta_data.FieldMetaData("node_host", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING),
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))));
+ tmpMap.put(_Fields.EXECUTOR_NODE_PORT, new org.apache.thrift.meta_data.FieldMetaData("executor_node_port", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP,
+ new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)),
+ new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, NodeInfo.class))));
+ tmpMap.put(_Fields.EXECUTOR_START_TIME_SECS, new org.apache.thrift.meta_data.FieldMetaData("executor_start_time_secs", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+ new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP,
+ new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)),
+ new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))));
+ metaDataMap = Collections.unmodifiableMap(tmpMap);
+ org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(Assignment.class, metaDataMap);
+ }
+
+ public Assignment() {
+ this.node_host = new HashMap<String,String>();
+
+ this.executor_node_port = new HashMap<List<Long>,NodeInfo>();
+
+ this.executor_start_time_secs = new HashMap<List<Long>,Long>();
+
+ }
+
+ public Assignment(
+ String master_code_dir)
+ {
+ this();
+ this.master_code_dir = master_code_dir;
+ }
+
+ /**
+ * Performs a deep copy on <i>other</i>.
+ */
+ public Assignment(Assignment other) {
+ if (other.is_set_master_code_dir()) {
+ this.master_code_dir = other.master_code_dir;
+ }
+ if (other.is_set_node_host()) {
+ Map<String,String> __this__node_host = new HashMap<String,String>();
+ for (Map.Entry<String, String> other_element : other.node_host.entrySet()) {
+
+ String other_element_key = other_element.getKey();
+ String other_element_value = other_element.getValue();
+
+ String __this__node_host_copy_key = other_element_key;
+
+ String __this__node_host_copy_value = other_element_value;
+
+ __this__node_host.put(__this__node_host_copy_key, __this__node_host_copy_value);
+ }
+ this.node_host = __this__node_host;
+ }
+ if (other.is_set_executor_node_port()) {
+ Map<List<Long>,NodeInfo> __this__executor_node_port = new HashMap<List<Long>,NodeInfo>();
+ for (Map.Entry<List<Long>, NodeInfo> other_element : other.executor_node_port.entrySet()) {
+
+ List<Long> other_element_key = other_element.getKey();
+ NodeInfo other_element_value = other_element.getValue();
+
+ List<Long> __this__executor_node_port_copy_key = new ArrayList<Long>();
+ for (Long other_element_key_element : other_element_key) {
+ __this__executor_node_port_copy_key.add(other_element_key_element);
+ }
+
+ NodeInfo __this__executor_node_port_copy_value = new NodeInfo(other_element_value);
+
+ __this__executor_node_port.put(__this__executor_node_port_copy_key, __this__executor_node_port_copy_value);
+ }
+ this.executor_node_port = __this__executor_node_port;
+ }
+ if (other.is_set_executor_start_time_secs()) {
+ Map<List<Long>,Long> __this__executor_start_time_secs = new HashMap<List<Long>,Long>();
+ for (Map.Entry<List<Long>, Long> other_element : other.executor_start_time_secs.entrySet()) {
+
+ List<Long> other_element_key = other_element.getKey();
+ Long other_element_value = other_element.getValue();
+
+ List<Long> __this__executor_start_time_secs_copy_key = new ArrayList<Long>();
+ for (Long other_element_key_element : other_element_key) {
+ __this__executor_start_time_secs_copy_key.add(other_element_key_element);
+ }
+
+ Long __this__executor_start_time_secs_copy_value = other_element_value;
+
+ __this__executor_start_time_secs.put(__this__executor_start_time_secs_copy_key, __this__executor_start_time_secs_copy_value);
+ }
+ this.executor_start_time_secs = __this__executor_start_time_secs;
+ }
+ }
+
+ public Assignment deepCopy() {
+ return new Assignment(this);
+ }
+
+ @Override
+ public void clear() {
+ this.master_code_dir = null;
+ this.node_host = new HashMap<String,String>();
+
+ this.executor_node_port = new HashMap<List<Long>,NodeInfo>();
+
+ this.executor_start_time_secs = new HashMap<List<Long>,Long>();
+
+ }
+
+ public String get_master_code_dir() {
+ return this.master_code_dir;
+ }
+
+ public void set_master_code_dir(String master_code_dir) {
+ this.master_code_dir = master_code_dir;
+ }
+
+ public void unset_master_code_dir() {
+ this.master_code_dir = null;
+ }
+
+ /** Returns true if field master_code_dir is set (has been assigned a value) and false otherwise */
+ public boolean is_set_master_code_dir() {
+ return this.master_code_dir != null;
+ }
+
+ public void set_master_code_dir_isSet(boolean value) {
+ if (!value) {
+ this.master_code_dir = null;
+ }
+ }
+
+ public int get_node_host_size() {
+ return (this.node_host == null) ? 0 : this.node_host.size();
+ }
+
+ public void put_to_node_host(String key, String val) {
+ if (this.node_host == null) {
+ this.node_host = new HashMap<String,String>();
+ }
+ this.node_host.put(key, val);
+ }
+
+ public Map<String,String> get_node_host() {
+ return this.node_host;
+ }
+
+ public void set_node_host(Map<String,String> node_host) {
+ this.node_host = node_host;
+ }
+
+ public void unset_node_host() {
+ this.node_host = null;
+ }
+
+ /** Returns true if field node_host is set (has been assigned a value) and false otherwise */
+ public boolean is_set_node_host() {
+ return this.node_host != null;
+ }
+
+ public void set_node_host_isSet(boolean value) {
+ if (!value) {
+ this.node_host = null;
+ }
+ }
+
+ public int get_executor_node_port_size() {
+ return (this.executor_node_port == null) ? 0 : this.executor_node_port.size();
+ }
+
+ public void put_to_executor_node_port(List<Long> key, NodeInfo val) {
+ if (this.executor_node_port == null) {
+ this.executor_node_port = new HashMap<List<Long>,NodeInfo>();
+ }
+ this.executor_node_port.put(key, val);
+ }
+
+ public Map<List<Long>,NodeInfo> get_executor_node_port() {
+ return this.executor_node_port;
+ }
+
+ public void set_executor_node_port(Map<List<Long>,NodeInfo> executor_node_port) {
+ this.executor_node_port = executor_node_port;
+ }
+
+ public void unset_executor_node_port() {
+ this.executor_node_port = null;
+ }
+
+ /** Returns true if field executor_node_port is set (has been assigned a value) and false otherwise */
+ public boolean is_set_executor_node_port() {
+ return this.executor_node_port != null;
+ }
+
+ public void set_executor_node_port_isSet(boolean value) {
+ if (!value) {
+ this.executor_node_port = null;
+ }
+ }
+
+ public int get_executor_start_time_secs_size() {
+ return (this.executor_start_time_secs == null) ? 0 : this.executor_start_time_secs.size();
+ }
+
+ public void put_to_executor_start_time_secs(List<Long> key, long val) {
+ if (this.executor_start_time_secs == null) {
+ this.executor_start_time_secs = new HashMap<List<Long>,Long>();
+ }
+ this.executor_start_time_secs.put(key, val);
+ }
+
+ public Map<List<Long>,Long> get_executor_start_time_secs() {
+ return this.executor_start_time_secs;
+ }
+
+ public void set_executor_start_time_secs(Map<List<Long>,Long> executor_start_time_secs) {
+ this.executor_start_time_secs = executor_start_time_secs;
+ }
+
+ public void unset_executor_start_time_secs() {
+ this.executor_start_time_secs = null;
+ }
+
+ /** Returns true if field executor_start_time_secs is set (has been assigned a value) and false otherwise */
+ public boolean is_set_executor_start_time_secs() {
+ return this.executor_start_time_secs != null;
+ }
+
+ public void set_executor_start_time_secs_isSet(boolean value) {
+ if (!value) {
+ this.executor_start_time_secs = null;
+ }
+ }
+
+ public void setFieldValue(_Fields field, Object value) {
+ switch (field) {
+ case MASTER_CODE_DIR:
+ if (value == null) {
+ unset_master_code_dir();
+ } else {
+ set_master_code_dir((String)value);
+ }
+ break;
+
+ case NODE_HOST:
+ if (value == null) {
+ unset_node_host();
+ } else {
+ set_node_host((Map<String,String>)value);
+ }
+ break;
+
+ case EXECUTOR_NODE_PORT:
+ if (value == null) {
+ unset_executor_node_port();
+ } else {
+ set_executor_node_port((Map<List<Long>,NodeInfo>)value);
+ }
+ break;
+
+ case EXECUTOR_START_TIME_SECS:
+ if (value == null) {
+ unset_executor_start_time_secs();
+ } else {
+ set_executor_start_time_secs((Map<List<Long>,Long>)value);
+ }
+ break;
+
+ }
+ }
+
+ public Object getFieldValue(_Fields field) {
+ switch (field) {
+ case MASTER_CODE_DIR:
+ return get_master_code_dir();
+
+ case NODE_HOST:
+ return get_node_host();
+
+ case EXECUTOR_NODE_PORT:
+ return get_executor_node_port();
+
+ case EXECUTOR_START_TIME_SECS:
+ return get_executor_start_time_secs();
+
+ }
+ throw new IllegalStateException();
+ }
+
+ /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+ public boolean isSet(_Fields field) {
+ if (field == null) {
+ throw new IllegalArgumentException();
+ }
+
+ switch (field) {
+ case MASTER_CODE_DIR:
+ return is_set_master_code_dir();
+ case NODE_HOST:
+ return is_set_node_host();
+ case EXECUTOR_NODE_PORT:
+ return is_set_executor_node_port();
+ case EXECUTOR_START_TIME_SECS:
+ return is_set_executor_start_time_secs();
+ }
+ throw new IllegalStateException();
+ }
+
+ @Override
+ public boolean equals(Object that) {
+ if (that == null)
+ return false;
+ if (that instanceof Assignment)
+ return this.equals((Assignment)that);
+ return false;
+ }
+
+ public boolean equals(Assignment that) {
+ if (that == null)
+ return false;
+
+ boolean this_present_master_code_dir = true && this.is_set_master_code_dir();
+ boolean that_present_master_code_dir = true && that.is_set_master_code_dir();
+ if (this_present_master_code_dir || that_present_master_code_dir) {
+ if (!(this_present_master_code_dir && that_present_master_code_dir))
+ return false;
+ if (!this.master_code_dir.equals(that.master_code_dir))
+ return false;
+ }
+
+ boolean this_present_node_host = true && this.is_set_node_host();
+ boolean that_present_node_host = true && that.is_set_node_host();
+ if (this_present_node_host || that_present_node_host) {
+ if (!(this_present_node_host && that_present_node_host))
+ return false;
+ if (!this.node_host.equals(that.node_host))
+ return false;
+ }
+
+ boolean this_present_executor_node_port = true && this.is_set_executor_node_port();
+ boolean that_present_executor_node_port = true && that.is_set_executor_node_port();
+ if (this_present_executor_node_port || that_present_executor_node_port) {
+ if (!(this_present_executor_node_port && that_present_executor_node_port))
+ return false;
+ if (!this.executor_node_port.equals(that.executor_node_port))
+ return false;
+ }
+
+ boolean this_present_executor_start_time_secs = true && this.is_set_executor_start_time_secs();
+ boolean that_present_executor_start_time_secs = true && that.is_set_executor_start_time_secs();
+ if (this_present_executor_start_time_secs || that_present_executor_start_time_secs) {
+ if (!(this_present_executor_start_time_secs && that_present_executor_start_time_secs))
+ return false;
+ if (!this.executor_start_time_secs.equals(that.executor_start_time_secs))
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ HashCodeBuilder builder = new HashCodeBuilder();
+
+ boolean present_master_code_dir = true && (is_set_master_code_dir());
+ builder.append(present_master_code_dir);
+ if (present_master_code_dir)
+ builder.append(master_code_dir);
+
+ boolean present_node_host = true && (is_set_node_host());
+ builder.append(present_node_host);
+ if (present_node_host)
+ builder.append(node_host);
+
+ boolean present_executor_node_port = true && (is_set_executor_node_port());
+ builder.append(present_executor_node_port);
+ if (present_executor_node_port)
+ builder.append(executor_node_port);
+
+ boolean present_executor_start_time_secs = true && (is_set_executor_start_time_secs());
+ builder.append(present_executor_start_time_secs);
+ if (present_executor_start_time_secs)
+ builder.append(executor_start_time_secs);
+
+ return builder.toHashCode();
+ }
+
+ public int compareTo(Assignment other) {
+ if (!getClass().equals(other.getClass())) {
+ return getClass().getName().compareTo(other.getClass().getName());
+ }
+
+ int lastComparison = 0;
+ Assignment typedOther = (Assignment)other;
+
+ lastComparison = Boolean.valueOf(is_set_master_code_dir()).compareTo(typedOther.is_set_master_code_dir());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_master_code_dir()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.master_code_dir, typedOther.master_code_dir);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(is_set_node_host()).compareTo(typedOther.is_set_node_host());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_node_host()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.node_host, typedOther.node_host);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(is_set_executor_node_port()).compareTo(typedOther.is_set_executor_node_port());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_executor_node_port()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.executor_node_port, typedOther.executor_node_port);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ lastComparison = Boolean.valueOf(is_set_executor_start_time_secs()).compareTo(typedOther.is_set_executor_start_time_secs());
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ if (is_set_executor_start_time_secs()) {
+ lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.executor_start_time_secs, typedOther.executor_start_time_secs);
+ if (lastComparison != 0) {
+ return lastComparison;
+ }
+ }
+ return 0;
+ }
+
+ public _Fields fieldForId(int fieldId) {
+ return _Fields.findByThriftId(fieldId);
+ }
+
+ public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+ org.apache.thrift.protocol.TField field;
+ iprot.readStructBegin();
+ while (true)
+ {
+ field = iprot.readFieldBegin();
+ if (field.type == org.apache.thrift.protocol.TType.STOP) {
+ break;
+ }
+ switch (field.id) {
+ case 1: // MASTER_CODE_DIR
+ if (field.type == org.apache.thrift.protocol.TType.STRING) {
+ this.master_code_dir = iprot.readString();
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case 2: // NODE_HOST
+ if (field.type == org.apache.thrift.protocol.TType.MAP) {
+ {
+ org.apache.thrift.protocol.TMap _map185 = iprot.readMapBegin();
+ this.node_host = new HashMap<String,String>(2*_map185.size);
+ for (int _i186 = 0; _i186 < _map185.size; ++_i186)
+ {
+ String _key187; // required
+ String _val188; // required
+ _key187 = iprot.readString();
+ _val188 = iprot.readString();
+ this.node_host.put(_key187, _val188);
+ }
+ iprot.readMapEnd();
+ }
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case 3: // EXECUTOR_NODE_PORT
+ if (field.type == org.apache.thrift.protocol.TType.MAP) {
+ {
+ org.apache.thrift.protocol.TMap _map189 = iprot.readMapBegin();
+ this.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map189.size);
+ for (int _i190 = 0; _i190 < _map189.size; ++_i190)
+ {
+ List<Long> _key191; // required
+ NodeInfo _val192; // required
+ {
+ org.apache.thrift.protocol.TList _list193 = iprot.readListBegin();
+ _key191 = new ArrayList<Long>(_list193.size);
+ for (int _i194 = 0; _i194 < _list193.size; ++_i194)
+ {
+ long _elem195; // required
+ _elem195 = iprot.readI64();
+ _key191.add(_elem195);
+ }
+ iprot.readListEnd();
+ }
+ _val192 = new NodeInfo();
+ _val192.read(iprot);
+ this.executor_node_port.put(_key191, _val192);
+ }
+ iprot.readMapEnd();
+ }
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ case 4: // EXECUTOR_START_TIME_SECS
+ if (field.type == org.apache.thrift.protocol.TType.MAP) {
+ {
+ org.apache.thrift.protocol.TMap _map196 = iprot.readMapBegin();
+ this.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map196.size);
+ for (int _i197 = 0; _i197 < _map196.size; ++_i197)
+ {
+ List<Long> _key198; // required
+ long _val199; // required
+ {
+ org.apache.thrift.protocol.TList _list200 = iprot.readListBegin();
+ _key198 = new ArrayList<Long>(_list200.size);
+ for (int _i201 = 0; _i201 < _list200.size; ++_i201)
+ {
+ long _elem202; // required
+ _elem202 = iprot.readI64();
+ _key198.add(_elem202);
+ }
+ iprot.readListEnd();
+ }
+ _val199 = iprot.readI64();
+ this.executor_start_time_secs.put(_key198, _val199);
+ }
+ iprot.readMapEnd();
+ }
+ } else {
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ break;
+ default:
+ org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+ }
+ iprot.readFieldEnd();
+ }
+ iprot.readStructEnd();
+ validate();
+ }
+
+ public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+ validate();
+
+ oprot.writeStructBegin(STRUCT_DESC);
+ if (this.master_code_dir != null) {
+ oprot.writeFieldBegin(MASTER_CODE_DIR_FIELD_DESC);
+ oprot.writeString(this.master_code_dir);
+ oprot.writeFieldEnd();
+ }
+ if (this.node_host != null) {
+ if (is_set_node_host()) {
+ oprot.writeFieldBegin(NODE_HOST_FIELD_DESC);
+ {
+ oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, this.node_host.size()));
+ for (Map.Entry<String, String> _iter203 : this.node_host.entrySet())
+ {
+ oprot.writeString(_iter203.getKey());
+ oprot.writeString(_iter203.getValue());
+ }
+ oprot.writeMapEnd();
+ }
+ oprot.writeFieldEnd();
+ }
+ }
+ if (this.executor_node_port != null) {
+ if (is_set_executor_node_port()) {
+ oprot.writeFieldBegin(EXECUTOR_NODE_PORT_FIELD_DESC);
+ {
+ oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.STRUCT, this.executor_node_port.size()));
+ for (Map.Entry<List<Long>, NodeInfo> _iter204 : this.executor_node_port.entrySet())
+ {
+ {
+ oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter204.getKey().size()));
+ for (long _iter205 : _iter204.getKey())
+ {
+ oprot.writeI64(_iter205);
+ }
+ oprot.writeListEnd();
+ }
+ _iter204.getValue().write(oprot);
+ }
+ oprot.writeMapEnd();
+ }
+ oprot.writeFieldEnd();
+ }
+ }
+ if (this.executor_start_time_secs != null) {
+ if (is_set_executor_start_time_secs()) {
+ oprot.writeFieldBegin(EXECUTOR_START_TIME_SECS_FIELD_DESC);
+ {
+ oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.I64, this.executor_start_time_secs.size()));
+ for (Map.Entry<List<Long>, Long> _iter206 : this.executor_start_time_secs.entrySet())
+ {
+ {
+ oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter206.getKey().size()));
+ for (long _iter207 : _iter206.getKey())
+ {
+ oprot.writeI64(_iter207);
+ }
+ oprot.writeListEnd();
+ }
+ oprot.writeI64(_iter206.getValue());
+ }
+ oprot.writeMapEnd();
+ }
+ oprot.writeFieldEnd();
+ }
+ }
+ oprot.writeFieldStop();
+ oprot.writeStructEnd();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder("Assignment(");
+ boolean first = true;
+
+ sb.append("master_code_dir:");
+ if (this.master_code_dir == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.master_code_dir);
+ }
+ first = false;
+ if (is_set_node_host()) {
+ if (!first) sb.append(", ");
+ sb.append("node_host:");
+ if (this.node_host == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.node_host);
+ }
+ first = false;
+ }
+ if (is_set_executor_node_port()) {
+ if (!first) sb.append(", ");
+ sb.append("executor_node_port:");
+ if (this.executor_node_port == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.executor_node_port);
+ }
+ first = false;
+ }
+ if (is_set_executor_start_time_secs()) {
+ if (!first) sb.append(", ");
+ sb.append("executor_start_time_secs:");
+ if (this.executor_start_time_secs == null) {
+ sb.append("null");
+ } else {
+ sb.append(this.executor_start_time_secs);
+ }
+ first = false;
+ }
+ sb.append(")");
+ return sb.toString();
+ }
+
+ public void validate() throws org.apache.thrift.TException {
+ // check for required fields
+ if (!is_set_master_code_dir()) {
+ throw new org.apache.thrift.protocol.TProtocolException("Required field 'master_code_dir' is unset! Struct:" + toString());
+ }
+
+ }
+
+ private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+ try {
+ write(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(out)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+ try {
+ read(new org.apache.thrift.protocol.TCompactProtocol(new org.apache.thrift.transport.TIOStreamTransport(in)));
+ } catch (org.apache.thrift.TException te) {
+ throw new java.io.IOException(te);
+ }
+ }
+
+}
+