You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/10/09 16:53:35 UTC

[7/9] storm git commit: [STORM-893] - Resource Aware Scheduler implementation. [STORM-894] - Basic Resource Aware Scheduling implementation.

[STORM-893] - Resource Aware Scheduler implementation.
[STORM-894] - Basic Resource Aware Scheduling implementation.

Added functionality for users to limit the amount of memory resources allocated to a worker (JVM) process when scheduling with resource aware scheduler. This allows users to potentially spread executors more evenly across workers.
Also refactored some code


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/121d022b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/121d022b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/121d022b

Branch: refs/heads/master
Commit: 121d022b9f11146f6dadc2cd402c747472cac0d1
Parents: 1822491
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Tue Oct 6 22:23:18 2015 -0500
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Fri Oct 9 08:44:20 2015 -0500

----------------------------------------------------------------------
 conf/defaults.yaml                              |  14 +
 .../starter/ResourceAwareExampleTopology.java   |  87 ++
 storm-core/src/clj/backtype/storm/converter.clj |   4 +-
 .../src/clj/backtype/storm/daemon/common.clj    |   2 +-
 .../src/clj/backtype/storm/daemon/nimbus.clj    |  29 +-
 .../clj/backtype/storm/daemon/supervisor.clj    |   9 +-
 storm-core/src/jvm/backtype/storm/Config.java   |  63 ++
 .../jvm/backtype/storm/ConfigValidation.java    |  22 +
 .../src/jvm/backtype/storm/StormSubmitter.java  |  35 +-
 .../backtype/storm/generated/Assignment.java    | 194 ++--
 .../storm/generated/BoltAggregateStats.java     |   2 +-
 .../jvm/backtype/storm/generated/BoltStats.java | 442 ++++-----
 .../storm/generated/ClusterSummary.java         | 110 +-
 .../storm/generated/ClusterWorkerHeartbeat.java |  54 +-
 .../storm/generated/CommonAggregateStats.java   |   2 +-
 .../generated/ComponentAggregateStats.java      |   2 +-
 .../storm/generated/ComponentPageInfo.java      | 222 ++---
 .../backtype/storm/generated/Credentials.java   |  46 +-
 .../storm/generated/ExecutorAggregateStats.java |   2 +-
 .../backtype/storm/generated/ExecutorStats.java | 170 ++--
 .../storm/generated/LSApprovedWorkers.java      |  46 +-
 .../generated/LSSupervisorAssignments.java      |  50 +-
 .../storm/generated/LSWorkerHeartbeat.java      |  38 +-
 .../storm/generated/LocalAssignment.java        |  38 +-
 .../storm/generated/LocalStateData.java         |  50 +-
 .../jvm/backtype/storm/generated/LogConfig.java |  50 +-
 .../jvm/backtype/storm/generated/LogLevel.java  |   2 +-
 .../jvm/backtype/storm/generated/Nimbus.java    |   2 +-
 .../jvm/backtype/storm/generated/NodeInfo.java  |  34 +-
 .../storm/generated/RebalanceOptions.java       |  46 +-
 .../storm/generated/SpoutAggregateStats.java    |   2 +-
 .../backtype/storm/generated/SpoutStats.java    | 254 ++---
 .../jvm/backtype/storm/generated/StormBase.java |  94 +-
 .../storm/generated/SupervisorInfo.java         | 276 +++--
 .../storm/generated/SupervisorSummary.java      | 168 +++-
 .../backtype/storm/generated/TopologyInfo.java  | 162 +--
 .../storm/generated/TopologyPageInfo.java       |  98 +-
 .../backtype/storm/generated/TopologyStats.java | 222 ++---
 .../AbstractDNSToSwitchMapping.java             |  95 ++
 .../networktopography/DNSToSwitchMapping.java   |  50 +
 .../DefaultRackDNSToSwitchMapping.java          |  35 +
 .../jvm/backtype/storm/scheduler/Cluster.java   |  46 +-
 .../storm/scheduler/SupervisorDetails.java      |  61 +-
 .../backtype/storm/scheduler/Topologies.java    |  15 +-
 .../storm/scheduler/TopologyDetails.java        | 330 +++++-
 .../storm/scheduler/resource/Component.java     |  54 +
 .../storm/scheduler/resource/RAS_Node.java      | 547 ++++++++++
 .../resource/ResourceAwareScheduler.java        | 149 +++
 .../storm/scheduler/resource/ResourceUtils.java | 133 +++
 .../resource/strategies/IStrategy.java          |  37 +
 .../strategies/ResourceAwareStrategy.java       | 480 +++++++++
 .../AlternateRackDNSToSwitchMapping.java        |  48 +
 .../topology/BaseConfigurationDeclarer.java     |  31 +-
 .../ComponentConfigurationDeclarer.java         |   3 +
 .../src/jvm/backtype/storm/utils/Utils.java     |  19 +
 storm-core/src/py/storm/ttypes.py               | 994 ++++++++++---------
 storm-core/src/storm.thrift                     |   2 +
 .../test/clj/backtype/storm/cluster_test.clj    |   4 +-
 .../scheduler/multitenant_scheduler_test.clj    |  34 +-
 .../scheduler/resource_aware_scheduler_test.clj | 669 +++++++++++++
 .../test/clj/backtype/storm/scheduler_test.clj  |   3 +-
 61 files changed, 5161 insertions(+), 1821 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/121d022b/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 4a8e354..a41d805 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -132,6 +132,11 @@ supervisor.heartbeat.frequency.secs: 5
 supervisor.enable: true
 supervisor.supervisors: []
 supervisor.supervisors.commands: []
+supervisor.memory.capacity.mb: 3072.0
+# By convention 1 CPU core should be about 100, but this can be adjusted if needed.
+# Using 100 makes it simple to set the desired value to the capacity measurement
+# for single-threaded bolts.
+supervisor.cpu.capacity: 400.0
 
 ### worker.* configs are for task workers
 worker.childopts: "-Xmx768m"
@@ -174,6 +179,9 @@ storm.messaging.netty.socket.backlog: 500
 # By default, the Netty SASL authentication is set to false.  Users can override and set it true for a specific topology.
 storm.messaging.netty.authentication: false
 
+# Default plugin to use for automatic network topology discovery
+storm.network.topography.plugin: backtype.storm.networktopography.DefaultRackDNSToSwitchMapping
+
 # default number of seconds group mapping service will cache user group
 storm.group.mapping.service.cache.duration.secs: 120
 
@@ -215,4 +223,10 @@ topology.environment: null
 topology.bolts.outgoing.overflow.buffer.enable: false
 topology.disruptor.wait.timeout.millis: 1000
 
+# Configs for Resource Aware Scheduler
+topology.component.resources.onheap.memory.mb: 128.0
+topology.component.resources.offheap.memory.mb: 0.0
+topology.component.cpu.pcore.percent: 10.0
+topology.worker.max.heap.size.mb: 768.0
+
 dev.zookeeper.path: "/tmp/dev-storm-zookeeper"

