You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/03/18 20:39:11 UTC

[04/16] storm git commit: STORM-634: Converting SupervisorInfo, Assignment, StormBase, TopologyStatus, ZKWorkerHeartbeat, ErrorInfo, Credentials to thrift and defaulting the serialization delegate to thrift serialization. Added class as a param to serialization delegate interface.

STORM-634: Converting SupervisorInfo,Assignment,StormBase,TopologyStatus,ZKWorkerHeartbeat,ErrorInfo,Credentials to thrift and defaulting the serialization delegate to thrift serialization. Added class as a param to serialization delegate interface.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/63900643
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/63900643
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/63900643

Branch: refs/heads/master
Commit: 639006432c658226bd33dc2ae607f121f3dc02bb
Parents: a115c9d
Author: Parth Brahmbhatt <br...@gmail.com>
Authored: Thu Jan 29 13:46:01 2015 -0800
Committer: Parth Brahmbhatt <br...@gmail.com>
Committed: Tue Feb 3 11:52:15 2015 -0800

----------------------------------------------------------------------
 conf/defaults.yaml                              |    2 +-
 storm-core/src/clj/backtype/storm/bootstrap.clj |    2 +-
 storm-core/src/clj/backtype/storm/cluster.clj   |   47 +-
 storm-core/src/clj/backtype/storm/config.clj    |    7 +-
 storm-core/src/clj/backtype/storm/converter.clj |  200 ++++
 .../src/clj/backtype/storm/daemon/common.clj    |   10 +-
 .../src/clj/backtype/storm/daemon/executor.clj  |    6 +-
 .../src/clj/backtype/storm/daemon/nimbus.clj    |  105 +-
 .../src/clj/backtype/storm/daemon/worker.clj    |    2 +-
 storm-core/src/clj/backtype/storm/stats.clj     |   78 +-
 storm-core/src/clj/backtype/storm/thrift.clj    |    9 +-
 .../storm/coordination/BatchBoltExecutor.java   |    2 +-
 .../backtype/storm/generated/Assignment.java    |  817 +++++++++++++
 .../backtype/storm/generated/ExecutorStats.java |  125 +-
 .../jvm/backtype/storm/generated/NodeInfo.java  |  479 ++++++++
 .../jvm/backtype/storm/generated/StormBase.java | 1078 ++++++++++++++++++
 .../storm/generated/SupervisorInfo.java         | 1030 +++++++++++++++++
 .../storm/generated/TopologyActionOptions.java  |  335 ++++++
 .../storm/generated/TopologyStatus.java         |   67 ++
 .../storm/generated/ZKWorkerHeartbeat.java      |  586 ++++++++++
 .../DefaultSerializationDelegate.java           |   10 +-
 .../GzipBridgeSerializationDelegate.java        |    6 +-
 .../GzipSerializationDelegate.java              |   10 +-
 .../serialization/SerializationDelegate.java    |    2 +-
 .../jvm/backtype/storm/serialization/Test.java  |   17 +
 .../ThriftSerializationDelegate.java            |   52 +
 .../ThriftSerializationDelegateBridge.java      |   51 +
 .../jvm/backtype/storm/utils/LocalState.java    |    2 +-
 .../src/jvm/backtype/storm/utils/Utils.java     |   56 +-
 .../src/jvm/storm/trident/TridentTopology.java  |    4 +-
 storm-core/src/py/storm/ttypes.py               |  796 ++++++++++++-
 storm-core/src/storm.thrift                     |   51 +
 .../test/clj/backtype/storm/cluster_test.clj    |   25 +-
 .../test/clj/backtype/storm/nimbus_test.clj     |    6 +-
 .../GzipBridgeSerializationDelegateTest.java    |    6 +-
 .../ThriftBridgeSerializationDelegateTest.java  |   79 ++
 36 files changed, 5957 insertions(+), 203 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 35d20ff..141e1d3 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -47,7 +47,7 @@ storm.auth.simple-white-list.users: []
 storm.auth.simple-acl.users: []
 storm.auth.simple-acl.users.commands: []
 storm.auth.simple-acl.admins: []
-storm.meta.serialization.delegate: "backtype.storm.serialization.DefaultSerializationDelegate"
+storm.meta.serialization.delegate: "backtype.storm.serialization.ThriftSerializationDelegateBridge"
 
 ### nimbus.* configs are for the master
 nimbus.host: "localhost"

http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/src/clj/backtype/storm/bootstrap.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/bootstrap.clj b/storm-core/src/clj/backtype/storm/bootstrap.clj
index c1063cf..bc08e41 100644
--- a/storm-core/src/clj/backtype/storm/bootstrap.clj
+++ b/storm-core/src/clj/backtype/storm/bootstrap.clj
@@ -57,7 +57,7 @@
                      KillOptions SubmitOptions RebalanceOptions JavaObject JavaObjectArg
                      TopologyInitialStatus AuthorizationException]))
      (import (quote [backtype.storm.daemon.common StormBase Assignment
-                     SupervisorInfo WorkerHeartbeat]))
+                     SupervisorInfo]))
      (import (quote [backtype.storm.grouping CustomStreamGrouping]))
      (import (quote [java.io File FileOutputStream FileInputStream]))
      (import (quote [java.util Collection List Random Map HashMap Collections ArrayList LinkedList]))

http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/src/clj/backtype/storm/cluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/cluster.clj b/storm-core/src/clj/backtype/storm/cluster.clj
index 8ead710..15bf8a3 100644
--- a/storm-core/src/clj/backtype/storm/cluster.clj
+++ b/storm-core/src/clj/backtype/storm/cluster.clj
@@ -15,12 +15,14 @@
 ;; limitations under the License.
 
 (ns backtype.storm.cluster
-  (:import [org.apache.zookeeper.data Stat ACL Id])
+  (:import [org.apache.zookeeper.data Stat ACL Id]
+           [backtype.storm.generated SupervisorInfo Assignment StormBase ZKWorkerHeartbeat ErrorInfo Credentials]
+           [java.io Serializable])
   (:import [org.apache.zookeeper KeeperException KeeperException$NoNodeException ZooDefs ZooDefs$Ids ZooDefs$Perms])
   (:import [backtype.storm.utils Utils])
   (:import [java.security MessageDigest])
   (:import [org.apache.zookeeper.server.auth DigestAuthenticationProvider])
-  (:use [backtype.storm util log config])
+  (:use [backtype.storm util log config converter])
   (:require [backtype.storm [zookeeper :as zk]])
   (:require [backtype.storm.daemon [common :as common]]))
 
@@ -228,9 +230,9 @@
       (cb id))))
 
 (defn- maybe-deserialize
-  [ser]
+  [ser clazz]
   (when ser
-    (Utils/deserialize ser)))
+    (Utils/deserialize ser clazz)))
 
 (defstruct TaskError :error :time-secs :host :port)
 
@@ -292,7 +294,7 @@
         [this storm-id callback]
         (when callback
           (swap! assignment-info-callback assoc storm-id callback))
-        (maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback))))
+        (clojurify-assignment (maybe-deserialize (get-data cluster-state (assignment-path storm-id) (not-nil? callback)) Assignment)))
 
       (assignment-info-with-version 
         [this storm-id callback]
@@ -300,7 +302,7 @@
           (swap! assignment-info-with-version-callback assoc storm-id callback))
         (let [{data :data version :version} 
               (get-data-with-version cluster-state (assignment-path storm-id) (not-nil? callback))]
-        {:data (maybe-deserialize data)
+        {:data (clojurify-assignment (maybe-deserialize data Assignment))
          :version version}))
 
       (assignment-version 
@@ -325,7 +327,9 @@
         [this storm-id node port]
         (-> cluster-state
             (get-data (workerbeat-path storm-id node port) false)
-            maybe-deserialize))
+          (maybe-deserialize ZKWorkerHeartbeat)
+          clojurify-zk-worker-hb))
+
 
       (executor-beats
         [this storm-id executor->node+port]
@@ -348,11 +352,12 @@
 
       (supervisor-info
         [this supervisor-id]
-        (maybe-deserialize (get-data cluster-state (supervisor-path supervisor-id) false)))
+        (clojurify-supervisor-info (maybe-deserialize (get-data cluster-state (supervisor-path supervisor-id) false) SupervisorInfo)))
 
       (worker-heartbeat!
         [this storm-id node port info]
-        (set-data cluster-state (workerbeat-path storm-id node port) (Utils/serialize info) acls))
+        (let [thrift-worker-hb (thriftify-zk-worker-hb info)]
+          (set-data cluster-state (workerbeat-path storm-id node port) (Utils/serialize thrift-worker-hb) acls)))
 
       (remove-worker-heartbeat!
         [this storm-id node port]
@@ -378,11 +383,13 @@
 
       (supervisor-heartbeat!
         [this supervisor-id info]
-        (set-ephemeral-node cluster-state (supervisor-path supervisor-id) (Utils/serialize info) acls))
+        (let [thrift-supervisor-info (thriftify-supervisor-info info)]
+          (set-ephemeral-node cluster-state (supervisor-path supervisor-id) (Utils/serialize thrift-supervisor-info) acls)))
 
       (activate-storm!
         [this storm-id storm-base]
-        (set-data cluster-state (storm-path storm-id) (Utils/serialize storm-base) acls))
+        (let [thrift-storm-base (thriftify-storm-base storm-base)]
+          (set-data cluster-state (storm-path storm-id) (Utils/serialize thrift-storm-base) acls)))
 
       (update-storm!
         [this storm-id new-elems]
@@ -392,6 +399,7 @@
           (set-data cluster-state (storm-path storm-id)
                     (-> base
                         (merge new-elems)
+                        thriftify-storm-base
                         Utils/serialize)
                     acls)))
 
@@ -399,7 +407,7 @@
         [this storm-id callback]
         (when callback
           (swap! storm-base-callback assoc storm-id callback))