http://git-wip-us.apache.org/repos/asf/storm/blob/121d022b/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java b/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java
new file mode 100644
index 0000000..96e300f
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java
@@ -0,0 +1,87 @@
+package storm.starter;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.testing.TestWordSpout;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.SpoutDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+
+import java.util.Map;
+
+/**
+ * Example topology that declares per-component CPU and memory requirements and caps the memory given to one worker.
+ */
+public class ResourceAwareExampleTopology {
+  public static class ExclamationBolt extends BaseRichBolt {
+    OutputCollector _collector;
+
+    @Override
+    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+      _collector = collector;
+    }
+
+    @Override
+    public void execute(Tuple tuple) {
+      _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
+      _collector.ack(tuple);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+      declarer.declare(new Fields("word"));
+    }
+
+
+  }
+
+  public static void main(String[] args) throws Exception {
+    TopologyBuilder builder = new TopologyBuilder();
+
+    SpoutDeclarer spout =  builder.setSpout("word", new TestWordSpout(), 10);
+    //set cpu requirement
+    spout.setCPULoad(20);
+    //set onheap and offheap memory requirement
+    spout.setMemoryLoad(64, 16);
+
+    BoltDeclarer bolt1 = builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
+    //sets cpu requirement.  Not necessary to set both CPU and memory.
+    //For requirements not set, a default value will be used
+    bolt1.setCPULoad(15);
+
+    BoltDeclarer bolt2 = builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
+    bolt2.setMemoryLoad(100);
+
+    Config conf = new Config();
+    conf.setDebug(true);
+
+    /**
+     * Use to limit the maximum amount of memory (in MB) allocated to one worker process.
+     * Can be used to spread executors to multiple workers
+     */
+    conf.setTopologyWorkerMaxHeapSize(512.0);
+
+    if (args != null && args.length > 0) { // an argument was given: submit via StormSubmitter, passing args[0] (presumably the topology name)
+      conf.setNumWorkers(3);
+
+      StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+    }
+    else { // no arguments: submit to a LocalCluster as "test", sleep, then kill the topology and shut the cluster down
+
+      LocalCluster cluster = new LocalCluster();
+      cluster.submitTopology("test", conf, builder.createTopology());
+      Utils.sleep(10000);
+      cluster.killTopology("test");
+      cluster.shutdown();
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/121d022b/storm-core/src/clj/backtype/storm/converter.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/converter.clj b/storm-core/src/clj/backtype/storm/converter.clj
index c571fa1..27336f0 100644
--- a/storm-core/src/clj/backtype/storm/converter.clj
+++ b/storm-core/src/clj/backtype/storm/converter.clj
@@ -30,6 +30,7 @@
     (.set_scheduler_meta (:scheduler-meta supervisor-info))
     (.set_uptime_secs (long (:uptime-secs supervisor-info)))
     (.set_version (:version supervisor-info))
+    (.set_resources_map (:resources-map supervisor-info))
     ))
 
 (defn clojurify-supervisor-info [^SupervisorInfo supervisor-info]
@@ -42,7 +43,8 @@
       (if (.get_meta supervisor-info) (into [] (.get_meta supervisor-info)))
       (if (.get_scheduler_meta supervisor-info) (into {} (.get_scheduler_meta supervisor-info)))
       (.get_uptime_secs supervisor-info)
-      (.get_version supervisor-info))))
+      (.get_version supervisor-info)
+      (if-let [res-map (.get_resources_map supervisor-info)] (into {} res-map)))))
 
 (defn thriftify-assignment [assignment]
   (doto (Assignment.)

http://git-wip-us.apache.org/repos/asf/storm/blob/121d022b/storm-core/src/clj/backtype/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/common.clj b/storm-core/src/clj/backtype/storm/daemon/common.clj
index a30850d..a1b6241 100644
--- a/storm-core/src/clj/backtype/storm/daemon/common.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/common.clj
@@ -57,7 +57,7 @@
 ;; component->executors is a map from spout/bolt id to number of executors for that component
 (defrecord StormBase [storm-name launch-time-secs status num-workers component->executors owner topology-action-options prev-status component->debug])
 
-(defrecord SupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs version])
+(defrecord SupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs version resources-map])
 
 (defprotocol DaemonCommon
   (waiting? [this]))

http://git-wip-us.apache.org/repos/asf/storm/blob/121d022b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index 70bd197..61aba6b 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -333,7 +333,7 @@
         supervisor-infos (all-supervisor-info storm-cluster-state nil)
 
         supervisor-details (dofor [[id info] supervisor-infos]
-                             (SupervisorDetails. id (:meta info)))
+                             (SupervisorDetails. id (:meta info) (:resources-map info)))
 
         ret (.allSlotsAvailableForScheduling inimbus
                      supervisor-details
@@ -568,7 +568,8 @@
                                                     all-ports (-> (get all-scheduling-slots sid)
                                                                   (set/difference dead-ports)
                                                                   ((fn [ports] (map int ports))))
-                                                    supervisor-details (SupervisorDetails. sid hostname scheduler-meta all-ports)]]
+                                                    supervisor-details (SupervisorDetails. sid hostname scheduler-meta all-ports (:resources-map supervisor-info))
+                                                    ]]
                                           {sid supervisor-details}))]
     (merge all-supervisor-details
            (into {}
@@ -652,7 +653,7 @@
                                   (apply merge-with set/union))
 
         supervisors (read-all-supervisor-details nimbus all-scheduling-slots supervisor->dead-ports)
-        cluster (Cluster. (:inimbus nimbus) supervisors topology->scheduler-assignment)
+        cluster (Cluster. (:inimbus nimbus) supervisors topology->scheduler-assignment conf)
 
         ;; call scheduler.schedule to schedule all the topologies
         ;; the new assignments for all the topologies are in the cluster object.
@@ -701,7 +702,7 @@
   (let [infos (all-supervisor-info storm-cluster-state)]
     (->> infos
          (map (fn [[id info]]
-                 [id (SupervisorDetails. id (:hostname info) (:scheduler-meta info) nil)]))
+                 [id (SupervisorDetails. id (:hostname info) (:scheduler-meta info) nil (:resources-map info))]))
          (into {}))))
 
 (defn- to-worker-slot [[node port]]
@@ -1433,16 +1434,16 @@
               ;; TODO: need to get the port info about supervisors...
               ;; in standalone just look at metadata, otherwise just say N/A?
               supervisor-summaries (dofor [[id info] supervisor-infos]
-                                          (let [ports (set (:meta info)) ;;TODO: this is only true for standalone
-
-                                            sup-sum (SupervisorSummary. (:hostname info)
-                                                                (:uptime-secs info)
-                                                                (count ports)
-                                                                (count (:used-ports info))
-                                                                id) ]
-                                            (when-let [version (:version info)] (.set_version sup-sum version))
-                                            sup-sum
-                                            ))
+                                     (let [ports (set (:meta info)) ;;TODO: this is only true for standalone
+                                           sup-sum (SupervisorSummary. (:hostname info)
+                                                     (:uptime-secs info)
+                                                     (count ports)
+                                                     (count (:used-ports info))
+                                                     id) ]
+                                       (.set_total_resources sup-sum (map-val double (:resources-map info)))
+                                       (when-let [version (:version info)] (.set_version sup-sum version))
+                                       sup-sum))
+              nimbus-uptime ((:uptime nimbus))
               bases (topology-bases storm-cluster-state)
               nimbuses (.nimbuses storm-cluster-state)
 

http://git-wip-us.apache.org/repos/asf/storm/blob/121d022b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index c78e865..3eed36e 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -25,6 +25,7 @@
            [java.io File])
   (:use [backtype.storm config util log timer local-state])
   (:import [backtype.storm.utils VersionInfo])
+  (:import [backtype.storm Config])
   (:use [backtype.storm.daemon common])
   (:require [backtype.storm.daemon [worker :as worker]]
             [backtype.storm [process-simulator :as psim] [cluster :as cluster] [event :as event]]
@@ -495,6 +496,11 @@
       (.add processes-event-manager sync-processes)
       )))
 
+(defn mk-supervisor-capacities
+  [conf] ; returns a map from the Config memory/cpu capacity keys to the values configured in conf, coerced to double
+  {Config/SUPERVISOR_MEMORY_CAPACITY_MB (double (conf SUPERVISOR-MEMORY-CAPACITY-MB))
+   Config/SUPERVISOR_CPU_CAPACITY (double (conf SUPERVISOR-CPU-CAPACITY))})
+
 ;; in local state, supervisor stores who its current assignments are
 ;; another thread launches events to restart any dead processes if necessary
 (defserverfn mk-supervisor [conf shared-context ^ISupervisor isupervisor]
@@ -516,7 +522,8 @@
                                                  (.getMetadata isupervisor)
                                                  (conf SUPERVISOR-SCHEDULER-META)
                                                  ((:uptime supervisor))
-                                                 (:version supervisor))))]
+                                                 (:version supervisor)
+                                                 (mk-supervisor-capacities conf))))]
     (heartbeat-fn)
 
     ;; should synchronize supervisor so it doesn't launch anything after being down (optimization)

http://git-wip-us.apache.org/repos/asf/storm/blob/121d022b/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index c805d03..26a6681 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -169,6 +169,15 @@ public class Config extends HashMap<String, Object> {
     public static final Object STORM_CLUSTER_MODE_SCHEMA = String.class;
 
     /**
+     * What Network Topography detection classes should we use.
+     * Given a list of supervisor hostnames (or IP addresses), this class would return a list of
+     * rack names that correspond to the supervisors. This information is stored in Cluster.java, and
+     * is used in the resource aware scheduler.
+     */
+    public static final String STORM_NETWORK_TOPOGRAPHY_PLUGIN = "storm.network.topography.plugin";
+    public static final Object STORM_NETWORK_TOPOGRAPHY_PLUGIN_SCHEMA = String.class;
+
+    /**
      * The hostname the supervisors/workers should report to nimbus. If unset, Storm will
      * get the hostname to report by calling <code>InetAddress.getLocalHost().getCanonicalHostName()</code>.
      *
@@ -993,6 +1002,22 @@ public class Config extends HashMap<String, Object> {
     public static final Object SUPERVISOR_WORKER_LAUNCHER_SCHEMA = String.class;
 
     /**
+     * The total amount of memory (in MiB) a supervisor is allowed to give to its workers.
+     *  A default value will be set for this config if user does not override
+     */
+    public static final String SUPERVISOR_MEMORY_CAPACITY_MB = "supervisor.memory.capacity.mb";
+    public static final Object SUPERVISOR_MEMORY_CAPACITY_MB_SCHEMA = ConfigValidation.PositiveNumberValidator;
+
+    /**
+     * The total amount of CPU resources a supervisor is allowed to give to its workers.
+     * By convention 1 cpu core should be about 100, but this can be adjusted if needed
+     * using 100 makes it simple to set the desired value to the capacity measurement
+     * for single threaded bolts.  A default value will be set for this config if user does not override
+     */
+    public static final String SUPERVISOR_CPU_CAPACITY = "supervisor.cpu.capacity";
+    public static final Object SUPERVISOR_CPU_CAPACITY_SCHEMA = ConfigValidation.PositiveNumberValidator;
+
+    /**
      * The jvm opts provided to workers launched by this supervisor. All "%ID%", "%WORKER-ID%", "%TOPOLOGY-ID%"
      * and "%WORKER-PORT%" substrings are replaced with:
      * %ID%          -> port (for backward compatibility),
@@ -1133,6 +1158,34 @@ public class Config extends HashMap<String, Object> {
     public static final Object TOPOLOGY_TASKS_SCHEMA = ConfigValidation.IntegerValidator;
 
     /**
+     * The maximum amount of memory an instance of a spout/bolt will take on heap. This enables the scheduler
+     * to allocate slots on machines with enough available memory. A default value will be set for this config if user does not override
+     */
+    public static final String TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB = "topology.component.resources.onheap.memory.mb";
+    public static final Object TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB_SCHEMA = ConfigValidation.NonNegativeNumberValidator;
+
+    /**
+     * The maximum amount of memory an instance of a spout/bolt will take off heap. This enables the scheduler
+     * to allocate slots on machines with enough available memory.  A default value will be set for this config if user does not override
+     */
+    public static final String TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB = "topology.component.resources.offheap.memory.mb";
+    public static final Object TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB_SCHEMA = ConfigValidation.NonNegativeNumberValidator;
+
+    /**
+     * The config indicates the percentage of a cpu core that an instance (executor) of a component will use.
+     * Assuming a core's value to be 100, a value of 10 indicates 10% of the core.
+     * The P in PCORE represents the term "physical".  A default value will be set for this config if user does not override
+     */
+    public static final String TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT = "topology.component.cpu.pcore.percent";
+    public static final Object TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT_SCHEMA = ConfigValidation.NonNegativeNumberValidator;
+
+    /**
+     * A per topology config that specifies the maximum amount of memory a worker can use for that specific topology
+     */
+    public static final String TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB = "topology.worker.max.heap.size.mb";
+    public static final Object TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB_SCHEMA = ConfigValidation.PositiveNumberValidator;
+
+    /**
      * How many executors to spawn for ackers.
      *
      * <p>By not setting this variable or setting it as null, Storm will set the number of acker executors
@@ -1755,4 +1808,14 @@ public class Config extends HashMap<String, Object> {
         conf.put(Config.TOPOLOGY_KRYO_DECORATORS, ret);
         return ret;
     }
+
+    /**
+     * Sets the maximum heap size allowed per worker for this topology; a null size leaves the config untouched.
+     * @param size the limit to store under {@link Config#TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB} (the key name indicates MB)
+     */
+    public void setTopologyWorkerMaxHeapSize(Number size) {
+        if(size != null) {
+            this.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, size);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/121d022b/storm-core/src/jvm/backtype/storm/ConfigValidation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/ConfigValidation.java b/storm-core/src/jvm/backtype/storm/ConfigValidation.java
index fd9dae7..e6c0986 100644
--- a/storm-core/src/jvm/backtype/storm/ConfigValidation.java
+++ b/storm-core/src/jvm/backtype/storm/ConfigValidation.java
@@ -261,6 +261,28 @@ public class ConfigValidation {
     };
 
     /**
+     * Validates a non negative double value.
+     */
+    public static Object NonNegativeNumberValidator = new FieldValidator() {
+        @Override
+        public void validateField(String name, Object o) throws IllegalArgumentException {
+            if (o == null) {
+                // A null value is acceptable.
+                return;
+            }
+            final double double_value;
+            if (o instanceof Number)
+            {
+                double_value = ((Number)o).doubleValue();
+                if (double_value >= 0.0) {
+                    return;
+                }
+            }
+            throw new IllegalArgumentException("Field " + name + " must be a non-negative double.");
+        }
+    };
+
+    /**
      * Validates a Positive Number
      */
     public static Object PositiveNumberValidator = new FieldValidator() {

http://git-wip-us.apache.org/repos/asf/storm/blob/121d022b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/StormSubmitter.java b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
index a4ccf5f..8afbbc6 100644
--- a/storm-core/src/jvm/backtype/storm/StormSubmitter.java
+++ b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
@@ -24,6 +24,7 @@ import java.util.regex.Pattern;
 import java.util.HashMap;
 import java.util.Map;
 
+import backtype.storm.scheduler.resource.ResourceUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.thrift.TException;
 import org.json.simple.JSONValue;
@@ -185,9 +186,10 @@ public class StormSubmitter {
      * @throws AlreadyAliveException
      * @throws InvalidTopologyException
      * @throws AuthorizationException
+     * @throws IllegalArgumentException thrown if configs will yield an unschedulable topology. validateConfs validates confs
      */
     public static void submitTopologyAs(String name, Map stormConf, StormTopology topology, SubmitOptions opts, ProgressListener progressListener, String asUser)
-            throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+            throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, IllegalArgumentException {
         if(!Utils.isValidConf(stormConf)) {
             throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
         }
@@ -197,6 +199,8 @@ public class StormSubmitter {
         conf.putAll(stormConf);
         stormConf.putAll(prepareZookeeperAuthentication(conf));
 
+        validateConfs(conf, topology);
+
         Map<String,String> passedCreds = new HashMap<String, String>();
         if (opts != null) {
             Credentials tmpCreds = opts.get_creds();
@@ -442,4 +446,33 @@ public class StormSubmitter {
          */
         public void onCompleted(String srcFile, String targetFile, long totalBytes);
     }
+
+    private static void validateConfs(Map stormConf, StormTopology topology) throws IllegalArgumentException {
+        // Reject the submission when the worker heap limit is below the largest memory requirement of any one component.
+        double maxComponentMem = getMaxExecutorMemoryUsageForTopo(topology, stormConf);
+        Double workerHeapLimit = Utils.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB));
+        if(workerHeapLimit < maxComponentMem) {
+            throw new IllegalArgumentException("Topology will not be able to be successfully scheduled: Config TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB="
+                    + workerHeapLimit + " < " + maxComponentMem + " (Largest memory requirement of a component in the topology). Perhaps set TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB to a larger amount");
+        }
+    }
+
+    private static double getMaxExecutorMemoryUsageForTopo(StormTopology topology, Map topologyConf) {
+        // Largest combined (off-heap + on-heap) memory requirement seen so far; stays 0.0 if nothing exceeds it.
+        double peakMemory = 0.0;
+        // Scan the resource map of every bolt first.
+        for(Map<String, Double> boltResources : ResourceUtils.getBoltsResources(topology, topologyConf).values()) {
+            double boltMemory = boltResources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)
+                    + boltResources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+            peakMemory = (boltMemory > peakMemory) ? boltMemory : peakMemory;
+        }
+        // Then scan the resource map of every spout with the same rule.
+        for(Map<String, Double> spoutResources : ResourceUtils.getSpoutsResources(topology, topologyConf).values()) {
+            double spoutMemory = spoutResources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)
+                    + spoutResources.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+            peakMemory = (spoutMemory > peakMemory) ? spoutMemory : peakMemory;
+        }
+        // The result is the single largest requirement, not a sum over components.
+        return peakMemory;
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/121d022b/storm-core/src/jvm/backtype/storm/generated/Assignment.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/Assignment.java b/storm-core/src/jvm/backtype/storm/generated/Assignment.java
index 7555b54..6405c05 100644
--- a/storm-core/src/jvm/backtype/storm/generated/Assignment.java
+++ b/storm-core/src/jvm/backtype/storm/generated/Assignment.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-9-28")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-6")
 public class Assignment implements org.apache.thrift.TBase<Assignment, Assignment._Fields>, java.io.Serializable, Cloneable, Comparable<Assignment> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("Assignment");
 
@@ -678,15 +678,15 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
           case 2: // NODE_HOST
             if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
               {
-                org.apache.thrift.protocol.TMap _map504 = iprot.readMapBegin();
-                struct.node_host = new HashMap<String,String>(2*_map504.size);
-                String _key505;
-                String _val506;
-                for (int _i507 = 0; _i507 < _map504.size; ++_i507)
+                org.apache.thrift.protocol.TMap _map524 = iprot.readMapBegin();
+                struct.node_host = new HashMap<String,String>(2*_map524.size);
+                String _key525;
+                String _val526;
+                for (int _i527 = 0; _i527 < _map524.size; ++_i527)
                 {
-                  _key505 = iprot.readString();
-                  _val506 = iprot.readString();
-                  struct.node_host.put(_key505, _val506);
+                  _key525 = iprot.readString();
+                  _val526 = iprot.readString();
+                  struct.node_host.put(_key525, _val526);
                 }
                 iprot.readMapEnd();
               }
@@ -698,26 +698,26 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
           case 3: // EXECUTOR_NODE_PORT
             if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
               {
-                org.apache.thrift.protocol.TMap _map508 = iprot.readMapBegin();
-                struct.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map508.size);
-                List<Long> _key509;
-                NodeInfo _val510;
-                for (int _i511 = 0; _i511 < _map508.size; ++_i511)
+                org.apache.thrift.protocol.TMap _map528 = iprot.readMapBegin();
+                struct.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map528.size);
+                List<Long> _key529;
+                NodeInfo _val530;
+                for (int _i531 = 0; _i531 < _map528.size; ++_i531)
                 {
                   {
-                    org.apache.thrift.protocol.TList _list512 = iprot.readListBegin();
-                    _key509 = new ArrayList<Long>(_list512.size);
-                    long _elem513;
-                    for (int _i514 = 0; _i514 < _list512.size; ++_i514)
+                    org.apache.thrift.protocol.TList _list532 = iprot.readListBegin();
+                    _key529 = new ArrayList<Long>(_list532.size);
+                    long _elem533;
+                    for (int _i534 = 0; _i534 < _list532.size; ++_i534)
                     {
-                      _elem513 = iprot.readI64();
-                      _key509.add(_elem513);
+                      _elem533 = iprot.readI64();
+                      _key529.add(_elem533);
                     }
                     iprot.readListEnd();
                   }
-                  _val510 = new NodeInfo();
-                  _val510.read(iprot);
-                  struct.executor_node_port.put(_key509, _val510);
+                  _val530 = new NodeInfo();
+                  _val530.read(iprot);
+                  struct.executor_node_port.put(_key529, _val530);
                 }
                 iprot.readMapEnd();
               }