-        (maybe-deserialize (get-data cluster-state (storm-path storm-id) (not-nil? callback))))
+        (clojurify-storm-base (maybe-deserialize (get-data cluster-state (storm-path storm-id) (not-nil? callback)) StormBase)))
 
       (remove-storm-base!
         [this storm-id]
@@ -407,7 +415,8 @@
 
       (set-assignment!
         [this storm-id info]
-        (set-data cluster-state (assignment-path storm-id) (Utils/serialize info) acls))
+        (let [thrift-assignment (thriftify-assignment info)]
+          (set-data cluster-state (assignment-path storm-id) (Utils/serialize thrift-assignment) acls)))
 
       (remove-storm!
         [this storm-id]
@@ -418,19 +427,20 @@
       (set-credentials!
          [this storm-id creds topo-conf]
          (let [topo-acls (mk-topo-only-acls topo-conf)
-               path (credentials-path storm-id)]
-           (set-data cluster-state path (Utils/serialize creds) topo-acls)))
+               path (credentials-path storm-id)
+               thriftified-creds (thriftify-credentials creds)]
+           (set-data cluster-state path (Utils/serialize thriftified-creds) topo-acls)))
 
       (credentials
         [this storm-id callback]
         (when callback
           (swap! credentials-callback assoc storm-id callback))
-        (maybe-deserialize (get-data cluster-state (credentials-path storm-id) (not-nil? callback))))
+        (clojurify-crdentials (maybe-deserialize (get-data cluster-state (credentials-path storm-id) (not-nil? callback)) Credentials)))
 
       (report-error
          [this storm-id component-id node port error]
          (let [path (error-path storm-id component-id)
-               data {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port}
+               data (thriftify-error {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port})
                _ (mkdirs cluster-state path acls)
                _ (create-sequential cluster-state (str path "/e") (Utils/serialize data) acls)
                to-kill (->> (get-children cluster-state path false)
@@ -446,7 +456,8 @@
                errors (if (exists-node? cluster-state path false)
                         (dofor [c (get-children cluster-state path false)]
                           (let [data (-> (get-data cluster-state (str path "/" c) false)
-                                         maybe-deserialize)]
+                                       (maybe-deserialize ErrorInfo)
+                                       clojurify-error)]
                             (when data
                               (struct TaskError (:error data) (:time-secs data) (:host data) (:port data))
                               )))

http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/src/clj/backtype/storm/config.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/config.clj b/storm-core/src/clj/backtype/storm/config.clj
index 98b1da2..d09b31b 100644
--- a/storm-core/src/clj/backtype/storm/config.clj
+++ b/storm-core/src/clj/backtype/storm/config.clj
@@ -15,7 +15,8 @@
 ;; limitations under the License.
 
 (ns backtype.storm.config
-  (:import [java.io FileReader File IOException])
+  (:import [java.io FileReader File IOException]
+           [backtype.storm.generated StormTopology])
   (:import [backtype.storm Config ConfigValidation$FieldValidator])
   (:import [backtype.storm.utils Utils LocalState])
   (:import [org.apache.commons.io FileUtils])
@@ -211,14 +212,14 @@
   (let [stormroot (supervisor-stormdist-root conf storm-id)
         conf-path (supervisor-stormconf-path stormroot)
         topology-path (supervisor-stormcode-path stormroot)]
-    (merge conf (Utils/deserialize (FileUtils/readFileToByteArray (File. conf-path))))
+    (merge conf (Utils/deserialize (FileUtils/readFileToByteArray (File. conf-path)) java.util.Map))
     ))
 
 (defn read-supervisor-topology
   [conf storm-id]
   (let [stormroot (supervisor-stormdist-root conf storm-id)
         topology-path (supervisor-stormcode-path stormroot)]
-    (Utils/deserialize (FileUtils/readFileToByteArray (File. topology-path)))
+    (Utils/deserialize (FileUtils/readFileToByteArray (File. topology-path)) StormTopology)
     ))
 
 (defn worker-user-root [conf]

http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/src/clj/backtype/storm/converter.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/converter.clj b/storm-core/src/clj/backtype/storm/converter.clj
new file mode 100644
index 0000000..6a9f4a6
--- /dev/null
+++ b/storm-core/src/clj/backtype/storm/converter.clj
@@ -0,0 +1,200 @@
+(ns backtype.storm.converter
+  (:import [backtype.storm.generated SupervisorInfo NodeInfo Assignment
+            StormBase TopologyStatus ZKWorkerHeartbeat ExecutorInfo ErrorInfo Credentials RebalanceOptions KillOptions TopologyActionOptions])
+  (:use [backtype.storm util stats log])
+  (:require [backtype.storm.daemon [common :as common]]))
+
+(defn thriftify-supervisor-info [supervisor-info] ; builds a thrift SupervisorInfo from the keys of the clojure supervisor-info
+  (doto (SupervisorInfo.)
+    (.set_time_secs (long (:time-secs supervisor-info))) ; NOTE(review): (long nil) throws, so :time-secs presumably must be present
+    (.set_hostname (:hostname supervisor-info))
+    (.set_assignment_id (:assignment-id supervisor-info))
+    (.set_used_ports (map long (:used-ports supervisor-info))) ; every port is coerced to long
+    (.set_meta (map long (:meta supervisor-info))) ; NOTE(review): assumes :meta is a seq of numbers -- confirm against callers
+    (.set_scheduler_meta (:scheduler-meta supervisor-info))
+    (.set_uptime_secs (long (:uptime-secs supervisor-info))))) ; same nil caveat as :time-secs
+
+(defn clojurify-supervisor-info [^SupervisorInfo supervisor-info] ; thrift SupervisorInfo -> backtype.storm.daemon.common.SupervisorInfo record
+  (if supervisor-info ; nil in, nil out
+    (backtype.storm.daemon.common.SupervisorInfo. ; positional constructor: one argument per record field
+      (.get_time_secs supervisor-info)
+      (.get_hostname supervisor-info)
+      (.get_assignment_id supervisor-info)
+      (if (.get_used_ports supervisor-info) (into [] (.get_used_ports supervisor-info))) ; copied into a vector; stays nil when the getter returns nil
+      (if (.get_meta supervisor-info) (into [] (.get_meta supervisor-info))) ; same nil-preserving copy
+      (if (.get_scheduler_meta supervisor-info) (into {} (.get_scheduler_meta supervisor-info))) ; copied into a clojure map, or nil
+      (.get_uptime_secs supervisor-info))))
+
+(defn thriftify-assignment [assignment] ; builds a thrift Assignment from the clojure assignment's four keys read below
+  (doto (Assignment.)
+    (.set_master_code_dir (:master-code-dir assignment))
+    (.set_node_host (:node->host assignment))
+    (.set_executor_node_port (map-val
+                               (fn [node+port] ; first element is the node, the remaining elements become a set of long ports
+                                 (NodeInfo. (first node+port) (set (map long (rest node+port)))))
+                               (map-key #(map long %) ; presumably each executor key is a seq of numbers; its elements are coerced to long
+                                 (:executor->node+port assignment))))
+    (.set_executor_start_time_secs
+      (map-val
+        long ; start times are coerced to long
+        (map-key #(map long %) ; same key coercion as for executor->node+port
+          (:executor->start-time-secs assignment))))))
+
+(defn clojurify-executor->node_port [executor->node_port] ; converts the keys and values of a thrift executor->NodeInfo map to clojure data
+  (into {}
+    (map-val
+      (fn [nodeInfo]
+        (concat [(.get_node nodeInfo)] (.get_port nodeInfo))) ; a NodeInfo becomes the seq (node port1 port2 ...)
+      (map-key
+        (fn [list-of-executors]
+          (into [] list-of-executors)) ; the list of executors must be converted to a clojure vector to ensure it is sortable.
+        executor->node_port))))
+
+(defn clojurify-assignment [^Assignment assignment] ; thrift Assignment -> backtype.storm.daemon.common.Assignment record
+  (if assignment ; nil in, nil out
+    (backtype.storm.daemon.common.Assignment. ; positional constructor: one argument per record field
+      (.get_master_code_dir assignment)
+      (into {} (.get_node_host assignment)) ; copied into a clojure map
+      (clojurify-executor->node_port (into {} (.get_executor_node_port assignment)))
+      (map-key (fn [executor] (into [] executor)) ; executor keys become vectors, as in clojurify-executor->node_port
+        (into {} (.get_executor_start_time_secs assignment))))))
+
+(defn convert-to-symbol-from-status [status] ; maps a TopologyStatus enum value to a {:type <keyword>} map (a map, despite "symbol" in the name)
+  (condp = status
+    TopologyStatus/ACTIVE {:type :active}
+    TopologyStatus/INACTIVE {:type :inactive}
+    TopologyStatus/REBALANCING {:type :rebalancing}
+    TopologyStatus/KILLED {:type :killed}
+    nil)) ; default clause: any other value, including nil, yields nil
+
+(defn- convert-to-status-from-symbol [status] ; inverse of convert-to-symbol-from-status: {:type <keyword>} map -> TopologyStatus
+  (if status ; a nil status yields nil
+    (condp = (:type status) ; only the :type key of the status map is consulted
+      :active TopologyStatus/ACTIVE
+      :inactive TopologyStatus/INACTIVE
+      :rebalancing TopologyStatus/REBALANCING
+      :killed TopologyStatus/KILLED
+      nil))) ; default clause: an unrecognised :type yields nil
+
+(defn clojurify-rebalance-options [^RebalanceOptions rebalance-options] ; thrift RebalanceOptions -> map tagged {:action :rebalance}; no nil guard, so a nil argument throws on the first method call
+  (-> {:action :rebalance}
+    (assoc-non-nil :delay-secs (if (.is_set_wait_secs rebalance-options) (.get_wait_secs rebalance-options))) ; thrift wait_secs is exposed as :delay-secs; assoc-non-nil presumably skips nil values -- helper defined elsewhere
+    (assoc-non-nil :num-workers (if (.is_set_num_workers rebalance-options) (.get_num_workers rebalance-options)))
+    (assoc-non-nil :component->executors (if (.is_set_num_executors rebalance-options) (into {} (.get_num_executors rebalance-options)))))) ; thrift num_executors is exposed as :component->executors
+
+(defn thriftify-rebalance-options [rebalance-options] ; clojure options map -> thrift RebalanceOptions; nil in, nil out
+  (if rebalance-options
+    (let [thrift-rebalance-options (RebalanceOptions.)]
+      (if (:delay-secs rebalance-options) ; each thrift field is set only when its key is truthy in the map
+        (.set_wait_secs thrift-rebalance-options (int (:delay-secs rebalance-options))))
+      (if (:num-workers rebalance-options)
+        (.set_num_workers thrift-rebalance-options (int (:num-workers rebalance-options))))
+      (if (:component->executors rebalance-options)
+        (.set_num_executors thrift-rebalance-options (map-val int (:component->executors rebalance-options)))) ; executor counts are coerced to int
+      thrift-rebalance-options)))
+
+(defn clojurify-kill-options [^KillOptions kill-options] ; thrift KillOptions -> map tagged {:action :kill}; no nil guard, so a nil argument throws on the method call
+  (-> {:action :kill}
+    (assoc-non-nil :delay-secs (if (.is_set_wait_secs kill-options) (.get_wait_secs kill-options))))) ; thrift wait_secs is exposed as :delay-secs; assoc-non-nil presumably skips nil values -- helper defined elsewhere
+
+(defn thriftify-kill-options [kill-options] ; clojure options map -> thrift KillOptions; nil in, nil out
+  (if kill-options
+    (let [thrift-kill-options (KillOptions.)]
+      (if (:delay-secs kill-options) ; wait_secs is set only when :delay-secs is truthy
+        (.set_wait_secs thrift-kill-options (int (:delay-secs kill-options))))
+      thrift-kill-options)))
+
+(defn thriftify-topology-action-options [storm-base] ; takes the whole storm-base and converts its :topology-action-options entry to thrift
+  (if (:topology-action-options storm-base) ; returns nil when storm-base carries no action options
+    (let [ topology-action-options (:topology-action-options storm-base)
+           action (:action topology-action-options) ; :kill or :rebalance selects which thrift field is populated
+           thrift-topology-action-options (TopologyActionOptions.)]
+      (if (= action :kill)
+        (.set_kill_options thrift-topology-action-options (thriftify-kill-options topology-action-options)))
+      (if (= action :rebalance)
+        (.set_rebalance_options thrift-topology-action-options (thriftify-rebalance-options topology-action-options)))
+      thrift-topology-action-options))) ; NOTE(review): for any other :action this is returned with neither field set
+
+(defn clojurify-topology-action-options [^TopologyActionOptions topology-action-options] ; thrift TopologyActionOptions -> the map built by clojurify-kill-options / clojurify-rebalance-options
+  (cond ; single expression: previously two sequential `if` forms discarded the first result, so kill options always came back as nil
+    (nil? topology-action-options) nil ; nil in, nil out; also nil when neither option below is set
+    (.is_set_rebalance_options topology-action-options) (clojurify-rebalance-options (.get_rebalance_options topology-action-options)) ; tested first so every previously non-nil result is unchanged
+    (.is_set_kill_options topology-action-options) (clojurify-kill-options (.get_kill_options topology-action-options))))
+
+(defn thriftify-storm-base [storm-base] ; builds a thrift StormBase from the keys of the clojure storm-base
+  (doto (StormBase.)
+    (.set_name (:storm-name storm-base)) ; clojure :storm-name is stored as thrift name
+    (.set_launch_time_secs (int (:launch-time-secs storm-base))) ; NOTE(review): (int nil) throws, so :launch-time-secs presumably must be present
+    (.set_status (convert-to-status-from-symbol (:status storm-base))) ; {:type kw} map -> TopologyStatus (nil when unmapped)
+    (.set_num_workers (int (:num-workers storm-base))) ; same nil caveat as :launch-time-secs
+    (.set_component_executors (map-val int (:component->executors storm-base)))
+    (.set_owner (:owner storm-base))
+    (.set_topology_action_options (thriftify-topology-action-options storm-base)) ; passes the whole storm-base; the helper reads :topology-action-options itself
+    (.set_prev_status (convert-to-status-from-symbol (:prev-status storm-base)))))
+
+(defn clojurify-storm-base [^StormBase storm-base] ; thrift StormBase -> backtype.storm.daemon.common.StormBase record
+  (if storm-base ; nil in, nil out
+    (backtype.storm.daemon.common.StormBase. ; positional constructor: arguments follow the StormBase defrecord field order in daemon/common.clj
+      (.get_name storm-base)
+      (.get_launch_time_secs storm-base)
+      (convert-to-symbol-from-status (.get_status storm-base)) ; TopologyStatus -> {:type kw} map
+      (.get_num_workers storm-base)
+      (into {} (.get_component_executors storm-base)) ; copied into a clojure map
+      (.get_owner storm-base)
+      (clojurify-topology-action-options (.get_topology_action_options storm-base))
+      (convert-to-symbol-from-status (.get_prev_status storm-base)))))
+
+(defn thriftify-stats [stats] ; converts an executor->stats map to thrift keys and values
+  (if stats
+    (map-val thriftify-executor-stats ; not defined in this file; presumably provided by backtype.storm.stats -- confirm
+      (map-key #(ExecutorInfo. (int (first %1)) (int (last %1))) ; key's first and last elements become the two ExecutorInfo constructor arguments
+        stats))
+    {})) ; nil stats yield an empty map
+
+(defn clojurify-stats [stats] ; converts a thrift ExecutorInfo->stats map back to clojure keys and values
+  (if stats
+    (map-val clojurify-executor-stats ; not defined in this file; presumably provided by backtype.storm.stats -- confirm
+      (map-key (fn [x] (list (.get_task_start x) (.get_task_end x))) ; each key becomes the list (task-start task-end)
+        stats))
+    {})) ; nil stats yield an empty map
+
+(defn clojurify-zk-worker-hb [^ZKWorkerHeartbeat worker-hb] ; thrift ZKWorkerHeartbeat -> clojure heartbeat map
+  (if worker-hb
+    {:storm-id (.get_storm_id worker-hb)
+     :executor-stats (clojurify-stats (into {} (.get_executor_stats worker-hb)))
+     :uptime (time-delta (.get_time_secs worker-hb)) ; NOTE(review): :uptime is derived from time_secs via time-delta, not read from a stored uptime field
+     :time-secs (.get_time_secs worker-hb)
+     }
+    {})) ; a nil heartbeat yields an empty map rather than nil
+
+(defn thriftify-zk-worker-hb [worker-hb] ; builds a thrift ZKWorkerHeartbeat; only :storm-id, :executor-stats and :time-secs are read from worker-hb
+  (doto (ZKWorkerHeartbeat.)
+    (.set_storm_id (:storm-id worker-hb))
+    (.set_executor_stats (thriftify-stats (:executor-stats worker-hb))) ; thriftify-stats turns nil stats into {}
+    (.set_time_secs (:time-secs worker-hb)))) ; passed through without an explicit numeric coercion
+
+(defn clojurify-error [^ErrorInfo error] ; thrift ErrorInfo -> map with :error, :time-secs, :host and :port
+  (if error ; nil in, nil out (the `if` has no else branch)
+    {
+      :error (.get_error error)
+      :time-secs (.get_error_time_secs error) ; thrift error_time_secs is exposed as :time-secs
+      :host (.get_host error)
+      :port (.get_port error)
+      }
+    ))
+
+(defn thriftify-error [error] ; builds a thrift ErrorInfo from a map with :error, :time-secs, :host and :port
+  (doto (ErrorInfo. (:error error) (:time-secs error)) ; :error and :time-secs go through the constructor
+    (.set_host (:host error)) ; :host and :port are applied through setters afterwards
+    (.set_port (:port error))))
+
+(defn thriftify-credentials [credentials] ; wraps a clojure credentials map in a thrift Credentials object
+    (doto (Credentials.)
+      (.set_creds (if credentials credentials {})))) ; nil credentials are stored as an empty map
+
+(defn clojurify-crdentials [^Credentials credentials] ; thrift Credentials -> clojure map; NOTE(review): the name is misspelled ("crdentials") but cluster.clj calls it by this name
+  (if credentials
+    (into {} (.get_creds credentials)) ; copied into a clojure map
+    nil ; nil in, nil out
+    ))
+

http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/src/clj/backtype/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/common.clj b/storm-core/src/clj/backtype/storm/daemon/common.clj
index f091dfb..c33609d 100644
--- a/storm-core/src/clj/backtype/storm/daemon/common.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/common.clj
@@ -51,7 +51,7 @@
 
 
 ;; component->executors is a map from spout/bolt id to number of executors for that component
-(defrecord StormBase [storm-name launch-time-secs status num-workers component->executors owner])
+(defrecord StormBase [storm-name launch-time-secs status num-workers component->executors owner topology-action-options prev-status])
 
 (defrecord SupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs])
 
@@ -65,9 +65,11 @@
 (def LS-LOCAL-ASSIGNMENTS "local-assignments")
 (def LS-APPROVED-WORKERS "approved-workers")
 
-
-
-(defrecord WorkerHeartbeat [time-secs storm-id executors port])
+(defn mk-local-worker-heartbeat [time-secs storm-id executors port] ; returns a plain map in place of the removed WorkerHeartbeat defrecord, with the same four keys
+  {:time-secs time-secs
+   :storm-id storm-id
+   :executors executors
+   :port port})
 
 (defrecord ExecutorStats [^long processed
                           ^long acked

http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/src/clj/backtype/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/executor.clj b/storm-core/src/clj/backtype/storm/daemon/executor.clj
index a6e606d..a1ecc4a 100644
--- a/storm-core/src/clj/backtype/storm/daemon/executor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/executor.clj
@@ -16,7 +16,9 @@
 (ns backtype.storm.daemon.executor
   (:use [backtype.storm.daemon common])
   (:use [backtype.storm bootstrap])
-  (:import [backtype.storm ICredentialsListener])
+  (:import [backtype.storm ICredentialsListener]
+           [backtype.storm.generated Grouping]
+           [java.io Serializable])
   (:import [backtype.storm.hooks ITaskHook])
   (:import [backtype.storm.tuple Tuple])
   (:import [backtype.storm.spout ISpoutWaitStrategy])
@@ -86,7 +88,7 @@
         (let [grouping (thrift/instantiate-java-object (.get_custom_object thrift-grouping))]
           (mk-custom-grouper grouping context component-id stream-id target-tasks))
       :custom-serialized
-        (let [grouping (Utils/deserialize (.get_custom_serialized thrift-grouping))]
+        (let [grouping (Utils/deserialize (.get_custom_serialized thrift-grouping) Serializable)]
           (mk-custom-grouper grouping context component-id stream-id target-tasks))
       :direct
         :direct

http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index b2cb96a..77f59ad 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -15,7 +15,8 @@
 ;; limitations under the License.
 (ns backtype.storm.daemon.nimbus
   (:import [java.nio ByteBuffer]
-           [java.util Collections])
+           [java.util Collections]
+           [backtype.storm.generated StormTopology])
   (:import [java.io FileNotFoundException])
   (:import [java.nio.channels Channels WritableByteChannel])
   (:import [backtype.storm.security.auth ThriftServer ThriftConnectionType ReqContext AuthUtils])
@@ -99,15 +100,7 @@
            (Utils/deserialize
             (FileUtils/readFileToByteArray
              (File. (master-stormconf-path stormroot))
-             )))))
-
-(defn set-topology-status! [nimbus storm-id status]
-  (let [storm-cluster-state (:storm-cluster-state nimbus)]
-   (.update-storm! storm-cluster-state
-                   storm-id
-                   {:status status})
-   (log-message "Updated " storm-id " with status " status)
-   ))
+             ) java.util.Map))))
 
 (declare delay-event)
 (declare mk-assignments)
@@ -122,8 +115,9 @@
                    storm-id
                    delay
                    :remove)
-      {:type :killed
-       :kill-time-secs delay})
+      {
+        :status {:type :killed}
+        :topology-action-options {:delay-secs delay :action :kill}})
     ))
 
 (defn rebalance-transition [nimbus storm-id status]
@@ -136,24 +130,24 @@
                    storm-id
                    delay
                    :do-rebalance)
-      {:type :rebalancing
-       :delay-secs delay
-       :old-status status
-       :num-workers num-workers
-       :executor-overrides executor-overrides
+      {:status {:type :rebalancing}
+       :prev-status status
+       :topology-action-options (-> {:delay-secs delay :action :rebalance}
+                                  (assoc-non-nil :num-workers num-workers)
+                                  (assoc-non-nil :component->executors executor-overrides))
        })))
 
-(defn do-rebalance [nimbus storm-id status]
-  (.update-storm! (:storm-cluster-state nimbus)
-                  storm-id
-                  (assoc-non-nil
-                    {:component->executors (:executor-overrides status)}
-                    :num-workers
-                    (:num-workers status)))
+(defn do-rebalance [nimbus storm-id status storm-base] ; applies the pending rebalance options stored on storm-base, then recomputes assignments; `status` is not read in this body
+  (let [rebalance-options (:topology-action-options storm-base)]
+    (.update-storm! (:storm-cluster-state nimbus)
+      storm-id
+        (-> {:topology-action-options nil} ; the update always carries :topology-action-options nil
+          (assoc-non-nil :component->executors (:component->executors rebalance-options)) ; assoc-non-nil presumably skips nil values -- helper defined elsewhere
+          (assoc-non-nil :num-workers (:num-workers rebalance-options)))))
   (mk-assignments nimbus :scratch-topology-id storm-id)) ; runs after the update; declared earlier in nimbus.clj and defined outside this excerpt
 
-(defn state-transitions [nimbus storm-id status]
-  {:active {:inactivate :inactive            
+(defn state-transitions [nimbus storm-id status storm-base]
+  {:active {:inactivate :inactive
             :activate nil
             :rebalance (rebalance-transition nimbus storm-id status)
             :kill (kill-transition nimbus storm-id)
@@ -165,7 +159,7 @@
               }
    :killed {:startup (fn [] (delay-event nimbus
                                          storm-id
-                                         (:kill-time-secs status)
+                                         (:delay-secs storm-base)
                                          :remove)
                              nil)
             :kill (kill-transition nimbus storm-id)
@@ -177,18 +171,15 @@
             }
    :rebalancing {:startup (fn [] (delay-event nimbus
                                               storm-id
-                                              (:delay-secs status)
+                                              (:delay-secs storm-base)
                                               :do-rebalance)
                                  nil)
                  :kill (kill-transition nimbus storm-id)
                  :do-rebalance (fn []
-                                 (do-rebalance nimbus storm-id status)
-                                 (:old-status status))
+                                 (do-rebalance nimbus storm-id status storm-base)
+                                 (:type (:prev-status storm-base)))
                  }})
 
-(defn topology-status [nimbus storm-id]
-  (-> nimbus :storm-cluster-state (.storm-base storm-id nil) :status))
-
 (defn transition!
   ([nimbus storm-id event]
      (transition! nimbus storm-id event false))
@@ -196,7 +187,8 @@
      (locking (:submit-lock nimbus)
        (let [system-events #{:startup}
              [event & event-args] (if (keyword? event) [event] event)
-             status (topology-status nimbus storm-id)]
+             storm-base (-> nimbus :storm-cluster-state  (.storm-base storm-id nil))
+             status (:status storm-base)]
          ;; handles the case where event was scheduled but topology has been removed
          (if-not status
            (log-message "Cannot apply event " event " to " storm-id " because topology no longer exists")
@@ -212,19 +204,20 @@
                                          (log-message msg))
                                        nil))
                                  )))