@@ -729,25 +729,25 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
           case 4: // EXECUTOR_START_TIME_SECS
             if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
               {
-                org.apache.thrift.protocol.TMap _map515 = iprot.readMapBegin();
-                struct.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map515.size);
-                List<Long> _key516;
-                long _val517;
-                for (int _i518 = 0; _i518 < _map515.size; ++_i518)
+                org.apache.thrift.protocol.TMap _map535 = iprot.readMapBegin();
+                struct.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map535.size);
+                List<Long> _key536;
+                long _val537;
+                for (int _i538 = 0; _i538 < _map535.size; ++_i538)
                 {
                   {
-                    org.apache.thrift.protocol.TList _list519 = iprot.readListBegin();
-                    _key516 = new ArrayList<Long>(_list519.size);
-                    long _elem520;
-                    for (int _i521 = 0; _i521 < _list519.size; ++_i521)
+                    org.apache.thrift.protocol.TList _list539 = iprot.readListBegin();
+                    _key536 = new ArrayList<Long>(_list539.size);
+                    long _elem540;
+                    for (int _i541 = 0; _i541 < _list539.size; ++_i541)
                     {
-                      _elem520 = iprot.readI64();
-                      _key516.add(_elem520);
+                      _elem540 = iprot.readI64();
+                      _key536.add(_elem540);
                     }
                     iprot.readListEnd();
                   }
-                  _val517 = iprot.readI64();
-                  struct.executor_start_time_secs.put(_key516, _val517);
+                  _val537 = iprot.readI64();
+                  struct.executor_start_time_secs.put(_key536, _val537);
                 }
                 iprot.readMapEnd();
               }
@@ -779,10 +779,10 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
           oprot.writeFieldBegin(NODE_HOST_FIELD_DESC);
           {
             oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, struct.node_host.size()));
-            for (Map.Entry<String, String> _iter522 : struct.node_host.entrySet())
+            for (Map.Entry<String, String> _iter542 : struct.node_host.entrySet())
             {
-              oprot.writeString(_iter522.getKey());
-              oprot.writeString(_iter522.getValue());
+              oprot.writeString(_iter542.getKey());
+              oprot.writeString(_iter542.getValue());
             }
             oprot.writeMapEnd();
           }
@@ -794,17 +794,17 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
           oprot.writeFieldBegin(EXECUTOR_NODE_PORT_FIELD_DESC);
           {
             oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.STRUCT, struct.executor_node_port.size()));
-            for (Map.Entry<List<Long>, NodeInfo> _iter523 : struct.executor_node_port.entrySet())
+            for (Map.Entry<List<Long>, NodeInfo> _iter543 : struct.executor_node_port.entrySet())
             {
               {
-                oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter523.getKey().size()));
-                for (long _iter524 : _iter523.getKey())
+                oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter543.getKey().size()));
+                for (long _iter544 : _iter543.getKey())
                 {
-                  oprot.writeI64(_iter524);
+                  oprot.writeI64(_iter544);
                 }
                 oprot.writeListEnd();
               }
-              _iter523.getValue().write(oprot);
+              _iter543.getValue().write(oprot);
             }
             oprot.writeMapEnd();
           }
@@ -816,17 +816,17 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
           oprot.writeFieldBegin(EXECUTOR_START_TIME_SECS_FIELD_DESC);
           {
             oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.I64, struct.executor_start_time_secs.size()));
-            for (Map.Entry<List<Long>, Long> _iter525 : struct.executor_start_time_secs.entrySet())
+            for (Map.Entry<List<Long>, Long> _iter545 : struct.executor_start_time_secs.entrySet())
             {
               {
-                oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter525.getKey().size()));
-                for (long _iter526 : _iter525.getKey())
+                oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter545.getKey().size()));
+                for (long _iter546 : _iter545.getKey())
                 {
-                  oprot.writeI64(_iter526);
+                  oprot.writeI64(_iter546);
                 }
                 oprot.writeListEnd();
               }
-              oprot.writeI64(_iter525.getValue());
+              oprot.writeI64(_iter545.getValue());
             }
             oprot.writeMapEnd();
           }
@@ -865,42 +865,42 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
       if (struct.is_set_node_host()) {
         {
           oprot.writeI32(struct.node_host.size());
-          for (Map.Entry<String, String> _iter527 : struct.node_host.entrySet())
+          for (Map.Entry<String, String> _iter547 : struct.node_host.entrySet())
           {
-            oprot.writeString(_iter527.getKey());
-            oprot.writeString(_iter527.getValue());
+            oprot.writeString(_iter547.getKey());
+            oprot.writeString(_iter547.getValue());
           }
         }
       }
       if (struct.is_set_executor_node_port()) {
         {
           oprot.writeI32(struct.executor_node_port.size());
-          for (Map.Entry<List<Long>, NodeInfo> _iter528 : struct.executor_node_port.entrySet())
+          for (Map.Entry<List<Long>, NodeInfo> _iter548 : struct.executor_node_port.entrySet())
           {
             {
-              oprot.writeI32(_iter528.getKey().size());
-              for (long _iter529 : _iter528.getKey())
+              oprot.writeI32(_iter548.getKey().size());
+              for (long _iter549 : _iter548.getKey())
               {
-                oprot.writeI64(_iter529);
+                oprot.writeI64(_iter549);
               }
             }
-            _iter528.getValue().write(oprot);
+            _iter548.getValue().write(oprot);
           }
         }
       }
       if (struct.is_set_executor_start_time_secs()) {
         {
           oprot.writeI32(struct.executor_start_time_secs.size());
-          for (Map.Entry<List<Long>, Long> _iter530 : struct.executor_start_time_secs.entrySet())
+          for (Map.Entry<List<Long>, Long> _iter550 : struct.executor_start_time_secs.entrySet())
           {
             {
-              oprot.writeI32(_iter530.getKey().size());
-              for (long _iter531 : _iter530.getKey())
+              oprot.writeI32(_iter550.getKey().size());
+              for (long _iter551 : _iter550.getKey())
               {
-                oprot.writeI64(_iter531);
+                oprot.writeI64(_iter551);
               }
             }
-            oprot.writeI64(_iter530.getValue());
+            oprot.writeI64(_iter550.getValue());
           }
         }
       }