-                 transition (-> (state-transitions nimbus storm-id status)
+                 transition (-> (state-transitions nimbus storm-id status storm-base)
                                 (get (:type status))
                                 (get-event event))
                  transition (if (or (nil? transition)
                                     (keyword? transition))
                               (fn [] transition)
                               transition)
-                 new-status (apply transition event-args)
-                 new-status (if (keyword? new-status)
-                              {:type new-status}
-                              new-status)]
-             (when new-status
-               (set-topology-status! nimbus storm-id new-status)))))
+                 storm-base-updates (apply transition event-args)
+                 storm-base-updates (if (keyword? storm-base-updates) ; if it's just a keyword, that only indicates the new status.
+                                      {:status {:type storm-base-updates}}
+                                      storm-base-updates)]
+
+             (when storm-base-updates
+               (.update-storm! (:storm-cluster-state nimbus) storm-id storm-base-updates)))))
        )))
 
 (defn transition-name! [nimbus storm-name event & args]
@@ -296,7 +289,7 @@
   [nimbus topologies missing-assignment-topologies]
   (let [storm-cluster-state (:storm-cluster-state nimbus)
         ^INimbus inimbus (:inimbus nimbus)
-        
+
         supervisor-infos (all-supervisor-info storm-cluster-state nil)
 
         supervisor-details (dofor [[id info] supervisor-infos]
@@ -327,7 +320,7 @@
     (Utils/deserialize
       (FileUtils/readFileToByteArray
         (File. (master-stormcode-path stormroot))
-        ))))
+        ) StormTopology)))
 
 (declare compute-executor->component)
 
@@ -468,7 +461,7 @@
                                          all-executors
                                          (set (alive-executors nimbus topology-details all-executors assignment)))]]
              {tid alive-executors})))