@@ -914,64 +914,64 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
       BitSet incoming = iprot.readBitSet(3);
       if (incoming.get(0)) {
         {
-          org.apache.thrift.protocol.TMap _map532 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32());
-          struct.node_host = new HashMap<String,String>(2*_map532.size);
-          String _key533;
-          String _val534;
-          for (int _i535 = 0; _i535 < _map532.size; ++_i535)
+          org.apache.thrift.protocol.TMap _map552 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32());
+          struct.node_host = new HashMap<String,String>(2*_map552.size);
+          String _key553;
+          String _val554;
+          for (int _i555 = 0; _i555 < _map552.size; ++_i555)
           {
-            _key533 = iprot.readString();
-            _val534 = iprot.readString();
-            struct.node_host.put(_key533, _val534);
+            _key553 = iprot.readString();
+            _val554 = iprot.readString();
+            struct.node_host.put(_key553, _val554);
           }
         }
         struct.set_node_host_isSet(true);
       }
       if (incoming.get(1)) {
         {
-          org.apache.thrift.protocol.TMap _map536 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
-          struct.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map536.size);
-          List<Long> _key537;
-          NodeInfo _val538;
-          for (int _i539 = 0; _i539 < _map536.size; ++_i539)
+          org.apache.thrift.protocol.TMap _map556 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
+          struct.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map556.size);
+          List<Long> _key557;
+          NodeInfo _val558;
+          for (int _i559 = 0; _i559 < _map556.size; ++_i559)
           {
             {
-              org.apache.thrift.protocol.TList _list540 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32());
-              _key537 = new ArrayList<Long>(_list540.size);
-              long _elem541;
-              for (int _i542 = 0; _i542 < _list540.size; ++_i542)
+              org.apache.thrift.protocol.TList _list560 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32());
+              _key557 = new ArrayList<Long>(_list560.size);
+              long _elem561;
+              for (int _i562 = 0; _i562 < _list560.size; ++_i562)
               {
-                _elem541 = iprot.readI64();
-                _key537.add(_elem541);
+                _elem561 = iprot.readI64();
+                _key557.add(_elem561);
               }
             }
-            _val538 = new NodeInfo();
-            _val538.read(iprot);
-            struct.executor_node_port.put(_key537, _val538);
+            _val558 = new NodeInfo();
+            _val558.read(iprot);
+            struct.executor_node_port.put(_key557, _val558);
           }
         }
         struct.set_executor_node_port_isSet(true);
       }
       if (incoming.get(2)) {
         {
-          org.apache.thrift.protocol.TMap _map543 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.I64, iprot.readI32());
-          struct.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map543.size);
-          List<Long> _key544;
-          long _val545;
-          for (int _i546 = 0; _i546 < _map543.size; ++_i546)
+          org.apache.thrift.protocol.TMap _map563 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.I64, iprot.readI32());
+          struct.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map563.size);
+          List<Long> _key564;
+          long _val565;
+          for (int _i566 = 0; _i566 < _map563.size; ++_i566)
           {
             {
-              org.apache.thrift.protocol.TList _list547 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32());
-              _key544 = new ArrayList<Long>(_list547.size);
-              long _elem548;
-              for (int _i549 = 0; _i549 < _list547.size; ++_i549)
+              org.apache.thrift.protocol.TList _list567 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32());
+              _key564 = new ArrayList<Long>(_list567.size);
+              long _elem568;
+              for (int _i569 = 0; _i569 < _list567.size; ++_i569)
               {
-                _elem548 = iprot.readI64();
-                _key544.add(_elem548);
+                _elem568 = iprot.readI64();
+                _key564.add(_elem568);
               }
             }
-            _val545 = iprot.readI64();
-            struct.executor_start_time_secs.put(_key544, _val545);
+            _val565 = iprot.readI64();
+            struct.executor_start_time_secs.put(_key564, _val565);
           }
         }
         struct.set_executor_start_time_secs_isSet(true);

http://git-wip-us.apache.org/repos/asf/storm/blob/121d022b/storm-core/src/jvm/backtype/storm/generated/BoltAggregateStats.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/BoltAggregateStats.java b/storm-core/src/jvm/backtype/storm/generated/BoltAggregateStats.java
index 3234d2b..0ddcaba 100644
--- a/storm-core/src/jvm/backtype/storm/generated/BoltAggregateStats.java
+++ b/storm-core/src/jvm/backtype/storm/generated/BoltAggregateStats.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-5-22")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-6")
 public class BoltAggregateStats implements org.apache.thrift.TBase<BoltAggregateStats, BoltAggregateStats._Fields>, java.io.Serializable, Cloneable, Comparable<BoltAggregateStats> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("BoltAggregateStats");
 

http://git-wip-us.apache.org/repos/asf/storm/blob/121d022b/storm-core/src/jvm/backtype/storm/generated/BoltStats.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/BoltStats.java b/storm-core/src/jvm/backtype/storm/generated/BoltStats.java
index 26ef5d8..315f955 100644
--- a/storm-core/src/jvm/backtype/storm/generated/BoltStats.java
+++ b/storm-core/src/jvm/backtype/storm/generated/BoltStats.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-9-3")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-9-30")
 public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._Fields>, java.io.Serializable, Cloneable, Comparable<BoltStats> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("BoltStats");
 
@@ -881,28 +881,28 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._
           case 1: // ACKED
             if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
               {
-                org.apache.thrift.protocol.TMap _map98 = iprot.readMapBegin();
-                struct.acked = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map98.size);
-                String _key99;
-                Map<GlobalStreamId,Long> _val100;
-                for (int _i101 = 0; _i101 < _map98.size; ++_i101)
+                org.apache.thrift.protocol.TMap _map108 = iprot.readMapBegin();
+                struct.acked = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map108.size);
+                String _key109;
+                Map<GlobalStreamId,Long> _val110;
+                for (int _i111 = 0; _i111 < _map108.size; ++_i111)
                 {
-                  _key99 = iprot.readString();
+                  _key109 = iprot.readString();
                   {
-                    org.apache.thrift.protocol.TMap _map102 = iprot.readMapBegin();
-                    _val100 = new HashMap<GlobalStreamId,Long>(2*_map102.size);
-                    GlobalStreamId _key103;
-                    long _val104;
-                    for (int _i105 = 0; _i105 < _map102.size; ++_i105)
+                    org.apache.thrift.protocol.TMap _map112 = iprot.readMapBegin();
+                    _val110 = new HashMap<GlobalStreamId,Long>(2*_map112.size);
+                    GlobalStreamId _key113;
+                    long _val114;
+                    for (int _i115 = 0; _i115 < _map112.size; ++_i115)
                     {
-                      _key103 = new GlobalStreamId();
-                      _key103.read(iprot);
-                      _val104 = iprot.readI64();
-                      _val100.put(_key103, _val104);
+                      _key113 = new GlobalStreamId();
+                      _key113.read(iprot);
+                      _val114 = iprot.readI64();
+                      _val110.put(_key113, _val114);
                     }
                     iprot.readMapEnd();
                   }
-                  struct.acked.put(_key99, _val100);
+                  struct.acked.put(_key109, _val110);
                 }
                 iprot.readMapEnd();
               }
@@ -914,28 +914,28 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._
           case 2: // FAILED
             if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
               {
-                org.apache.thrift.protocol.TMap _map106 = iprot.readMapBegin();
-                struct.failed = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map106.size);
-                String _key107;
-                Map<GlobalStreamId,Long> _val108;
-                for (int _i109 = 0; _i109 < _map106.size; ++_i109)
+                org.apache.thrift.protocol.TMap _map116 = iprot.readMapBegin();
+                struct.failed = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map116.size);
+                String _key117;
+                Map<GlobalStreamId,Long> _val118;
+                for (int _i119 = 0; _i119 < _map116.size; ++_i119)
                 {
-                  _key107 = iprot.readString();
+                  _key117 = iprot.readString();
                   {
-                    org.apache.thrift.protocol.TMap _map110 = iprot.readMapBegin();
-                    _val108 = new HashMap<GlobalStreamId,Long>(2*_map110.size);
-                    GlobalStreamId _key111;
-                    long _val112;
-                    for (int _i113 = 0; _i113 < _map110.size; ++_i113)
+                    org.apache.thrift.protocol.TMap _map120 = iprot.readMapBegin();
+                    _val118 = new HashMap<GlobalStreamId,Long>(2*_map120.size);
+                    GlobalStreamId _key121;
+                    long _val122;
+                    for (int _i123 = 0; _i123 < _map120.size; ++_i123)
                     {
-                      _key111 = new GlobalStreamId();
-                      _key111.read(iprot);
-                      _val112 = iprot.readI64();
-                      _val108.put(_key111, _val112);
+                      _key121 = new GlobalStreamId();
+                      _key121.read(iprot);
+                      _val122 = iprot.readI64();
+                      _val118.put(_key121, _val122);
                     }
                     iprot.readMapEnd();
                   }
-                  struct.failed.put(_key107, _val108);
+                  struct.failed.put(_key117, _val118);
                 }
                 iprot.readMapEnd();
               }
@@ -947,28 +947,28 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._
           case 3: // PROCESS_MS_AVG
             if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
               {
-                org.apache.thrift.protocol.TMap _map114 = iprot.readMapBegin();
-                struct.process_ms_avg = new HashMap<String,Map<GlobalStreamId,Double>>(2*_map114.size);
-                String _key115;
-                Map<GlobalStreamId,Double> _val116;
-                for (int _i117 = 0; _i117 < _map114.size; ++_i117)
+                org.apache.thrift.protocol.TMap _map124 = iprot.readMapBegin();
+                struct.process_ms_avg = new HashMap<String,Map<GlobalStreamId,Double>>(2*_map124.size);
+                String _key125;
+                Map<GlobalStreamId,Double> _val126;
+                for (int _i127 = 0; _i127 < _map124.size; ++_i127)
                 {
-                  _key115 = iprot.readString();
+                  _key125 = iprot.readString();
                   {
-                    org.apache.thrift.protocol.TMap _map118 = iprot.readMapBegin();
-                    _val116 = new HashMap<GlobalStreamId,Double>(2*_map118.size);
-                    GlobalStreamId _key119;
-                    double _val120;
-                    for (int _i121 = 0; _i121 < _map118.size; ++_i121)
+                    org.apache.thrift.protocol.TMap _map128 = iprot.readMapBegin();
+                    _val126 = new HashMap<GlobalStreamId,Double>(2*_map128.size);
+                    GlobalStreamId _key129;
+                    double _val130;
+                    for (int _i131 = 0; _i131 < _map128.size; ++_i131)
                     {
-                      _key119 = new GlobalStreamId();
-                      _key119.read(iprot);
-                      _val120 = iprot.readDouble();
-                      _val116.put(_key119, _val120);
+                      _key129 = new GlobalStreamId();
+                      _key129.read(iprot);
+                      _val130 = iprot.readDouble();
+                      _val126.put(_key129, _val130);
                     }
                     iprot.readMapEnd();
                   }
-                  struct.process_ms_avg.put(_key115, _val116);
+                  struct.process_ms_avg.put(_key125, _val126);
                 }
                 iprot.readMapEnd();
               }
@@ -980,28 +980,28 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._
           case 4: // EXECUTED
             if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
               {
-                org.apache.thrift.protocol.TMap _map122 = iprot.readMapBegin();
-                struct.executed = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map122.size);
-                String _key123;
-                Map<GlobalStreamId,Long> _val124;
-                for (int _i125 = 0; _i125 < _map122.size; ++_i125)
+                org.apache.thrift.protocol.TMap _map132 = iprot.readMapBegin();
+                struct.executed = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map132.size);
+                String _key133;
+                Map<GlobalStreamId,Long> _val134;
+                for (int _i135 = 0; _i135 < _map132.size; ++_i135)
                 {
-                  _key123 = iprot.readString();
+                  _key133 = iprot.readString();
                   {
-                    org.apache.thrift.protocol.TMap _map126 = iprot.readMapBegin();
-                    _val124 = new HashMap<GlobalStreamId,Long>(2*_map126.size);
-                    GlobalStreamId _key127;
-                    long _val128;
-                    for (int _i129 = 0; _i129 < _map126.size; ++_i129)
+                    org.apache.thrift.protocol.TMap _map136 = iprot.readMapBegin();
+                    _val134 = new HashMap<GlobalStreamId,Long>(2*_map136.size);
+                    GlobalStreamId _key137;
+                    long _val138;
+                    for (int _i139 = 0; _i139 < _map136.size; ++_i139)
                     {
-                      _key127 = new GlobalStreamId();
-                      _key127.read(iprot);
-                      _val128 = iprot.readI64();
-                      _val124.put(_key127, _val128);
+                      _key137 = new GlobalStreamId();
+                      _key137.read(iprot);
+                      _val138 = iprot.readI64();
+                      _val134.put(_key137, _val138);
                     }
                     iprot.readMapEnd();
                   }
-                  struct.executed.put(_key123, _val124);
+                  struct.executed.put(_key133, _val134);
                 }
                 iprot.readMapEnd();
               }
@@ -1013,28 +1013,28 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._
           case 5: // EXECUTE_MS_AVG
             if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
               {
-                org.apache.thrift.protocol.TMap _map130 = iprot.readMapBegin();
-                struct.execute_ms_avg = new HashMap<String,Map<GlobalStreamId,Double>>(2*_map130.size);
-                String _key131;
-                Map<GlobalStreamId,Double> _val132;
-                for (int _i133 = 0; _i133 < _map130.size; ++_i133)
+                org.apache.thrift.protocol.TMap _map140 = iprot.readMapBegin();
+                struct.execute_ms_avg = new HashMap<String,Map<GlobalStreamId,Double>>(2*_map140.size);
+                String _key141;
+                Map<GlobalStreamId,Double> _val142;
+                for (int _i143 = 0; _i143 < _map140.size; ++_i143)
                 {
-                  _key131 = iprot.readString();
+                  _key141 = iprot.readString();
                   {
-                    org.apache.thrift.protocol.TMap _map134 = iprot.readMapBegin();
-                    _val132 = new HashMap<GlobalStreamId,Double>(2*_map134.size);
-                    GlobalStreamId _key135;
-                    double _val136;
-                    for (int _i137 = 0; _i137 < _map134.size; ++_i137)
+                    org.apache.thrift.protocol.TMap _map144 = iprot.readMapBegin();
+                    _val142 = new HashMap<GlobalStreamId,Double>(2*_map144.size);
+                    GlobalStreamId _key145;
+                    double _val146;
+                    for (int _i147 = 0; _i147 < _map144.size; ++_i147)
                     {
-                      _key135 = new GlobalStreamId();
-                      _key135.read(iprot);
-                      _val136 = iprot.readDouble();
-                      _val132.put(_key135, _val136);
+                      _key145 = new GlobalStreamId();
+                      _key145.read(iprot);
+                      _val146 = iprot.readDouble();
+                      _val142.put(_key145, _val146);
                     }
                     iprot.readMapEnd();
                   }
-                  struct.execute_ms_avg.put(_key131, _val132);
+                  struct.execute_ms_avg.put(_key141, _val142);
                 }
                 iprot.readMapEnd();
               }
@@ -1060,15 +1060,15 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._
         oprot.writeFieldBegin(ACKED_FIELD_DESC);
         {
           oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, struct.acked.size()));
-          for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter138 : struct.acked.entrySet())
+          for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter148 : struct.acked.entrySet())
           {
-            oprot.writeString(_iter138.getKey());
+            oprot.writeString(_iter148.getKey());
             {
-              oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, _iter138.getValue().size()));
-              for (Map.Entry<GlobalStreamId, Long> _iter139 : _iter138.getValue().entrySet())
+              oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, _iter148.getValue().size()));
+              for (Map.Entry<GlobalStreamId, Long> _iter149 : _iter148.getValue().entrySet())
               {
-                _iter139.getKey().write(oprot);
-                oprot.writeI64(_iter139.getValue());
+                _iter149.getKey().write(oprot);
+                oprot.writeI64(_iter149.getValue());
               }
               oprot.writeMapEnd();
             }
@@ -1081,15 +1081,15 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._
         oprot.writeFieldBegin(FAILED_FIELD_DESC);
         {
           oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, struct.failed.size()));
-          for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter140 : struct.failed.entrySet())
+          for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter150 : struct.failed.entrySet())
           {
-            oprot.writeString(_iter140.getKey());
+            oprot.writeString(_iter150.getKey());
             {
-              oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, _iter140.getValue().size()));
-              for (Map.Entry<GlobalStreamId, Long> _iter141 : _iter140.getValue().entrySet())
+              oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, _iter150.getValue().size()));
+              for (Map.Entry<GlobalStreamId, Long> _iter151 : _iter150.getValue().entrySet())
               {
-                _iter141.getKey().write(oprot);
-                oprot.writeI64(_iter141.getValue());
+                _iter151.getKey().write(oprot);
+                oprot.writeI64(_iter151.getValue());
               }
               oprot.writeMapEnd();
             }