-  
+
 (defn- compute-supervisor->dead-ports [nimbus existing-assignments topology->executors topology->alive-executors]
   (let [dead-slots (into [] (for [[tid assignment] existing-assignments
                                   :let [all-executors (topology->executors tid)
@@ -514,7 +507,7 @@
                                                                   ((fn [ports] (map int ports))))
                                                     supervisor-details (SupervisorDetails. sid hostname scheduler-meta all-ports)]]
                                           {sid supervisor-details}))]
-    (merge all-supervisor-details 
+    (merge all-supervisor-details
            (into {}
               (for [[sid ports] nonexistent-supervisor-slots]
                 [sid (SupervisorDetails. sid nil ports)]))
@@ -576,7 +569,7 @@
         topology->scheduler-assignment (compute-topology->scheduler-assignment nimbus
                                                                                existing-assignments
                                                                                topology->alive-executors)
-                                                                               
+
         missing-assignment-topologies (->> topologies
                                            .getTopologies
                                            (map (memfn getId))
@@ -594,7 +587,7 @@
         all-scheduling-slots (->> (all-scheduling-slots nimbus topologies missing-assignment-topologies)
                                   (map (fn [[node-id port]] {node-id #{port}}))
                                   (apply merge-with set/union))
-        
+
         supervisors (read-all-supervisor-details nimbus all-scheduling-slots supervisor->dead-ports)
         cluster (Cluster. (:inimbus nimbus) supervisors topology->scheduler-assignment)
 
@@ -659,7 +652,7 @@
 (defnk mk-assignments [nimbus :scratch-topology-id nil]
   (let [conf (:conf nimbus)
         storm-cluster-state (:storm-cluster-state nimbus)
-        ^INimbus inimbus (:inimbus nimbus) 
+        ^INimbus inimbus (:inimbus nimbus)
         ;; read all the topologies
         topology-ids (.active-storms storm-cluster-state)
         topologies (into {} (for [tid topology-ids]
@@ -679,13 +672,13 @@
                                        existing-assignments
                                        topologies
                                        scratch-topology-id)
-        
+
         topology->executor->node+port (merge (into {} (for [id assigned-topology-ids] {id nil})) topology->executor->node+port)
-        
+
         now-secs (current-time-secs)
-        
+
         basic-supervisor-details-map (basic-supervisor-details-map storm-cluster-state)
-        
+
         ;; construct the final Assignments by adding start-times etc into it
         new-assignments (into {} (for [[topology-id executor->node+port] topology->executor->node+port
                                         :let [existing-assignment (get existing-assignments topology-id)
@@ -723,14 +716,14 @@
     (->> new-assignments
           (map (fn [[topology-id assignment]]
             (let [existing-assignment (get existing-assignments topology-id)]
-              [topology-id (map to-worker-slot (newly-added-slots existing-assignment assignment))] 
+              [topology-id (map to-worker-slot (newly-added-slots existing-assignment assignment))]
               )))
           (into {})
           (.assignSlots inimbus topologies))
     ))
 
 (defn- start-storm [nimbus storm-name storm-id topology-initial-status]
-  {:pre [(#{:active :inactive} topology-initial-status)]}                
+  {:pre [(#{:active :inactive} topology-initial-status)]}
   (let [storm-cluster-state (:storm-cluster-state nimbus)
         conf (:conf nimbus)
         storm-conf (read-storm-conf conf storm-id)
@@ -744,7 +737,9 @@
                                   {:type topology-initial-status}
                                   (storm-conf TOPOLOGY-WORKERS)
                                   num-executors
-                                  (storm-conf TOPOLOGY-SUBMITTER-USER)))))
+                                  (storm-conf TOPOLOGY-SUBMITTER-USER)
+                                  nil
+                                  nil))))
 
 ;; Master:
 ;; job submit:

http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/src/clj/backtype/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/worker.clj b/storm-core/src/clj/backtype/storm/daemon/worker.clj
index e717ce4..b13e8a7 100644
--- a/storm-core/src/clj/backtype/storm/daemon/worker.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/worker.clj
@@ -61,7 +61,7 @@
 
 (defn do-heartbeat [worker]
   (let [conf (:conf worker)
-        hb (WorkerHeartbeat.
+        hb (mk-local-worker-heartbeat
              (current-time-secs)
              (:storm-id worker)
              (:executors worker)

http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/src/clj/backtype/storm/stats.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/stats.clj b/storm-core/src/clj/backtype/storm/stats.clj
index b872c6f..db8930c 100644
--- a/storm-core/src/clj/backtype/storm/stats.clj
+++ b/storm-core/src/clj/backtype/storm/stats.clj
@@ -19,7 +19,7 @@
             NotAliveException AlreadyAliveException InvalidTopologyException GlobalStreamId
             ClusterSummary TopologyInfo TopologySummary ExecutorSummary ExecutorStats ExecutorSpecificStats
             SpoutStats BoltStats ErrorInfo SupervisorSummary])
-  (:use [backtype.storm util])
+  (:use [backtype.storm util log])
   (:use [clojure.math.numeric-tower :only [ceil]]))
 
 ;;TODO: consider replacing this with some sort of RRD
@@ -301,42 +301,76 @@
   (value-bolt-stats! stats))
 
 (defmulti thriftify-specific-stats :type)
+(defmulti clojurify-specific-stats class-selector)
 
 (defn window-set-converter
-  ([stats key-fn]
-   ;; make the first key a string,
-   (into {}
-         (for [[k v] stats]
-           [(str k)
-            (into {} (for [[k2 v2] v]
-                       [(key-fn k2) v2]))])))
-  ([stats]
-   (window-set-converter stats identity)))
+  ([stats key-fn first-key-fun]
+    (into {}
+      (for [[k v] stats]
+        ; apply first-key-fun only to the outer (first-level) key; inner keys go through key-fn.
+        [(first-key-fun k)
+         (into {} (for [[k2 v2] v]
+                    [(key-fn k2) v2]))])))
+  ([stats first-key-fun]
+    (window-set-converter stats identity first-key-fun)))
 
 (defn to-global-stream-id
   [[component stream]]
   (GlobalStreamId. component stream))
 
+(defn from-global-stream-id [global-stream-id]
+  [(.get_componentId global-stream-id) (.get_streamId global-stream-id)])
+
+(defmethod clojurify-specific-stats BoltStats [^BoltStats stats]
+  [(window-set-converter (.get_acked stats) from-global-stream-id symbol)
+   (window-set-converter (.get_failed stats) from-global-stream-id symbol)
+   (window-set-converter (.get_process_ms_avg stats) from-global-stream-id symbol)
+   (window-set-converter (.get_executed stats) from-global-stream-id symbol)
+   (window-set-converter (.get_execute_ms_avg stats) from-global-stream-id symbol)])
+
+(defmethod clojurify-specific-stats SpoutStats [^SpoutStats stats]
+  [(window-set-converter (.get_acked stats) symbol)
+   (window-set-converter (.get_failed stats) symbol)
+   (window-set-converter (.get_complete_ms_avg stats) symbol)])
+
+
+(defn clojurify-executor-stats
+  [^ExecutorStats stats]
+  (let [ specific-stats (.get_specific stats)
+         is_bolt? (.is_set_bolt specific-stats)
+         specific-stats (if is_bolt? (.get_bolt specific-stats) (.get_spout specific-stats))
+         specific-stats (clojurify-specific-stats specific-stats)
+         common-stats (CommonStats. (window-set-converter (.get_emitted stats) symbol) (window-set-converter (.get_transferred stats) symbol) (.get_rate stats))]
+    (if is_bolt?
+      ; The worker heartbeat does not store the BoltExecutorStats or SpoutExecutorStats; instead it stores the result returned by render-stats!,
+      ; which flattens the BoltExecutorStats/SpoutExecutorStats by extracting values from all atoms and merging all values inside :common into the
+      ; top-level map. We do pretty much the same here.
+      (dissoc (merge common-stats {:type :bolt}  (apply ->BoltExecutorStats (into [nil] specific-stats))) :common)
+      (dissoc (merge common-stats {:type :spout} (apply ->SpoutExecutorStats (into [nil] specific-stats))) :common)
+      )))
+
 (defmethod thriftify-specific-stats :bolt
   [stats]
   (ExecutorSpecificStats/bolt
     (BoltStats.
-      (window-set-converter (:acked stats) to-global-stream-id)
-      (window-set-converter (:failed stats) to-global-stream-id)
-      (window-set-converter (:process-latencies stats) to-global-stream-id)
-      (window-set-converter (:executed stats) to-global-stream-id)
-      (window-set-converter (:execute-latencies stats) to-global-stream-id))))
+      (window-set-converter (:acked stats) to-global-stream-id str)
+      (window-set-converter (:failed stats) to-global-stream-id str)
+      (window-set-converter (:process-latencies stats) to-global-stream-id str)
+      (window-set-converter (:executed stats) to-global-stream-id str)
+      (window-set-converter (:execute-latencies stats) to-global-stream-id str))))
 
 (defmethod thriftify-specific-stats :spout
   [stats]
   (ExecutorSpecificStats/spout
-    (SpoutStats. (window-set-converter (:acked stats))
-                 (window-set-converter (:failed stats))
-                 (window-set-converter (:complete-latencies stats)))))
+    (SpoutStats. (window-set-converter (:acked stats) str)
+      (window-set-converter (:failed stats) str)
+      (window-set-converter (:complete-latencies stats) str))))
 
 (defn thriftify-executor-stats
   [stats]
-  (let [specific-stats (thriftify-specific-stats stats)]
-    (ExecutorStats. (window-set-converter (:emitted stats))
-                    (window-set-converter (:transferred stats))
-                    specific-stats)))
+  (let [specific-stats (thriftify-specific-stats stats)
+        rate (:rate stats)]
+    (ExecutorStats. (window-set-converter (:emitted stats) str)
+      (window-set-converter (:transferred stats) str)
+      specific-stats
+      rate)))
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/src/clj/backtype/storm/thrift.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/thrift.clj b/storm-core/src/clj/backtype/storm/thrift.clj
index ce0a5ff..5bc1150 100644
--- a/storm-core/src/clj/backtype/storm/thrift.clj
+++ b/storm-core/src/clj/backtype/storm/thrift.clj
@@ -15,12 +15,14 @@
 ;; limitations under the License.
 
 (ns backtype.storm.thrift
-  (:import [java.util HashMap])
+  (:import [java.util HashMap]
+           [java.io Serializable]
+           [backtype.storm.generated NodeInfo Assignment])
   (:import [backtype.storm.generated JavaObject Grouping Nimbus StormTopology
             StormTopology$_Fields Bolt Nimbus$Client Nimbus$Iface
             ComponentCommon Grouping$_Fields SpoutSpec NullStruct StreamInfo
             GlobalStreamId ComponentObject ComponentObject$_Fields
-            ShellComponent])
+            ShellComponent SupervisorInfo])
   (:import [backtype.storm.utils Utils NimbusClient])
   (:import [backtype.storm Constants])
   (:import [backtype.storm.grouping CustomStreamGrouping])
@@ -155,7 +157,7 @@
   [^ComponentObject obj]
   (when (not= (.getSetField obj) ComponentObject$_Fields/SERIALIZED_JAVA)
     (throw (RuntimeException. "Cannot deserialize non-java-serialized object")))
-  (Utils/deserialize (.get_serialized_java obj)))
+  (Utils/deserialize (.get_serialized_java obj) Serializable))
 
 (defn serialize-component-object
   [obj]
@@ -271,3 +273,4 @@
 (def SPOUT-FIELDS
   [StormTopology$_Fields/SPOUTS
    StormTopology$_Fields/STATE_SPOUTS])
+

http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java b/storm-core/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java
index b9b97e9..3d3208b 100644
--- a/storm-core/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java
+++ b/storm-core/src/jvm/backtype/storm/coordination/BatchBoltExecutor.java
@@ -103,6 +103,6 @@ public class BatchBoltExecutor implements IRichBolt, FinishedCallback, TimeoutCa
     }
     
     private IBatchBolt newTransactionalBolt() {
-        return (IBatchBolt) Utils.deserialize(_boltSer);
+        return Utils.deserialize(_boltSer, IBatchBolt.class);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/63900643/storm-core/src/jvm/backtype/storm/generated/Assignment.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/Assignment.java b/storm-core/src/jvm/backtype/storm/generated/Assignment.java
new file mode 100644
index 0000000..b6ade94
--- /dev/null
+++ b/storm-core/src/jvm/backtype/storm/generated/Assignment.java
@@ -0,0 +1,817 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Autogenerated by Thrift Compiler (0.7.0)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ */
+package backtype.storm.generated;
+
+import org.apache.commons.lang.builder.HashCodeBuilder;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.EnumMap;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.EnumSet;
+import java.util.Collections;
+import java.util.BitSet;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Assignment implements org.apache.thrift.TBase<Assignment, Assignment._Fields>, java.io.Serializable, Cloneable {
+  private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("Assignment");
+
+  private static final org.apache.thrift.protocol.TField MASTER_CODE_DIR_FIELD_DESC = new org.apache.thrift.protocol.TField("master_code_dir", org.apache.thrift.protocol.TType.STRING, (short)1);
+  private static final org.apache.thrift.protocol.TField NODE_HOST_FIELD_DESC = new org.apache.thrift.protocol.TField("node_host", org.apache.thrift.protocol.TType.MAP, (short)2);
+  private static final org.apache.thrift.protocol.TField EXECUTOR_NODE_PORT_FIELD_DESC = new org.apache.thrift.protocol.TField("executor_node_port", org.apache.thrift.protocol.TType.MAP, (short)3);
+  private static final org.apache.thrift.protocol.TField EXECUTOR_START_TIME_SECS_FIELD_DESC = new org.apache.thrift.protocol.TField("executor_start_time_secs", org.apache.thrift.protocol.TType.MAP, (short)4);
+
+  private String master_code_dir; // required
+  private Map<String,String> node_host; // optional
+  private Map<List<Long>,NodeInfo> executor_node_port; // optional
+  private Map<List<Long>,Long> executor_start_time_secs; // optional
+
+  /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
+  public enum _Fields implements org.apache.thrift.TFieldIdEnum {
+    MASTER_CODE_DIR((short)1, "master_code_dir"),
+    NODE_HOST((short)2, "node_host"),
+    EXECUTOR_NODE_PORT((short)3, "executor_node_port"),
+    EXECUTOR_START_TIME_SECS((short)4, "executor_start_time_secs");
+
+    private static final Map<String, _Fields> byName = new HashMap<String, _Fields>();
+
+    static {
+      for (_Fields field : EnumSet.allOf(_Fields.class)) {
+        byName.put(field.getFieldName(), field);
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, or null if its not found.
+     */
+    public static _Fields findByThriftId(int fieldId) {
+      switch(fieldId) {
+        case 1: // MASTER_CODE_DIR
+          return MASTER_CODE_DIR;
+        case 2: // NODE_HOST
+          return NODE_HOST;
+        case 3: // EXECUTOR_NODE_PORT
+          return EXECUTOR_NODE_PORT;
+        case 4: // EXECUTOR_START_TIME_SECS
+          return EXECUTOR_START_TIME_SECS;
+        default:
+          return null;
+      }
+    }
+
+    /**
+     * Find the _Fields constant that matches fieldId, throwing an exception
+     * if it is not found.
+     */
+    public static _Fields findByThriftIdOrThrow(int fieldId) {
+      _Fields fields = findByThriftId(fieldId);
+      if (fields == null) throw new IllegalArgumentException("Field " + fieldId + " doesn't exist!");
+      return fields;
+    }
+
+    /**
+     * Find the _Fields constant that matches name, or null if its not found.
+     */
+    public static _Fields findByName(String name) {
+      return byName.get(name);
+    }
+
+    private final short _thriftId;
+    private final String _fieldName;
+
+    _Fields(short thriftId, String fieldName) {
+      _thriftId = thriftId;
+      _fieldName = fieldName;
+    }
+
+    public short getThriftFieldId() {
+      return _thriftId;
+    }
+
+    public String getFieldName() {
+      return _fieldName;
+    }
+  }
+
+  // isset id assignments
+
+  public static final Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
+  static {
+    Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
+    tmpMap.put(_Fields.MASTER_CODE_DIR, new org.apache.thrift.meta_data.FieldMetaData("master_code_dir", org.apache.thrift.TFieldRequirementType.REQUIRED,
+        new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.NODE_HOST, new org.apache.thrift.meta_data.FieldMetaData("node_host", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+        new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP,
+            new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING),
+            new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))));
+    tmpMap.put(_Fields.EXECUTOR_NODE_PORT, new org.apache.thrift.meta_data.FieldMetaData("executor_node_port", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+        new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP,
+            new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
+                new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)),
+            new org.apache.thrift.meta_data.StructMetaData(org.apache.thrift.protocol.TType.STRUCT, NodeInfo.class))));
+    tmpMap.put(_Fields.EXECUTOR_START_TIME_SECS, new org.apache.thrift.meta_data.FieldMetaData("executor_start_time_secs", org.apache.thrift.TFieldRequirementType.OPTIONAL,
+        new org.apache.thrift.meta_data.MapMetaData(org.apache.thrift.protocol.TType.MAP,
+            new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,
+                new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64)),
+            new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I64))));
+    metaDataMap = Collections.unmodifiableMap(tmpMap);
+    org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(Assignment.class, metaDataMap);
+  }
+
+  public Assignment() {
+    this.node_host = new HashMap<String,String>();
+
+    this.executor_node_port = new HashMap<List<Long>,NodeInfo>();
+
+    this.executor_start_time_secs = new HashMap<List<Long>,Long>();
+
+  }
+
+  public Assignment(
+    String master_code_dir)
+  {
+    this();
+    this.master_code_dir = master_code_dir;
+  }
+
+  /**
+   * Performs a deep copy on <i>other</i>.
+   */
+  public Assignment(Assignment other) {
+    if (other.is_set_master_code_dir()) {
+      this.master_code_dir = other.master_code_dir;
+    }
+    if (other.is_set_node_host()) {
+      Map<String,String> __this__node_host = new HashMap<String,String>();
+      for (Map.Entry<String, String> other_element : other.node_host.entrySet()) {
+
+        String other_element_key = other_element.getKey();
+        String other_element_value = other_element.getValue();
+
+        String __this__node_host_copy_key = other_element_key;
+
+        String __this__node_host_copy_value = other_element_value;
+
+        __this__node_host.put(__this__node_host_copy_key, __this__node_host_copy_value);
+      }
+      this.node_host = __this__node_host;
+    }
+    if (other.is_set_executor_node_port()) {
+      Map<List<Long>,NodeInfo> __this__executor_node_port = new HashMap<List<Long>,NodeInfo>();
+      for (Map.Entry<List<Long>, NodeInfo> other_element : other.executor_node_port.entrySet()) {
+
+        List<Long> other_element_key = other_element.getKey();
+        NodeInfo other_element_value = other_element.getValue();
+
+        List<Long> __this__executor_node_port_copy_key = new ArrayList<Long>();
+        for (Long other_element_key_element : other_element_key) {
+          __this__executor_node_port_copy_key.add(other_element_key_element);
+        }
+
+        NodeInfo __this__executor_node_port_copy_value = new NodeInfo(other_element_value);
+
+        __this__executor_node_port.put(__this__executor_node_port_copy_key, __this__executor_node_port_copy_value);
+      }
+      this.executor_node_port = __this__executor_node_port;
+    }
+    if (other.is_set_executor_start_time_secs()) {
+      Map<List<Long>,Long> __this__executor_start_time_secs = new HashMap<List<Long>,Long>();
+      for (Map.Entry<List<Long>, Long> other_element : other.executor_start_time_secs.entrySet()) {
+
+        List<Long> other_element_key = other_element.getKey();
+        Long other_element_value = other_element.getValue();
+
+        List<Long> __this__executor_start_time_secs_copy_key = new ArrayList<Long>();
+        for (Long other_element_key_element : other_element_key) {
+          __this__executor_start_time_secs_copy_key.add(other_element_key_element);
+        }
+
+        Long __this__executor_start_time_secs_copy_value = other_element_value;
+
+        __this__executor_start_time_secs.put(__this__executor_start_time_secs_copy_key, __this__executor_start_time_secs_copy_value);
+      }
+      this.executor_start_time_secs = __this__executor_start_time_secs;
+    }
+  }
+
+  public Assignment deepCopy() {
+    return new Assignment(this);
+  }
+
+  @Override
+  public void clear() {
+    this.master_code_dir = null;
+    this.node_host = new HashMap<String,String>();
+
+    this.executor_node_port = new HashMap<List<Long>,NodeInfo>();
+
+    this.executor_start_time_secs = new HashMap<List<Long>,Long>();
+
+  }
+
+  public String get_master_code_dir() {
+    return this.master_code_dir;
+  }
+
+  public void set_master_code_dir(String master_code_dir) {
+    this.master_code_dir = master_code_dir;
+  }
+
+  public void unset_master_code_dir() {
+    this.master_code_dir = null;
+  }
+
+  /** Returns true if field master_code_dir is set (has been assigned a value) and false otherwise */
+  public boolean is_set_master_code_dir() {
+    return this.master_code_dir != null;
+  }
+
+  public void set_master_code_dir_isSet(boolean value) {
+    if (!value) {
+      this.master_code_dir = null;
+    }
+  }
+
+  public int get_node_host_size() {
+    return (this.node_host == null) ? 0 : this.node_host.size();
+  }
+
+  public void put_to_node_host(String key, String val) {
+    if (this.node_host == null) {
+      this.node_host = new HashMap<String,String>();
+    }
+    this.node_host.put(key, val);
+  }
+
+  public Map<String,String> get_node_host() {
+    return this.node_host;
+  }
+
+  public void set_node_host(Map<String,String> node_host) {
+    this.node_host = node_host;
+  }
+
+  public void unset_node_host() {
+    this.node_host = null;
+  }
+
+  /** Returns true if field node_host is set (has been assigned a value) and false otherwise */
+  public boolean is_set_node_host() {
+    return this.node_host != null;
+  }
+
+  public void set_node_host_isSet(boolean value) {
+    if (!value) {
+      this.node_host = null;
+    }
+  }
+
+  public int get_executor_node_port_size() {
+    return (this.executor_node_port == null) ? 0 : this.executor_node_port.size();
+  }
+
+  public void put_to_executor_node_port(List<Long> key, NodeInfo val) {
+    if (this.executor_node_port == null) {
+      this.executor_node_port = new HashMap<List<Long>,NodeInfo>();
+    }
+    this.executor_node_port.put(key, val);
+  }
+
+  public Map<List<Long>,NodeInfo> get_executor_node_port() {
+    return this.executor_node_port;
+  }
+
+  public void set_executor_node_port(Map<List<Long>,NodeInfo> executor_node_port) {
+    this.executor_node_port = executor_node_port;
+  }
+
+  public void unset_executor_node_port() {
+    this.executor_node_port = null;
+  }
+
+  /** Returns true if field executor_node_port is set (has been assigned a value) and false otherwise */
+  public boolean is_set_executor_node_port() {
+    return this.executor_node_port != null;
+  }
+
+  public void set_executor_node_port_isSet(boolean value) {
+    if (!value) {
+      this.executor_node_port = null;
+    }
+  }
+
+  public int get_executor_start_time_secs_size() {
+    return (this.executor_start_time_secs == null) ? 0 : this.executor_start_time_secs.size();
+  }
+
+  public void put_to_executor_start_time_secs(List<Long> key, long val) {
+    if (this.executor_start_time_secs == null) {
+      this.executor_start_time_secs = new HashMap<List<Long>,Long>();
+    }
+    this.executor_start_time_secs.put(key, val);
+  }
+
+  public Map<List<Long>,Long> get_executor_start_time_secs() {
+    return this.executor_start_time_secs;
+  }
+
+  public void set_executor_start_time_secs(Map<List<Long>,Long> executor_start_time_secs) {
+    this.executor_start_time_secs = executor_start_time_secs;
+  }
+
+  public void unset_executor_start_time_secs() {
+    this.executor_start_time_secs = null;
+  }
+
+  /** Returns true if field executor_start_time_secs is set (has been assigned a value) and false otherwise */
+  public boolean is_set_executor_start_time_secs() {
+    return this.executor_start_time_secs != null;
+  }
+
+  public void set_executor_start_time_secs_isSet(boolean value) {
+    if (!value) {
+      this.executor_start_time_secs = null;
+    }
+  }
+
+  public void setFieldValue(_Fields field, Object value) {
+    switch (field) {
+    case MASTER_CODE_DIR:
+      if (value == null) {
+        unset_master_code_dir();
+      } else {
+        set_master_code_dir((String)value);
+      }
+      break;
+
+    case NODE_HOST:
+      if (value == null) {
+        unset_node_host();
+      } else {
+        set_node_host((Map<String,String>)value);
+      }
+      break;
+
+    case EXECUTOR_NODE_PORT:
+      if (value == null) {
+        unset_executor_node_port();
+      } else {
+        set_executor_node_port((Map<List<Long>,NodeInfo>)value);
+      }
+      break;
+
+    case EXECUTOR_START_TIME_SECS:
+      if (value == null) {
+        unset_executor_start_time_secs();
+      } else {
+        set_executor_start_time_secs((Map<List<Long>,Long>)value);
+      }
+      break;
+
+    }
+  }
+
+  public Object getFieldValue(_Fields field) {
+    switch (field) {
+    case MASTER_CODE_DIR:
+      return get_master_code_dir();
+
+    case NODE_HOST:
+      return get_node_host();
+
+    case EXECUTOR_NODE_PORT:
+      return get_executor_node_port();
+
+    case EXECUTOR_START_TIME_SECS:
+      return get_executor_start_time_secs();
+
+    }
+    throw new IllegalStateException();
+  }
+
+  /** Returns true if field corresponding to fieldID is set (has been assigned a value) and false otherwise */
+  public boolean isSet(_Fields field) {
+    if (field == null) {
+      throw new IllegalArgumentException();
+    }
+
+    switch (field) {
+    case MASTER_CODE_DIR:
+      return is_set_master_code_dir();
+    case NODE_HOST:
+      return is_set_node_host();
+    case EXECUTOR_NODE_PORT:
+      return is_set_executor_node_port();
+    case EXECUTOR_START_TIME_SECS:
+      return is_set_executor_start_time_secs();
+    }
+    throw new IllegalStateException();
+  }
+
+  @Override
+  public boolean equals(Object that) {
+    if (that == null)
+      return false;
+    if (that instanceof Assignment)
+      return this.equals((Assignment)that);
+    return false;
+  }
+
+  /**
+   * Field-by-field equality. Two structs are equal when, for every field,
+   * both have it set or both have it unset, and every field set on both
+   * sides holds equal values.
+   *
+   * @param that the struct to compare with; null is never equal
+   * @return true if all four fields agree
+   */
+  public boolean equals(Assignment that) {
+    if (that == null) {
+      return false;
+    }
+
+    final boolean mineHasCodeDir = this.is_set_master_code_dir();
+    final boolean theirsHasCodeDir = that.is_set_master_code_dir();
+    if (mineHasCodeDir != theirsHasCodeDir) {
+      return false;
+    }
+    if (mineHasCodeDir && !this.master_code_dir.equals(that.master_code_dir)) {
+      return false;
+    }
+
+    final boolean mineHasNodeHost = this.is_set_node_host();
+    final boolean theirsHasNodeHost = that.is_set_node_host();
+    if (mineHasNodeHost != theirsHasNodeHost) {
+      return false;
+    }
+    if (mineHasNodeHost && !this.node_host.equals(that.node_host)) {
+      return false;
+    }
+
+    final boolean mineHasNodePort = this.is_set_executor_node_port();
+    final boolean theirsHasNodePort = that.is_set_executor_node_port();
+    if (mineHasNodePort != theirsHasNodePort) {
+      return false;
+    }
+    if (mineHasNodePort && !this.executor_node_port.equals(that.executor_node_port)) {
+      return false;
+    }
+
+    final boolean mineHasStartTimes = this.is_set_executor_start_time_secs();
+    final boolean theirsHasStartTimes = that.is_set_executor_start_time_secs();
+    if (mineHasStartTimes != theirsHasStartTimes) {
+      return false;
+    }
+    if (mineHasStartTimes && !this.executor_start_time_secs.equals(that.executor_start_time_secs)) {
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Hash over the four fields in declaration order. For each field the
+   * set/unset flag is mixed in first, and the value only when it is set.
+   */
+  @Override
+  public int hashCode() {
+    HashCodeBuilder hash = new HashCodeBuilder();
+
+    final boolean hasCodeDir = is_set_master_code_dir();
+    hash.append(hasCodeDir);
+    if (hasCodeDir) {
+      hash.append(master_code_dir);
+    }
+
+    final boolean hasNodeHost = is_set_node_host();
+    hash.append(hasNodeHost);
+    if (hasNodeHost) {
+      hash.append(node_host);
+    }
+
+    final boolean hasNodePort = is_set_executor_node_port();
+    hash.append(hasNodePort);
+    if (hasNodePort) {
+      hash.append(executor_node_port);
+    }
+
+    final boolean hasStartTimes = is_set_executor_start_time_secs();
+    hash.append(hasStartTimes);
+    if (hasStartTimes) {
+      hash.append(executor_start_time_secs);
+    }
+
+    return hash.toHashCode();
+  }
+
+  /**
+   * Orders two structs. If the runtime classes differ, the class names
+   * decide. Otherwise the fields are compared in declaration order: first
+   * whether each side has the field set, then, when this side has it set,
+   * the values themselves. The first non-zero result wins.
+   *
+   * @param other the struct to compare against
+   * @return negative, zero or positive per the usual compareTo convention
+   */
+  public int compareTo(Assignment other) {
+    if (!getClass().equals(other.getClass())) {
+      return getClass().getName().compareTo(other.getClass().getName());
+    }
+
+    int cmp;
+
+    // master_code_dir
+    cmp = Boolean.valueOf(is_set_master_code_dir()).compareTo(other.is_set_master_code_dir());
+    if (cmp != 0) {
+      return cmp;
+    }
+    if (is_set_master_code_dir()) {
+      cmp = org.apache.thrift.TBaseHelper.compareTo(this.master_code_dir, other.master_code_dir);
+      if (cmp != 0) {
+        return cmp;
+      }
+    }
+
+    // node_host
+    cmp = Boolean.valueOf(is_set_node_host()).compareTo(other.is_set_node_host());
+    if (cmp != 0) {
+      return cmp;
+    }
+    if (is_set_node_host()) {
+      cmp = org.apache.thrift.TBaseHelper.compareTo(this.node_host, other.node_host);
+      if (cmp != 0) {
+        return cmp;
+      }
+    }
+
+    // executor_node_port
+    cmp = Boolean.valueOf(is_set_executor_node_port()).compareTo(other.is_set_executor_node_port());
+    if (cmp != 0) {
+      return cmp;
+    }
+    if (is_set_executor_node_port()) {
+      cmp = org.apache.thrift.TBaseHelper.compareTo(this.executor_node_port, other.executor_node_port);
+      if (cmp != 0) {
+        return cmp;
+      }
+    }
+
+    // executor_start_time_secs
+    cmp = Boolean.valueOf(is_set_executor_start_time_secs()).compareTo(other.is_set_executor_start_time_secs());
+    if (cmp != 0) {
+      return cmp;
+    }
+    if (is_set_executor_start_time_secs()) {
+      cmp = org.apache.thrift.TBaseHelper.compareTo(this.executor_start_time_secs, other.executor_start_time_secs);
+      if (cmp != 0) {
+        return cmp;
+      }
+    }
+
+    return 0;
+  }
+
+  /**
+   * Maps a numeric Thrift field id to its _Fields constant by delegating to
+   * _Fields.findByThriftId.
+   *
+   * @param fieldId the Thrift field id to look up
+   * @return the result of _Fields.findByThriftId for that id
+   */
+  public _Fields fieldForId(int fieldId) {
+    final _Fields match = _Fields.findByThriftId(fieldId);
+    return match;
+  }
+
+  /**
+   * Populates this struct from the given protocol.
+   *
+   * Fields are consumed one at a time until a STOP field is seen. A field
+   * whose id is not handled below, or whose wire type differs from the type
+   * expected for that id, is skipped. validate() is called once the struct
+   * end has been read.
+   *
+   * @param iprot protocol to read the struct from
+   * @throws org.apache.thrift.TException declared for the protocol calls and validate()
+   */
+  public void read(org.apache.thrift.protocol.TProtocol iprot) throws org.apache.thrift.TException {
+    org.apache.thrift.protocol.TField field;
+    iprot.readStructBegin();
+    while (true)
+    {
+      field = iprot.readFieldBegin();
+      // STOP ends the field list for this struct.
+      if (field.type == org.apache.thrift.protocol.TType.STOP) {
+        break;
+      }
+      switch (field.id) {
+        case 1: // MASTER_CODE_DIR
+          if (field.type == org.apache.thrift.protocol.TType.STRING) {
+            this.master_code_dir = iprot.readString();
+          } else { 
+            // Wire type does not match: skip the payload.
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        case 2: // NODE_HOST
+          if (field.type == org.apache.thrift.protocol.TType.MAP) {
+            {
+              // Map of string to string; the entry count comes from the map header.
+              org.apache.thrift.protocol.TMap _map185 = iprot.readMapBegin();
+              this.node_host = new HashMap<String,String>(2*_map185.size);
+              for (int _i186 = 0; _i186 < _map185.size; ++_i186)
+              {
+                String _key187; // required
+                String _val188; // required
+                _key187 = iprot.readString();
+                _val188 = iprot.readString();
+                this.node_host.put(_key187, _val188);
+              }
+              iprot.readMapEnd();
+            }
+          } else { 
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        case 3: // EXECUTOR_NODE_PORT
+          if (field.type == org.apache.thrift.protocol.TType.MAP) {
+            {
+              // Map whose keys are lists of i64 and whose values are NodeInfo structs.
+              org.apache.thrift.protocol.TMap _map189 = iprot.readMapBegin();
+              this.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map189.size);
+              for (int _i190 = 0; _i190 < _map189.size; ++_i190)
+              {
+                List<Long> _key191; // required
+                NodeInfo _val192; // required
+                {
+                  // Key: read the list header, then each i64 element.
+                  org.apache.thrift.protocol.TList _list193 = iprot.readListBegin();
+                  _key191 = new ArrayList<Long>(_list193.size);
+                  for (int _i194 = 0; _i194 < _list193.size; ++_i194)
+                  {
+                    long _elem195; // required
+                    _elem195 = iprot.readI64();
+                    _key191.add(_elem195);
+                  }
+                  iprot.readListEnd();
+                }
+                // Value: a nested struct that reads itself from the same protocol.
+                _val192 = new NodeInfo();
+                _val192.read(iprot);
+                this.executor_node_port.put(_key191, _val192);
+              }
+              iprot.readMapEnd();
+            }
+          } else { 
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        case 4: // EXECUTOR_START_TIME_SECS
+          if (field.type == org.apache.thrift.protocol.TType.MAP) {
+            {
+              // Map whose keys are lists of i64 and whose values are single i64s.
+              org.apache.thrift.protocol.TMap _map196 = iprot.readMapBegin();
+              this.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map196.size);
+              for (int _i197 = 0; _i197 < _map196.size; ++_i197)
+              {
+                List<Long> _key198; // required
+                long _val199; // required
+                {
+                  org.apache.thrift.protocol.TList _list200 = iprot.readListBegin();
+                  _key198 = new ArrayList<Long>(_list200.size);
+                  for (int _i201 = 0; _i201 < _list200.size; ++_i201)
+                  {
+                    long _elem202; // required
+                    _elem202 = iprot.readI64();
+                    _key198.add(_elem202);
+                  }
+                  iprot.readListEnd();
+                }
+                _val199 = iprot.readI64();
+                this.executor_start_time_secs.put(_key198, _val199);
+              }
+              iprot.readMapEnd();
+            }
+          } else { 
+            org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+          }
+          break;
+        default:
+          // Field id not handled by this struct: skip its payload.
+          org.apache.thrift.protocol.TProtocolUtil.skip(iprot, field.type);
+      }
+      iprot.readFieldEnd();
+    }
+    iprot.readStructEnd();
+    // Runs the required-field check on what was just read.
+    validate();
+  }
+
+  /**
+   * Writes this struct to the given protocol.
+   *
+   * validate() runs first, so nothing is written if it throws. Each field is
+   * emitted only when its reference is non-null; the three map fields are
+   * additionally guarded by their is_set_* check. The output is closed with
+   * a field stop followed by the struct end.
+   *
+   * @param oprot protocol to write the struct to
+   * @throws org.apache.thrift.TException declared for validate() and the protocol calls
+   */
+  public void write(org.apache.thrift.protocol.TProtocol oprot) throws org.apache.thrift.TException {
+    validate();
+
+    oprot.writeStructBegin(STRUCT_DESC);
+    if (this.master_code_dir != null) {
+      oprot.writeFieldBegin(MASTER_CODE_DIR_FIELD_DESC);
+      oprot.writeString(this.master_code_dir);
+      oprot.writeFieldEnd();
+    }
+    if (this.node_host != null) {
+      // NOTE(review): is_set_node_host() is defined outside this view; confirm whether this second guard can differ from the null check.
+      if (is_set_node_host()) {
+        oprot.writeFieldBegin(NODE_HOST_FIELD_DESC);
+        {
+          // Map header: key type, value type, entry count.
+          oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, this.node_host.size()));
+          for (Map.Entry<String, String> _iter203 : this.node_host.entrySet())
+          {
+            oprot.writeString(_iter203.getKey());
+            oprot.writeString(_iter203.getValue());
+          }
+          oprot.writeMapEnd();
+        }
+        oprot.writeFieldEnd();
+      }
+    }
+    if (this.executor_node_port != null) {
+      if (is_set_executor_node_port()) {
+        oprot.writeFieldBegin(EXECUTOR_NODE_PORT_FIELD_DESC);
+        {
+          oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.STRUCT, this.executor_node_port.size()));
+          for (Map.Entry<List<Long>, NodeInfo> _iter204 : this.executor_node_port.entrySet())
+          {
+            {
+              // Key: a list of i64, written as header, elements, end.
+              oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter204.getKey().size()));
+              for (long _iter205 : _iter204.getKey())
+              {
+                oprot.writeI64(_iter205);
+              }
+              oprot.writeListEnd();
+            }
+            // Value: the nested struct writes itself to the same protocol.
+            _iter204.getValue().write(oprot);
+          }
+          oprot.writeMapEnd();
+        }
+        oprot.writeFieldEnd();
+      }
+    }
+    if (this.executor_start_time_secs != null) {
+      if (is_set_executor_start_time_secs()) {
+        oprot.writeFieldBegin(EXECUTOR_START_TIME_SECS_FIELD_DESC);
+        {
+          oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.I64, this.executor_start_time_secs.size()));
+          for (Map.Entry<List<Long>, Long> _iter206 : this.executor_start_time_secs.entrySet())
+          {
+            {
+              oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter206.getKey().size()));
+              for (long _iter207 : _iter206.getKey())
+              {
+                oprot.writeI64(_iter207);
+              }
+              oprot.writeListEnd();
+            }
+            oprot.writeI64(_iter206.getValue());
+          }
+          oprot.writeMapEnd();
+        }
+        oprot.writeFieldEnd();
+      }
+    }
+    oprot.writeFieldStop();
+    oprot.writeStructEnd();
+  }
+
+  /**
+   * Renders the struct as "Assignment(name:value, ...)". master_code_dir is
+   * always printed; each of the other three fields is printed only when its
+   * is_set_* check passes. A null value is shown as the text "null".
+   */
+  @Override
+  public String toString() {
+    StringBuilder text = new StringBuilder("Assignment(");
+
+    // master_code_dir always comes first, so every later field needs a ", " separator.
+    text.append("master_code_dir:");
+    text.append(this.master_code_dir == null ? "null" : this.master_code_dir);
+
+    if (is_set_node_host()) {
+      text.append(", ");
+      text.append("node_host:");
+      text.append(this.node_host == null ? "null" : String.valueOf(this.node_host));
+    }
+
+    if (is_set_executor_node_port()) {
+      text.append(", ");
+      text.append("executor_node_port:");
+      text.append(this.executor_node_port == null ? "null" : String.valueOf(this.executor_node_port));
+    }
+
+    if (is_set_executor_start_time_secs()) {
+      text.append(", ");
+      text.append("executor_start_time_secs:");
+      text.append(this.executor_start_time_secs == null ? "null" : String.valueOf(this.executor_start_time_secs));
+    }
+
+    text.append(")");
+    return text.toString();
+  }
+
+  /**
+   * Checks that the required field master_code_dir is set.
+   *
+   * @throws org.apache.thrift.TException a TProtocolException naming the field and including toString() when it is unset
+   */
+  public void validate() throws org.apache.thrift.TException {
+    // master_code_dir is the only required field checked here.
+    if (is_set_master_code_dir()) {
+      return;
+    }
+    throw new org.apache.thrift.protocol.TProtocolException("Required field 'master_code_dir' is unset! Struct:" + toString());
+  }
+
+  /**
+   * Java serialization hook: writes this struct to the object stream using
+   * the Thrift compact protocol. Any TException is rethrown as an
+   * IOException with the original as its cause.
+   */
+  private void writeObject(java.io.ObjectOutputStream out) throws java.io.IOException {
+    try {
+      org.apache.thrift.transport.TIOStreamTransport transport =
+          new org.apache.thrift.transport.TIOStreamTransport(out);
+      org.apache.thrift.protocol.TCompactProtocol protocol =
+          new org.apache.thrift.protocol.TCompactProtocol(transport);
+      write(protocol);
+    } catch (org.apache.thrift.TException failure) {
+      throw new java.io.IOException(failure);
+    }
+  }
+
+  /**
+   * Java serialization hook: reads this struct back from the object stream
+   * using the Thrift compact protocol. Any TException is rethrown as an
+   * IOException with the original as its cause.
+   */
+  private void readObject(java.io.ObjectInputStream in) throws java.io.IOException, ClassNotFoundException {
+    try {
+      org.apache.thrift.transport.TIOStreamTransport transport =
+          new org.apache.thrift.transport.TIOStreamTransport(in);
+      org.apache.thrift.protocol.TCompactProtocol protocol =
+          new org.apache.thrift.protocol.TCompactProtocol(transport);
+      read(protocol);
+    } catch (org.apache.thrift.TException failure) {
+      throw new java.io.IOException(failure);
+    }
+  }
+
+}
+