@@ -1102,15 +1102,15 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._
         oprot.writeFieldBegin(PROCESS_MS_AVG_FIELD_DESC);
         {
           oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, struct.process_ms_avg.size()));
-          for (Map.Entry<String, Map<GlobalStreamId,Double>> _iter142 : struct.process_ms_avg.entrySet())
+          for (Map.Entry<String, Map<GlobalStreamId,Double>> _iter152 : struct.process_ms_avg.entrySet())
           {
-            oprot.writeString(_iter142.getKey());
+            oprot.writeString(_iter152.getKey());
             {
-              oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.DOUBLE, _iter142.getValue().size()));
-              for (Map.Entry<GlobalStreamId, Double> _iter143 : _iter142.getValue().entrySet())
+              oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.DOUBLE, _iter152.getValue().size()));
+              for (Map.Entry<GlobalStreamId, Double> _iter153 : _iter152.getValue().entrySet())
               {
-                _iter143.getKey().write(oprot);
-                oprot.writeDouble(_iter143.getValue());
+                _iter153.getKey().write(oprot);
+                oprot.writeDouble(_iter153.getValue());
               }
               oprot.writeMapEnd();
             }
@@ -1123,15 +1123,15 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._
         oprot.writeFieldBegin(EXECUTED_FIELD_DESC);
         {
           oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, struct.executed.size()));
-          for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter144 : struct.executed.entrySet())
+          for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter154 : struct.executed.entrySet())
           {
-            oprot.writeString(_iter144.getKey());
+            oprot.writeString(_iter154.getKey());
             {
-              oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, _iter144.getValue().size()));
-              for (Map.Entry<GlobalStreamId, Long> _iter145 : _iter144.getValue().entrySet())
+              oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, _iter154.getValue().size()));
+              for (Map.Entry<GlobalStreamId, Long> _iter155 : _iter154.getValue().entrySet())
               {
-                _iter145.getKey().write(oprot);
-                oprot.writeI64(_iter145.getValue());
+                _iter155.getKey().write(oprot);
+                oprot.writeI64(_iter155.getValue());
               }
               oprot.writeMapEnd();
             }
@@ -1144,15 +1144,15 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._
         oprot.writeFieldBegin(EXECUTE_MS_AVG_FIELD_DESC);
         {
           oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, struct.execute_ms_avg.size()));
-          for (Map.Entry<String, Map<GlobalStreamId,Double>> _iter146 : struct.execute_ms_avg.entrySet())
+          for (Map.Entry<String, Map<GlobalStreamId,Double>> _iter156 : struct.execute_ms_avg.entrySet())
           {
-            oprot.writeString(_iter146.getKey());
+            oprot.writeString(_iter156.getKey());
             {
-              oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.DOUBLE, _iter146.getValue().size()));
-              for (Map.Entry<GlobalStreamId, Double> _iter147 : _iter146.getValue().entrySet())
+              oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.DOUBLE, _iter156.getValue().size()));
+              for (Map.Entry<GlobalStreamId, Double> _iter157 : _iter156.getValue().entrySet())
               {
-                _iter147.getKey().write(oprot);
-                oprot.writeDouble(_iter147.getValue());
+                _iter157.getKey().write(oprot);
+                oprot.writeDouble(_iter157.getValue());
               }
               oprot.writeMapEnd();
             }
@@ -1180,75 +1180,75 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._
       TTupleProtocol oprot = (TTupleProtocol) prot;
       {
         oprot.writeI32(struct.acked.size());
-        for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter148 : struct.acked.entrySet())
+        for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter158 : struct.acked.entrySet())
         {
-          oprot.writeString(_iter148.getKey());
+          oprot.writeString(_iter158.getKey());
           {
-            oprot.writeI32(_iter148.getValue().size());
-            for (Map.Entry<GlobalStreamId, Long> _iter149 : _iter148.getValue().entrySet())
+            oprot.writeI32(_iter158.getValue().size());
+            for (Map.Entry<GlobalStreamId, Long> _iter159 : _iter158.getValue().entrySet())
             {
-              _iter149.getKey().write(oprot);
-              oprot.writeI64(_iter149.getValue());
+              _iter159.getKey().write(oprot);
+              oprot.writeI64(_iter159.getValue());
             }
           }
         }
       }
       {
         oprot.writeI32(struct.failed.size());
-        for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter150 : struct.failed.entrySet())
+        for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter160 : struct.failed.entrySet())
         {
-          oprot.writeString(_iter150.getKey());
+          oprot.writeString(_iter160.getKey());
           {
-            oprot.writeI32(_iter150.getValue().size());
-            for (Map.Entry<GlobalStreamId, Long> _iter151 : _iter150.getValue().entrySet())
+            oprot.writeI32(_iter160.getValue().size());
+            for (Map.Entry<GlobalStreamId, Long> _iter161 : _iter160.getValue().entrySet())
             {
-              _iter151.getKey().write(oprot);
-              oprot.writeI64(_iter151.getValue());
+              _iter161.getKey().write(oprot);
+              oprot.writeI64(_iter161.getValue());
             }
           }
         }
       }
       {
         oprot.writeI32(struct.process_ms_avg.size());
-        for (Map.Entry<String, Map<GlobalStreamId,Double>> _iter152 : struct.process_ms_avg.entrySet())
+        for (Map.Entry<String, Map<GlobalStreamId,Double>> _iter162 : struct.process_ms_avg.entrySet())
         {
-          oprot.writeString(_iter152.getKey());
+          oprot.writeString(_iter162.getKey());
           {
-            oprot.writeI32(_iter152.getValue().size());
-            for (Map.Entry<GlobalStreamId, Double> _iter153 : _iter152.getValue().entrySet())
+            oprot.writeI32(_iter162.getValue().size());
+            for (Map.Entry<GlobalStreamId, Double> _iter163 : _iter162.getValue().entrySet())
             {
-              _iter153.getKey().write(oprot);
-              oprot.writeDouble(_iter153.getValue());
+              _iter163.getKey().write(oprot);
+              oprot.writeDouble(_iter163.getValue());
             }
           }
         }
       }
       {
         oprot.writeI32(struct.executed.size());
-        for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter154 : struct.executed.entrySet())
+        for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter164 : struct.executed.entrySet())
         {
-          oprot.writeString(_iter154.getKey());
+          oprot.writeString(_iter164.getKey());
           {
-            oprot.writeI32(_iter154.getValue().size());
-            for (Map.Entry<GlobalStreamId, Long> _iter155 : _iter154.getValue().entrySet())
+            oprot.writeI32(_iter164.getValue().size());
+            for (Map.Entry<GlobalStreamId, Long> _iter165 : _iter164.getValue().entrySet())
             {
-              _iter155.getKey().write(oprot);
-              oprot.writeI64(_iter155.getValue());
+              _iter165.getKey().write(oprot);
+              oprot.writeI64(_iter165.getValue());
             }
           }
         }
       }
       {
         oprot.writeI32(struct.execute_ms_avg.size());
-        for (Map.Entry<String, Map<GlobalStreamId,Double>> _iter156 : struct.execute_ms_avg.entrySet())
+        for (Map.Entry<String, Map<GlobalStreamId,Double>> _iter166 : struct.execute_ms_avg.entrySet())
         {
-          oprot.writeString(_iter156.getKey());
+          oprot.writeString(_iter166.getKey());
           {
-            oprot.writeI32(_iter156.getValue().size());
-            for (Map.Entry<GlobalStreamId, Double> _iter157 : _iter156.getValue().entrySet())
+            oprot.writeI32(_iter166.getValue().size());
+            for (Map.Entry<GlobalStreamId, Double> _iter167 : _iter166.getValue().entrySet())
             {
-              _iter157.getKey().write(oprot);
-              oprot.writeDouble(_iter157.getValue());
+              _iter167.getKey().write(oprot);
+              oprot.writeDouble(_iter167.getValue());
             }
           }
         }
@@ -1259,127 +1259,127 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._
     public void read(org.apache.thrift.protocol.TProtocol prot, BoltStats struct) throws org.apache.thrift.TException {
       TTupleProtocol iprot = (TTupleProtocol) prot;
       {
-        org.apache.thrift.protocol.TMap _map158 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, iprot.readI32());
-        struct.acked = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map158.size);
-        String _key159;
-        Map<GlobalStreamId,Long> _val160;
-        for (int _i161 = 0; _i161 < _map158.size; ++_i161)
+        org.apache.thrift.protocol.TMap _map168 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, iprot.readI32());
+        struct.acked = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map168.size);
+        String _key169;
+        Map<GlobalStreamId,Long> _val170;
+        for (int _i171 = 0; _i171 < _map168.size; ++_i171)
         {
-          _key159 = iprot.readString();
+          _key169 = iprot.readString();
           {
-            org.apache.thrift.protocol.TMap _map162 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, iprot.readI32());
-            _val160 = new HashMap<GlobalStreamId,Long>(2*_map162.size);
-            GlobalStreamId _key163;
-            long _val164;
-            for (int _i165 = 0; _i165 < _map162.size; ++_i165)
+            org.apache.thrift.protocol.TMap _map172 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, iprot.readI32());
+            _val170 = new HashMap<GlobalStreamId,Long>(2*_map172.size);
+            GlobalStreamId _key173;
+            long _val174;
+            for (int _i175 = 0; _i175 < _map172.size; ++_i175)
             {
-              _key163 = new GlobalStreamId();
-              _key163.read(iprot);
-              _val164 = iprot.readI64();
-              _val160.put(_key163, _val164);
+              _key173 = new GlobalStreamId();
+              _key173.read(iprot);
+              _val174 = iprot.readI64();
+              _val170.put(_key173, _val174);
             }
           }
-          struct.acked.put(_key159, _val160);
+          struct.acked.put(_key169, _val170);
         }
       }
       struct.set_acked_isSet(true);
       {
-        org.apache.thrift.protocol.TMap _map166 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, iprot.readI32());
-        struct.failed = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map166.size);
-        String _key167;
-        Map<GlobalStreamId,Long> _val168;
-        for (int _i169 = 0; _i169 < _map166.size; ++_i169)
+        org.apache.thrift.protocol.TMap _map176 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, iprot.readI32());
+        struct.failed = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map176.size);
+        String _key177;
+        Map<GlobalStreamId,Long> _val178;
+        for (int _i179 = 0; _i179 < _map176.size; ++_i179)
         {
-          _key167 = iprot.readString();
+          _key177 = iprot.readString();
           {
-            org.apache.thrift.protocol.TMap _map170 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, iprot.readI32());
-            _val168 = new HashMap<GlobalStreamId,Long>(2*_map170.size);
-            GlobalStreamId _key171;
-            long _val172;
-            for (int _i173 = 0; _i173 < _map170.size; ++_i173)
+            org.apache.thrift.protocol.TMap _map180 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, iprot.readI32());
+            _val178 = new HashMap<GlobalStreamId,Long>(2*_map180.size);
+            GlobalStreamId _key181;
+            long _val182;
+            for (int _i183 = 0; _i183 < _map180.size; ++_i183)
             {
-              _key171 = new GlobalStreamId();
-              _key171.read(iprot);
-              _val172 = iprot.readI64();
-              _val168.put(_key171, _val172);
+              _key181 = new GlobalStreamId();
+              _key181.read(iprot);
+              _val182 = iprot.readI64();
+              _val178.put(_key181, _val182);
             }
           }
-          struct.failed.put(_key167, _val168);
+          struct.failed.put(_key177, _val178);
         }
       }
       struct.set_failed_isSet(true);
       {
-        org.apache.thrift.protocol.TMap _map174 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, iprot.readI32());
-        struct.process_ms_avg = new HashMap<String,Map<GlobalStreamId,Double>>(2*_map174.size);
-        String _key175;
-        Map<GlobalStreamId,Double> _val176;
-        for (int _i177 = 0; _i177 < _map174.size; ++_i177)
+        org.apache.thrift.protocol.TMap _map184 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, iprot.readI32());
+        struct.process_ms_avg = new HashMap<String,Map<GlobalStreamId,Double>>(2*_map184.size);
+        String _key185;
+        Map<GlobalStreamId,Double> _val186;
+        for (int _i187 = 0; _i187 < _map184.size; ++_i187)
         {
-          _key175 = iprot.readString();
+          _key185 = iprot.readString();
           {
-            org.apache.thrift.protocol.TMap _map178 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.DOUBLE, iprot.readI32());
-            _val176 = new HashMap<GlobalStreamId,Double>(2*_map178.size);
-            GlobalStreamId _key179;
-            double _val180;
-            for (int _i181 = 0; _i181 < _map178.size; ++_i181)
+            org.apache.thrift.protocol.TMap _map188 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.DOUBLE, iprot.readI32());
+            _val186 = new HashMap<GlobalStreamId,Double>(2*_map188.size);
+            GlobalStreamId _key189;
+            double _val190;
+            for (int _i191 = 0; _i191 < _map188.size; ++_i191)
             {
-              _key179 = new GlobalStreamId();
-              _key179.read(iprot);
-              _val180 = iprot.readDouble();
-              _val176.put(_key179, _val180);
+              _key189 = new GlobalStreamId();
+              _key189.read(iprot);
+              _val190 = iprot.readDouble();
+              _val186.put(_key189, _val190);
             }
           }
-          struct.process_ms_avg.put(_key175, _val176);
+          struct.process_ms_avg.put(_key185, _val186);
         }
       }
       struct.set_process_ms_avg_isSet(true);
       {
-        org.apache.thrift.protocol.TMap _map182 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, iprot.readI32());
-        struct.executed = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map182.size);
-        String _key183;
-        Map<GlobalStreamId,Long> _val184;
-        for (int _i185 = 0; _i185 < _map182.size; ++_i185)
+        org.apache.thrift.protocol.TMap _map192 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, iprot.readI32());
+        struct.executed = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map192.size);
+        String _key193;
+        Map<GlobalStreamId,Long> _val194;
+        for (int _i195 = 0; _i195 < _map192.size; ++_i195)
         {
-          _key183 = iprot.readString();
+          _key193 = iprot.readString();
           {
-            org.apache.thrift.protocol.TMap _map186 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, iprot.readI32());
-            _val184 = new HashMap<GlobalStreamId,Long>(2*_map186.size);
-            GlobalStreamId _key187;
-            long _val188;
-            for (int _i189 = 0; _i189 < _map186.size; ++_i189)
+            org.apache.thrift.protocol.TMap _map196 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, iprot.readI32());
+            _val194 = new HashMap<GlobalStreamId,Long>(2*_map196.size);
+            GlobalStreamId _key197;
+            long _val198;
+            for (int _i199 = 0; _i199 < _map196.size; ++_i199)
             {
-              _key187 = new GlobalStreamId();
-              _key187.read(iprot);
-              _val188 = iprot.readI64();
-              _val184.put(_key187, _val188);
+              _key197 = new GlobalStreamId();
+              _key197.read(iprot);
+              _val198 = iprot.readI64();
+              _val194.put(_key197, _val198);
             }
           }
-          struct.executed.put(_key183, _val184);
+          struct.executed.put(_key193, _val194);
         }
       }
       struct.set_executed_isSet(true);
       {
-        org.apache.thrift.protocol.TMap _map190 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, iprot.readI32());
-        struct.execute_ms_avg = new HashMap<String,Map<GlobalStreamId,Double>>(2*_map190.size);
-        String _key191;
-        Map<GlobalStreamId,Double> _val192;
-        for (int _i193 = 0; _i193 < _map190.size; ++_i193)
+        org.apache.thrift.protocol.TMap _map200 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, iprot.readI32());
+        struct.execute_ms_avg = new HashMap<String,Map<GlobalStreamId,Double>>(2*_map200.size);
+        String _key201;
+        Map<GlobalStreamId,Double> _val202;
+        for (int _i203 = 0; _i203 < _map200.size; ++_i203)
         {
-          _key191 = iprot.readString();
+          _key201 = iprot.readString();
           {
-            org.apache.thrift.protocol.TMap _map194 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.DOUBLE, iprot.readI32());
-            _val192 = new HashMap<GlobalStreamId,Double>(2*_map194.size);
-            GlobalStreamId _key195;
-            double _val196;
-            for (int _i197 = 0; _i197 < _map194.size; ++_i197)
+            org.apache.thrift.protocol.TMap _map204 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.DOUBLE, iprot.readI32());
+            _val202 = new HashMap<GlobalStreamId,Double>(2*_map204.size);
+            GlobalStreamId _key205;
+            double _val206;
+            for (int _i207 = 0; _i207 < _map204.size; ++_i207)
             {
-              _key195 = new GlobalStreamId();
-              _key195.read(iprot);
-              _val196 = iprot.readDouble();
-              _val192.put(_key195, _val196);
+              _key205 = new GlobalStreamId();
+              _key205.read(iprot);
+              _val206 = iprot.readDouble();
+              _val202.put(_key205, _val206);
             }
           }
-          struct.execute_ms_avg.put(_key191, _val192);
+          struct.execute_ms_avg.put(_key201, _val202);
         }
       }
       struct.set_execute_ms_avg_isSet(true);