You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2015/10/09 16:53:35 UTC
[7/9] storm git commit: [STORM-893] - Resource Aware Scheduler
implementation. [STORM-894] - Basic Resource Aware Scheduling implementation.
[STORM-893] - Resource Aware Scheduler implementation.
[STORM-894] - Basic Resource Aware Scheduling implementation.
Added functionality for users to limit the amount of memory resources allocated to a worker (JVM) process when scheduling with resource aware scheduler. This allows users to potentially spread executors more evenly across workers.
Also refactored some code
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/121d022b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/121d022b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/121d022b
Branch: refs/heads/master
Commit: 121d022b9f11146f6dadc2cd402c747472cac0d1
Parents: 1822491
Author: Boyang Jerry Peng <je...@yahoo-inc.com>
Authored: Tue Oct 6 22:23:18 2015 -0500
Committer: Boyang Jerry Peng <je...@yahoo-inc.com>
Committed: Fri Oct 9 08:44:20 2015 -0500
----------------------------------------------------------------------
conf/defaults.yaml | 14 +
.../starter/ResourceAwareExampleTopology.java | 87 ++
storm-core/src/clj/backtype/storm/converter.clj | 4 +-
.../src/clj/backtype/storm/daemon/common.clj | 2 +-
.../src/clj/backtype/storm/daemon/nimbus.clj | 29 +-
.../clj/backtype/storm/daemon/supervisor.clj | 9 +-
storm-core/src/jvm/backtype/storm/Config.java | 63 ++
.../jvm/backtype/storm/ConfigValidation.java | 22 +
.../src/jvm/backtype/storm/StormSubmitter.java | 35 +-
.../backtype/storm/generated/Assignment.java | 194 ++--
.../storm/generated/BoltAggregateStats.java | 2 +-
.../jvm/backtype/storm/generated/BoltStats.java | 442 ++++-----
.../storm/generated/ClusterSummary.java | 110 +-
.../storm/generated/ClusterWorkerHeartbeat.java | 54 +-
.../storm/generated/CommonAggregateStats.java | 2 +-
.../generated/ComponentAggregateStats.java | 2 +-
.../storm/generated/ComponentPageInfo.java | 222 ++---
.../backtype/storm/generated/Credentials.java | 46 +-
.../storm/generated/ExecutorAggregateStats.java | 2 +-
.../backtype/storm/generated/ExecutorStats.java | 170 ++--
.../storm/generated/LSApprovedWorkers.java | 46 +-
.../generated/LSSupervisorAssignments.java | 50 +-
.../storm/generated/LSWorkerHeartbeat.java | 38 +-
.../storm/generated/LocalAssignment.java | 38 +-
.../storm/generated/LocalStateData.java | 50 +-
.../jvm/backtype/storm/generated/LogConfig.java | 50 +-
.../jvm/backtype/storm/generated/LogLevel.java | 2 +-
.../jvm/backtype/storm/generated/Nimbus.java | 2 +-
.../jvm/backtype/storm/generated/NodeInfo.java | 34 +-
.../storm/generated/RebalanceOptions.java | 46 +-
.../storm/generated/SpoutAggregateStats.java | 2 +-
.../backtype/storm/generated/SpoutStats.java | 254 ++---
.../jvm/backtype/storm/generated/StormBase.java | 94 +-
.../storm/generated/SupervisorInfo.java | 276 +++--
.../storm/generated/SupervisorSummary.java | 168 +++-
.../backtype/storm/generated/TopologyInfo.java | 162 +--
.../storm/generated/TopologyPageInfo.java | 98 +-
.../backtype/storm/generated/TopologyStats.java | 222 ++---
.../AbstractDNSToSwitchMapping.java | 95 ++
.../networktopography/DNSToSwitchMapping.java | 50 +
.../DefaultRackDNSToSwitchMapping.java | 35 +
.../jvm/backtype/storm/scheduler/Cluster.java | 46 +-
.../storm/scheduler/SupervisorDetails.java | 61 +-
.../backtype/storm/scheduler/Topologies.java | 15 +-
.../storm/scheduler/TopologyDetails.java | 330 +++++-
.../storm/scheduler/resource/Component.java | 54 +
.../storm/scheduler/resource/RAS_Node.java | 547 ++++++++++
.../resource/ResourceAwareScheduler.java | 149 +++
.../storm/scheduler/resource/ResourceUtils.java | 133 +++
.../resource/strategies/IStrategy.java | 37 +
.../strategies/ResourceAwareStrategy.java | 480 +++++++++
.../AlternateRackDNSToSwitchMapping.java | 48 +
.../topology/BaseConfigurationDeclarer.java | 31 +-
.../ComponentConfigurationDeclarer.java | 3 +
.../src/jvm/backtype/storm/utils/Utils.java | 19 +
storm-core/src/py/storm/ttypes.py | 994 ++++++++++---------
storm-core/src/storm.thrift | 2 +
.../test/clj/backtype/storm/cluster_test.clj | 4 +-
.../scheduler/multitenant_scheduler_test.clj | 34 +-
.../scheduler/resource_aware_scheduler_test.clj | 669 +++++++++++++
.../test/clj/backtype/storm/scheduler_test.clj | 3 +-
61 files changed, 5161 insertions(+), 1821 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/121d022b/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 4a8e354..a41d805 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -132,6 +132,11 @@ supervisor.heartbeat.frequency.secs: 5
supervisor.enable: true
supervisor.supervisors: []
supervisor.supervisors.commands: []
+supervisor.memory.capacity.mb: 3072.0
+# By convention 1 cpu core should be about 100, but this can be adjusted if needed.
+# Using 100 makes it simple to set the desired value to the capacity measurement
+# for single threaded bolts.
+supervisor.cpu.capacity: 400.0
### worker.* configs are for task workers
worker.childopts: "-Xmx768m"
@@ -174,6 +179,9 @@ storm.messaging.netty.socket.backlog: 500
# By default, the Netty SASL authentication is set to false. Users can override and set it true for a specific topology.
storm.messaging.netty.authentication: false
+# Default plugin to use for automatic network topology discovery
+storm.network.topography.plugin: backtype.storm.networktopography.DefaultRackDNSToSwitchMapping
+
# default number of seconds group mapping service will cache user group
storm.group.mapping.service.cache.duration.secs: 120
@@ -215,4 +223,10 @@ topology.environment: null
topology.bolts.outgoing.overflow.buffer.enable: false
topology.disruptor.wait.timeout.millis: 1000
+# Configs for Resource Aware Scheduler
+topology.component.resources.onheap.memory.mb: 128.0
+topology.component.resources.offheap.memory.mb: 0.0
+topology.component.cpu.pcore.percent: 10.0
+topology.worker.max.heap.size.mb: 768.0
+
dev.zookeeper.path: "/tmp/dev-storm-zookeeper"
http://git-wip-us.apache.org/repos/asf/storm/blob/121d022b/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java
----------------------------------------------------------------------
diff --git a/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java b/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java
new file mode 100644
index 0000000..96e300f
--- /dev/null
+++ b/examples/storm-starter/src/jvm/storm/starter/ResourceAwareExampleTopology.java
@@ -0,0 +1,87 @@
+package storm.starter;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.StormSubmitter;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.testing.TestWordSpout;
+import backtype.storm.topology.BoltDeclarer;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.SpoutDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import backtype.storm.tuple.Values;
+import backtype.storm.utils.Utils;
+
+import java.util.Map;
+
+/**
+ * Created by jerrypeng on 8/19/15.
+ */
+public class ResourceAwareExampleTopology {
+ public static class ExclamationBolt extends BaseRichBolt {
+ OutputCollector _collector;
+
+ @Override
+ public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
+ _collector = collector;
+ }
+
+ @Override
+ public void execute(Tuple tuple) {
+ _collector.emit(tuple, new Values(tuple.getString(0) + "!!!"));
+ _collector.ack(tuple);
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("word"));
+ }
+
+
+ }
+
+ public static void main(String[] args) throws Exception {
+ TopologyBuilder builder = new TopologyBuilder();
+
+ SpoutDeclarer spout = builder.setSpout("word", new TestWordSpout(), 10);
+ //set cpu requirement
+ spout.setCPULoad(20);
+ //set onheap and offheap memory requirement
+ spout.setMemoryLoad(64, 16);
+
+ BoltDeclarer bolt1 = builder.setBolt("exclaim1", new ExclamationBolt(), 3).shuffleGrouping("word");
+    //sets cpu requirement. Not necessary to set both CPU and memory.
+ //For requirements not set, a default value will be used
+ bolt1.setCPULoad(15);
+
+ BoltDeclarer bolt2 = builder.setBolt("exclaim2", new ExclamationBolt(), 2).shuffleGrouping("exclaim1");
+ bolt2.setMemoryLoad(100);
+
+ Config conf = new Config();
+ conf.setDebug(true);
+
+ /**
+     * Used to limit the maximum amount of memory (in MB) allocated to one worker process.
+     * Can be used to spread executors across multiple workers.
+ */
+ conf.setTopologyWorkerMaxHeapSize(512.0);
+
+ if (args != null && args.length > 0) {
+ conf.setNumWorkers(3);
+
+ StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
+ }
+ else {
+
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("test", conf, builder.createTopology());
+ Utils.sleep(10000);
+ cluster.killTopology("test");
+ cluster.shutdown();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/121d022b/storm-core/src/clj/backtype/storm/converter.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/converter.clj b/storm-core/src/clj/backtype/storm/converter.clj
index c571fa1..27336f0 100644
--- a/storm-core/src/clj/backtype/storm/converter.clj
+++ b/storm-core/src/clj/backtype/storm/converter.clj
@@ -30,6 +30,7 @@
(.set_scheduler_meta (:scheduler-meta supervisor-info))
(.set_uptime_secs (long (:uptime-secs supervisor-info)))
(.set_version (:version supervisor-info))
+ (.set_resources_map (:resources-map supervisor-info))
))
(defn clojurify-supervisor-info [^SupervisorInfo supervisor-info]
@@ -42,7 +43,8 @@
(if (.get_meta supervisor-info) (into [] (.get_meta supervisor-info)))
(if (.get_scheduler_meta supervisor-info) (into {} (.get_scheduler_meta supervisor-info)))
(.get_uptime_secs supervisor-info)
- (.get_version supervisor-info))))
+ (.get_version supervisor-info)
+ (if-let [res-map (.get_resources_map supervisor-info)] (into {} res-map)))))
(defn thriftify-assignment [assignment]
(doto (Assignment.)
http://git-wip-us.apache.org/repos/asf/storm/blob/121d022b/storm-core/src/clj/backtype/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/common.clj b/storm-core/src/clj/backtype/storm/daemon/common.clj
index a30850d..a1b6241 100644
--- a/storm-core/src/clj/backtype/storm/daemon/common.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/common.clj
@@ -57,7 +57,7 @@
;; component->executors is a map from spout/bolt id to number of executors for that component
(defrecord StormBase [storm-name launch-time-secs status num-workers component->executors owner topology-action-options prev-status component->debug])
-(defrecord SupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs version])
+(defrecord SupervisorInfo [time-secs hostname assignment-id used-ports meta scheduler-meta uptime-secs version resources-map])
(defprotocol DaemonCommon
(waiting? [this]))
http://git-wip-us.apache.org/repos/asf/storm/blob/121d022b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
index 70bd197..61aba6b 100644
--- a/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/nimbus.clj
@@ -333,7 +333,7 @@
supervisor-infos (all-supervisor-info storm-cluster-state nil)
supervisor-details (dofor [[id info] supervisor-infos]
- (SupervisorDetails. id (:meta info)))
+ (SupervisorDetails. id (:meta info) (:resources-map info)))
ret (.allSlotsAvailableForScheduling inimbus
supervisor-details
@@ -568,7 +568,8 @@
all-ports (-> (get all-scheduling-slots sid)
(set/difference dead-ports)
((fn [ports] (map int ports))))
- supervisor-details (SupervisorDetails. sid hostname scheduler-meta all-ports)]]
+ supervisor-details (SupervisorDetails. sid hostname scheduler-meta all-ports (:resources-map supervisor-info))
+ ]]
{sid supervisor-details}))]
(merge all-supervisor-details
(into {}
@@ -652,7 +653,7 @@
(apply merge-with set/union))
supervisors (read-all-supervisor-details nimbus all-scheduling-slots supervisor->dead-ports)
- cluster (Cluster. (:inimbus nimbus) supervisors topology->scheduler-assignment)
+ cluster (Cluster. (:inimbus nimbus) supervisors topology->scheduler-assignment conf)
;; call scheduler.schedule to schedule all the topologies
;; the new assignments for all the topologies are in the cluster object.
@@ -701,7 +702,7 @@
(let [infos (all-supervisor-info storm-cluster-state)]
(->> infos
(map (fn [[id info]]
- [id (SupervisorDetails. id (:hostname info) (:scheduler-meta info) nil)]))
+ [id (SupervisorDetails. id (:hostname info) (:scheduler-meta info) nil (:resources-map info))]))
(into {}))))
(defn- to-worker-slot [[node port]]
@@ -1433,16 +1434,16 @@
;; TODO: need to get the port info about supervisors...
;; in standalone just look at metadata, otherwise just say N/A?
supervisor-summaries (dofor [[id info] supervisor-infos]
- (let [ports (set (:meta info)) ;;TODO: this is only true for standalone
-
- sup-sum (SupervisorSummary. (:hostname info)
- (:uptime-secs info)
- (count ports)
- (count (:used-ports info))
- id) ]
- (when-let [version (:version info)] (.set_version sup-sum version))
- sup-sum
- ))
+ (let [ports (set (:meta info)) ;;TODO: this is only true for standalone
+ sup-sum (SupervisorSummary. (:hostname info)
+ (:uptime-secs info)
+ (count ports)
+ (count (:used-ports info))
+ id) ]
+ (.set_total_resources sup-sum (map-val double (:resources-map info)))
+ (when-let [version (:version info)] (.set_version sup-sum version))
+ sup-sum))
+ nimbus-uptime ((:uptime nimbus))
bases (topology-bases storm-cluster-state)
nimbuses (.nimbuses storm-cluster-state)
http://git-wip-us.apache.org/repos/asf/storm/blob/121d022b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
index c78e865..3eed36e 100644
--- a/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/backtype/storm/daemon/supervisor.clj
@@ -25,6 +25,7 @@
[java.io File])
(:use [backtype.storm config util log timer local-state])
(:import [backtype.storm.utils VersionInfo])
+ (:import [backtype.storm Config])
(:use [backtype.storm.daemon common])
(:require [backtype.storm.daemon [worker :as worker]]
[backtype.storm [process-simulator :as psim] [cluster :as cluster] [event :as event]]
@@ -495,6 +496,11 @@
(.add processes-event-manager sync-processes)
)))
+(defn mk-supervisor-capacities
+ [conf]
+ {Config/SUPERVISOR_MEMORY_CAPACITY_MB (double (conf SUPERVISOR-MEMORY-CAPACITY-MB))
+ Config/SUPERVISOR_CPU_CAPACITY (double (conf SUPERVISOR-CPU-CAPACITY))})
+
;; in local state, supervisor stores who its current assignments are
;; another thread launches events to restart any dead processes if necessary
(defserverfn mk-supervisor [conf shared-context ^ISupervisor isupervisor]
@@ -516,7 +522,8 @@
(.getMetadata isupervisor)
(conf SUPERVISOR-SCHEDULER-META)
((:uptime supervisor))
- (:version supervisor))))]
+ (:version supervisor)
+ (mk-supervisor-capacities conf))))]
(heartbeat-fn)
;; should synchronize supervisor so it doesn't launch anything after being down (optimization)
http://git-wip-us.apache.org/repos/asf/storm/blob/121d022b/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java
index c805d03..26a6681 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -169,6 +169,15 @@ public class Config extends HashMap<String, Object> {
public static final Object STORM_CLUSTER_MODE_SCHEMA = String.class;
/**
+ * What Network Topography detection classes should we use.
+ * Given a list of supervisor hostnames (or IP addresses), this class would return a list of
+ * rack names that correspond to the supervisors. This information is stored in Cluster.java, and
+ * is used in the resource aware scheduler.
+ */
+ public static final String STORM_NETWORK_TOPOGRAPHY_PLUGIN = "storm.network.topography.plugin";
+ public static final Object STORM_NETWORK_TOPOGRAPHY_PLUGIN_SCHEMA = String.class;
+
+ /**
* The hostname the supervisors/workers should report to nimbus. If unset, Storm will
* get the hostname to report by calling <code>InetAddress.getLocalHost().getCanonicalHostName()</code>.
*
@@ -993,6 +1002,22 @@ public class Config extends HashMap<String, Object> {
public static final Object SUPERVISOR_WORKER_LAUNCHER_SCHEMA = String.class;
/**
+ * The total amount of memory (in MiB) a supervisor is allowed to give to its workers.
+ * A default value will be set for this config if user does not override
+ */
+ public static final String SUPERVISOR_MEMORY_CAPACITY_MB = "supervisor.memory.capacity.mb";
+ public static final Object SUPERVISOR_MEMORY_CAPACITY_MB_SCHEMA = ConfigValidation.PositiveNumberValidator;
+
+ /**
+ * The total amount of CPU resources a supervisor is allowed to give to its workers.
+     * By convention 1 cpu core should be about 100, but this can be adjusted if needed.
+     * Using 100 makes it simple to set the desired value to the capacity measurement
+     * for single threaded bolts. A default value will be set for this config if user does not override
+ */
+ public static final String SUPERVISOR_CPU_CAPACITY = "supervisor.cpu.capacity";
+ public static final Object SUPERVISOR_CPU_CAPACITY_SCHEMA = ConfigValidation.PositiveNumberValidator;
+
+ /**
* The jvm opts provided to workers launched by this supervisor. All "%ID%", "%WORKER-ID%", "%TOPOLOGY-ID%"
* and "%WORKER-PORT%" substrings are replaced with:
* %ID% -> port (for backward compatibility),
@@ -1133,6 +1158,34 @@ public class Config extends HashMap<String, Object> {
public static final Object TOPOLOGY_TASKS_SCHEMA = ConfigValidation.IntegerValidator;
/**
+ * The maximum amount of memory an instance of a spout/bolt will take on heap. This enables the scheduler
+ * to allocate slots on machines with enough available memory. A default value will be set for this config if user does not override
+ */
+ public static final String TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB = "topology.component.resources.onheap.memory.mb";
+ public static final Object TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB_SCHEMA = ConfigValidation.NonNegativeNumberValidator;
+
+ /**
+ * The maximum amount of memory an instance of a spout/bolt will take off heap. This enables the scheduler
+ * to allocate slots on machines with enough available memory. A default value will be set for this config if user does not override
+ */
+ public static final String TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB = "topology.component.resources.offheap.memory.mb";
+ public static final Object TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB_SCHEMA = ConfigValidation.NonNegativeNumberValidator;
+
+ /**
+ * The config indicates the percentage of cpu for a core an instance(executor) of a component will use.
+     * Assuming a core value of 100, a value of 10 indicates 10% of the core.
+ * The P in PCORE represents the term "physical". A default value will be set for this config if user does not override
+ */
+ public static final String TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT = "topology.component.cpu.pcore.percent";
+ public static final Object TOPOLOGY_COMPONENT_CPU_PCORE_PERCENT_SCHEMA = ConfigValidation.NonNegativeNumberValidator;
+
+ /**
+ * A per topology config that specifies the maximum amount of memory a worker can use for that specific topology
+ */
+ public static final String TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB = "topology.worker.max.heap.size.mb";
+ public static final Object TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB_SCHEMA = ConfigValidation.PositiveNumberValidator;
+
+ /**
* How many executors to spawn for ackers.
*
* <p>By not setting this variable or setting it as null, Storm will set the number of acker executors
@@ -1755,4 +1808,14 @@ public class Config extends HashMap<String, Object> {
conf.put(Config.TOPOLOGY_KRYO_DECORATORS, ret);
return ret;
}
+
+ /**
+   * Set the max heap size allowed per worker for this topology.
+   * @param size the maximum worker heap size, in MB
+ */
+ public void setTopologyWorkerMaxHeapSize(Number size) {
+ if(size != null) {
+ this.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, size);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/121d022b/storm-core/src/jvm/backtype/storm/ConfigValidation.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/ConfigValidation.java b/storm-core/src/jvm/backtype/storm/ConfigValidation.java
index fd9dae7..e6c0986 100644
--- a/storm-core/src/jvm/backtype/storm/ConfigValidation.java
+++ b/storm-core/src/jvm/backtype/storm/ConfigValidation.java
@@ -261,6 +261,28 @@ public class ConfigValidation {
};
/**
+   * Validates a non-negative double value.
+ */
+ public static Object NonNegativeNumberValidator = new FieldValidator() {
+ @Override
+ public void validateField(String name, Object o) throws IllegalArgumentException {
+ if (o == null) {
+ // A null value is acceptable.
+ return;
+ }
+ final double double_value;
+ if (o instanceof Number)
+ {
+ double_value = ((Number)o).doubleValue();
+ if (double_value >= 0.0) {
+ return;
+ }
+ }
+ throw new IllegalArgumentException("Field " + name + " must be a non-negative double.");
+ }
+ };
+
+ /**
* Validates a Positive Number
*/
public static Object PositiveNumberValidator = new FieldValidator() {
http://git-wip-us.apache.org/repos/asf/storm/blob/121d022b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/StormSubmitter.java b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
index a4ccf5f..8afbbc6 100644
--- a/storm-core/src/jvm/backtype/storm/StormSubmitter.java
+++ b/storm-core/src/jvm/backtype/storm/StormSubmitter.java
@@ -24,6 +24,7 @@ import java.util.regex.Pattern;
import java.util.HashMap;
import java.util.Map;
+import backtype.storm.scheduler.resource.ResourceUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.thrift.TException;
import org.json.simple.JSONValue;
@@ -185,9 +186,10 @@ public class StormSubmitter {
* @throws AlreadyAliveException
* @throws InvalidTopologyException
* @throws AuthorizationException
+     * @throws IllegalArgumentException thrown if configs would yield an unschedulable topology; see validateConfs
*/
public static void submitTopologyAs(String name, Map stormConf, StormTopology topology, SubmitOptions opts, ProgressListener progressListener, String asUser)
- throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
+ throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, IllegalArgumentException {
if(!Utils.isValidConf(stormConf)) {
throw new IllegalArgumentException("Storm conf is not valid. Must be json-serializable");
}
@@ -197,6 +199,8 @@ public class StormSubmitter {
conf.putAll(stormConf);
stormConf.putAll(prepareZookeeperAuthentication(conf));
+ validateConfs(conf, topology);
+
Map<String,String> passedCreds = new HashMap<String, String>();
if (opts != null) {
Credentials tmpCreds = opts.get_creds();
@@ -442,4 +446,33 @@ public class StormSubmitter {
*/
public void onCompleted(String srcFile, String targetFile, long totalBytes);
}
+
+ private static void validateConfs(Map stormConf, StormTopology topology) throws IllegalArgumentException {
+ double largestMemReq = getMaxExecutorMemoryUsageForTopo(topology, stormConf);
+ Double topologyWorkerMaxHeapSize = Utils.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB));
+ if(topologyWorkerMaxHeapSize < largestMemReq) {
+ throw new IllegalArgumentException("Topology will not be able to be successfully scheduled: Config TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB="
+ +Utils.getDouble(stormConf.get(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB)) + " < "
+ + largestMemReq + " (Largest memory requirement of a component in the topology). Perhaps set TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB to a larger amount");
+ }
+ }
+
+ private static double getMaxExecutorMemoryUsageForTopo(StormTopology topology, Map topologyConf) {
+ double largestMemoryOperator = 0.0;
+ for(Map<String, Double> entry : ResourceUtils.getBoltsResources(topology, topologyConf).values()) {
+ double memoryRequirement = entry.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)
+ + entry.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+ if(memoryRequirement > largestMemoryOperator) {
+ largestMemoryOperator = memoryRequirement;
+ }
+ }
+ for(Map<String, Double> entry : ResourceUtils.getSpoutsResources(topology, topologyConf).values()) {
+ double memoryRequirement = entry.get(Config.TOPOLOGY_COMPONENT_RESOURCES_OFFHEAP_MEMORY_MB)
+ + entry.get(Config.TOPOLOGY_COMPONENT_RESOURCES_ONHEAP_MEMORY_MB);
+ if(memoryRequirement > largestMemoryOperator) {
+ largestMemoryOperator = memoryRequirement;
+ }
+ }
+ return largestMemoryOperator;
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/121d022b/storm-core/src/jvm/backtype/storm/generated/Assignment.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/Assignment.java b/storm-core/src/jvm/backtype/storm/generated/Assignment.java
index 7555b54..6405c05 100644
--- a/storm-core/src/jvm/backtype/storm/generated/Assignment.java
+++ b/storm-core/src/jvm/backtype/storm/generated/Assignment.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-9-28")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-6")
public class Assignment implements org.apache.thrift.TBase<Assignment, Assignment._Fields>, java.io.Serializable, Cloneable, Comparable<Assignment> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("Assignment");
@@ -678,15 +678,15 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
case 2: // NODE_HOST
if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
{
- org.apache.thrift.protocol.TMap _map504 = iprot.readMapBegin();
- struct.node_host = new HashMap<String,String>(2*_map504.size);
- String _key505;
- String _val506;
- for (int _i507 = 0; _i507 < _map504.size; ++_i507)
+ org.apache.thrift.protocol.TMap _map524 = iprot.readMapBegin();
+ struct.node_host = new HashMap<String,String>(2*_map524.size);
+ String _key525;
+ String _val526;
+ for (int _i527 = 0; _i527 < _map524.size; ++_i527)
{
- _key505 = iprot.readString();
- _val506 = iprot.readString();
- struct.node_host.put(_key505, _val506);
+ _key525 = iprot.readString();
+ _val526 = iprot.readString();
+ struct.node_host.put(_key525, _val526);
}
iprot.readMapEnd();
}
@@ -698,26 +698,26 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
case 3: // EXECUTOR_NODE_PORT
if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
{
- org.apache.thrift.protocol.TMap _map508 = iprot.readMapBegin();
- struct.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map508.size);
- List<Long> _key509;
- NodeInfo _val510;
- for (int _i511 = 0; _i511 < _map508.size; ++_i511)
+ org.apache.thrift.protocol.TMap _map528 = iprot.readMapBegin();
+ struct.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map528.size);
+ List<Long> _key529;
+ NodeInfo _val530;
+ for (int _i531 = 0; _i531 < _map528.size; ++_i531)
{
{
- org.apache.thrift.protocol.TList _list512 = iprot.readListBegin();
- _key509 = new ArrayList<Long>(_list512.size);
- long _elem513;
- for (int _i514 = 0; _i514 < _list512.size; ++_i514)
+ org.apache.thrift.protocol.TList _list532 = iprot.readListBegin();
+ _key529 = new ArrayList<Long>(_list532.size);
+ long _elem533;
+ for (int _i534 = 0; _i534 < _list532.size; ++_i534)
{
- _elem513 = iprot.readI64();
- _key509.add(_elem513);
+ _elem533 = iprot.readI64();
+ _key529.add(_elem533);
}
iprot.readListEnd();
}
- _val510 = new NodeInfo();
- _val510.read(iprot);
- struct.executor_node_port.put(_key509, _val510);
+ _val530 = new NodeInfo();
+ _val530.read(iprot);
+ struct.executor_node_port.put(_key529, _val530);
}
iprot.readMapEnd();
}
@@ -729,25 +729,25 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
case 4: // EXECUTOR_START_TIME_SECS
if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
{
- org.apache.thrift.protocol.TMap _map515 = iprot.readMapBegin();
- struct.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map515.size);
- List<Long> _key516;
- long _val517;
- for (int _i518 = 0; _i518 < _map515.size; ++_i518)
+ org.apache.thrift.protocol.TMap _map535 = iprot.readMapBegin();
+ struct.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map535.size);
+ List<Long> _key536;
+ long _val537;
+ for (int _i538 = 0; _i538 < _map535.size; ++_i538)
{
{
- org.apache.thrift.protocol.TList _list519 = iprot.readListBegin();
- _key516 = new ArrayList<Long>(_list519.size);
- long _elem520;
- for (int _i521 = 0; _i521 < _list519.size; ++_i521)
+ org.apache.thrift.protocol.TList _list539 = iprot.readListBegin();
+ _key536 = new ArrayList<Long>(_list539.size);
+ long _elem540;
+ for (int _i541 = 0; _i541 < _list539.size; ++_i541)
{
- _elem520 = iprot.readI64();
- _key516.add(_elem520);
+ _elem540 = iprot.readI64();
+ _key536.add(_elem540);
}
iprot.readListEnd();
}
- _val517 = iprot.readI64();
- struct.executor_start_time_secs.put(_key516, _val517);
+ _val537 = iprot.readI64();
+ struct.executor_start_time_secs.put(_key536, _val537);
}
iprot.readMapEnd();
}
@@ -779,10 +779,10 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
oprot.writeFieldBegin(NODE_HOST_FIELD_DESC);
{
oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, struct.node_host.size()));
- for (Map.Entry<String, String> _iter522 : struct.node_host.entrySet())
+ for (Map.Entry<String, String> _iter542 : struct.node_host.entrySet())
{
- oprot.writeString(_iter522.getKey());
- oprot.writeString(_iter522.getValue());
+ oprot.writeString(_iter542.getKey());
+ oprot.writeString(_iter542.getValue());
}
oprot.writeMapEnd();
}
@@ -794,17 +794,17 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
oprot.writeFieldBegin(EXECUTOR_NODE_PORT_FIELD_DESC);
{
oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.STRUCT, struct.executor_node_port.size()));
- for (Map.Entry<List<Long>, NodeInfo> _iter523 : struct.executor_node_port.entrySet())
+ for (Map.Entry<List<Long>, NodeInfo> _iter543 : struct.executor_node_port.entrySet())
{
{
- oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter523.getKey().size()));
- for (long _iter524 : _iter523.getKey())
+ oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter543.getKey().size()));
+ for (long _iter544 : _iter543.getKey())
{
- oprot.writeI64(_iter524);
+ oprot.writeI64(_iter544);
}
oprot.writeListEnd();
}
- _iter523.getValue().write(oprot);
+ _iter543.getValue().write(oprot);
}
oprot.writeMapEnd();
}
@@ -816,17 +816,17 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
oprot.writeFieldBegin(EXECUTOR_START_TIME_SECS_FIELD_DESC);
{
oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.I64, struct.executor_start_time_secs.size()));
- for (Map.Entry<List<Long>, Long> _iter525 : struct.executor_start_time_secs.entrySet())
+ for (Map.Entry<List<Long>, Long> _iter545 : struct.executor_start_time_secs.entrySet())
{
{
- oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter525.getKey().size()));
- for (long _iter526 : _iter525.getKey())
+ oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, _iter545.getKey().size()));
+ for (long _iter546 : _iter545.getKey())
{
- oprot.writeI64(_iter526);
+ oprot.writeI64(_iter546);
}
oprot.writeListEnd();
}
- oprot.writeI64(_iter525.getValue());
+ oprot.writeI64(_iter545.getValue());
}
oprot.writeMapEnd();
}
@@ -865,42 +865,42 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
if (struct.is_set_node_host()) {
{
oprot.writeI32(struct.node_host.size());
- for (Map.Entry<String, String> _iter527 : struct.node_host.entrySet())
+ for (Map.Entry<String, String> _iter547 : struct.node_host.entrySet())
{
- oprot.writeString(_iter527.getKey());
- oprot.writeString(_iter527.getValue());
+ oprot.writeString(_iter547.getKey());
+ oprot.writeString(_iter547.getValue());
}
}
}
if (struct.is_set_executor_node_port()) {
{
oprot.writeI32(struct.executor_node_port.size());
- for (Map.Entry<List<Long>, NodeInfo> _iter528 : struct.executor_node_port.entrySet())
+ for (Map.Entry<List<Long>, NodeInfo> _iter548 : struct.executor_node_port.entrySet())
{
{
- oprot.writeI32(_iter528.getKey().size());
- for (long _iter529 : _iter528.getKey())
+ oprot.writeI32(_iter548.getKey().size());
+ for (long _iter549 : _iter548.getKey())
{
- oprot.writeI64(_iter529);
+ oprot.writeI64(_iter549);
}
}
- _iter528.getValue().write(oprot);
+ _iter548.getValue().write(oprot);
}
}
}
if (struct.is_set_executor_start_time_secs()) {
{
oprot.writeI32(struct.executor_start_time_secs.size());
- for (Map.Entry<List<Long>, Long> _iter530 : struct.executor_start_time_secs.entrySet())
+ for (Map.Entry<List<Long>, Long> _iter550 : struct.executor_start_time_secs.entrySet())
{
{
- oprot.writeI32(_iter530.getKey().size());
- for (long _iter531 : _iter530.getKey())
+ oprot.writeI32(_iter550.getKey().size());
+ for (long _iter551 : _iter550.getKey())
{
- oprot.writeI64(_iter531);
+ oprot.writeI64(_iter551);
}
}
- oprot.writeI64(_iter530.getValue());
+ oprot.writeI64(_iter550.getValue());
}
}
}
@@ -914,64 +914,64 @@ public class Assignment implements org.apache.thrift.TBase<Assignment, Assignmen
BitSet incoming = iprot.readBitSet(3);
if (incoming.get(0)) {
{
- org.apache.thrift.protocol.TMap _map532 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32());
- struct.node_host = new HashMap<String,String>(2*_map532.size);
- String _key533;
- String _val534;
- for (int _i535 = 0; _i535 < _map532.size; ++_i535)
+ org.apache.thrift.protocol.TMap _map552 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.STRING, iprot.readI32());
+ struct.node_host = new HashMap<String,String>(2*_map552.size);
+ String _key553;
+ String _val554;
+ for (int _i555 = 0; _i555 < _map552.size; ++_i555)
{
- _key533 = iprot.readString();
- _val534 = iprot.readString();
- struct.node_host.put(_key533, _val534);
+ _key553 = iprot.readString();
+ _val554 = iprot.readString();
+ struct.node_host.put(_key553, _val554);
}
}
struct.set_node_host_isSet(true);
}
if (incoming.get(1)) {
{
- org.apache.thrift.protocol.TMap _map536 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
- struct.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map536.size);
- List<Long> _key537;
- NodeInfo _val538;
- for (int _i539 = 0; _i539 < _map536.size; ++_i539)
+ org.apache.thrift.protocol.TMap _map556 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.STRUCT, iprot.readI32());
+ struct.executor_node_port = new HashMap<List<Long>,NodeInfo>(2*_map556.size);
+ List<Long> _key557;
+ NodeInfo _val558;
+ for (int _i559 = 0; _i559 < _map556.size; ++_i559)
{
{
- org.apache.thrift.protocol.TList _list540 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32());
- _key537 = new ArrayList<Long>(_list540.size);
- long _elem541;
- for (int _i542 = 0; _i542 < _list540.size; ++_i542)
+ org.apache.thrift.protocol.TList _list560 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32());
+ _key557 = new ArrayList<Long>(_list560.size);
+ long _elem561;
+ for (int _i562 = 0; _i562 < _list560.size; ++_i562)
{
- _elem541 = iprot.readI64();
- _key537.add(_elem541);
+ _elem561 = iprot.readI64();
+ _key557.add(_elem561);
}
}
- _val538 = new NodeInfo();
- _val538.read(iprot);
- struct.executor_node_port.put(_key537, _val538);
+ _val558 = new NodeInfo();
+ _val558.read(iprot);
+ struct.executor_node_port.put(_key557, _val558);
}
}
struct.set_executor_node_port_isSet(true);
}
if (incoming.get(2)) {
{
- org.apache.thrift.protocol.TMap _map543 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.I64, iprot.readI32());
- struct.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map543.size);
- List<Long> _key544;
- long _val545;
- for (int _i546 = 0; _i546 < _map543.size; ++_i546)
+ org.apache.thrift.protocol.TMap _map563 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.LIST, org.apache.thrift.protocol.TType.I64, iprot.readI32());
+ struct.executor_start_time_secs = new HashMap<List<Long>,Long>(2*_map563.size);
+ List<Long> _key564;
+ long _val565;
+ for (int _i566 = 0; _i566 < _map563.size; ++_i566)
{
{
- org.apache.thrift.protocol.TList _list547 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32());
- _key544 = new ArrayList<Long>(_list547.size);
- long _elem548;
- for (int _i549 = 0; _i549 < _list547.size; ++_i549)
+ org.apache.thrift.protocol.TList _list567 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.I64, iprot.readI32());
+ _key564 = new ArrayList<Long>(_list567.size);
+ long _elem568;
+ for (int _i569 = 0; _i569 < _list567.size; ++_i569)
{
- _elem548 = iprot.readI64();
- _key544.add(_elem548);
+ _elem568 = iprot.readI64();
+ _key564.add(_elem568);
}
}
- _val545 = iprot.readI64();
- struct.executor_start_time_secs.put(_key544, _val545);
+ _val565 = iprot.readI64();
+ struct.executor_start_time_secs.put(_key564, _val565);
}
}
struct.set_executor_start_time_secs_isSet(true);
http://git-wip-us.apache.org/repos/asf/storm/blob/121d022b/storm-core/src/jvm/backtype/storm/generated/BoltAggregateStats.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/BoltAggregateStats.java b/storm-core/src/jvm/backtype/storm/generated/BoltAggregateStats.java
index 3234d2b..0ddcaba 100644
--- a/storm-core/src/jvm/backtype/storm/generated/BoltAggregateStats.java
+++ b/storm-core/src/jvm/backtype/storm/generated/BoltAggregateStats.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-5-22")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-10-6")
public class BoltAggregateStats implements org.apache.thrift.TBase<BoltAggregateStats, BoltAggregateStats._Fields>, java.io.Serializable, Cloneable, Comparable<BoltAggregateStats> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("BoltAggregateStats");
http://git-wip-us.apache.org/repos/asf/storm/blob/121d022b/storm-core/src/jvm/backtype/storm/generated/BoltStats.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/generated/BoltStats.java b/storm-core/src/jvm/backtype/storm/generated/BoltStats.java
index 26ef5d8..315f955 100644
--- a/storm-core/src/jvm/backtype/storm/generated/BoltStats.java
+++ b/storm-core/src/jvm/backtype/storm/generated/BoltStats.java
@@ -51,7 +51,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@SuppressWarnings({"cast", "rawtypes", "serial", "unchecked"})
-@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-9-3")
+@Generated(value = "Autogenerated by Thrift Compiler (0.9.2)", date = "2015-9-30")
public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._Fields>, java.io.Serializable, Cloneable, Comparable<BoltStats> {
private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("BoltStats");
@@ -881,28 +881,28 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._
case 1: // ACKED
if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
{
- org.apache.thrift.protocol.TMap _map98 = iprot.readMapBegin();
- struct.acked = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map98.size);
- String _key99;
- Map<GlobalStreamId,Long> _val100;
- for (int _i101 = 0; _i101 < _map98.size; ++_i101)
+ org.apache.thrift.protocol.TMap _map108 = iprot.readMapBegin();
+ struct.acked = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map108.size);
+ String _key109;
+ Map<GlobalStreamId,Long> _val110;
+ for (int _i111 = 0; _i111 < _map108.size; ++_i111)
{
- _key99 = iprot.readString();
+ _key109 = iprot.readString();
{
- org.apache.thrift.protocol.TMap _map102 = iprot.readMapBegin();
- _val100 = new HashMap<GlobalStreamId,Long>(2*_map102.size);
- GlobalStreamId _key103;
- long _val104;
- for (int _i105 = 0; _i105 < _map102.size; ++_i105)
+ org.apache.thrift.protocol.TMap _map112 = iprot.readMapBegin();
+ _val110 = new HashMap<GlobalStreamId,Long>(2*_map112.size);
+ GlobalStreamId _key113;
+ long _val114;
+ for (int _i115 = 0; _i115 < _map112.size; ++_i115)
{
- _key103 = new GlobalStreamId();
- _key103.read(iprot);
- _val104 = iprot.readI64();
- _val100.put(_key103, _val104);
+ _key113 = new GlobalStreamId();
+ _key113.read(iprot);
+ _val114 = iprot.readI64();
+ _val110.put(_key113, _val114);
}
iprot.readMapEnd();
}
- struct.acked.put(_key99, _val100);
+ struct.acked.put(_key109, _val110);
}
iprot.readMapEnd();
}
@@ -914,28 +914,28 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._
case 2: // FAILED
if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
{
- org.apache.thrift.protocol.TMap _map106 = iprot.readMapBegin();
- struct.failed = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map106.size);
- String _key107;
- Map<GlobalStreamId,Long> _val108;
- for (int _i109 = 0; _i109 < _map106.size; ++_i109)
+ org.apache.thrift.protocol.TMap _map116 = iprot.readMapBegin();
+ struct.failed = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map116.size);
+ String _key117;
+ Map<GlobalStreamId,Long> _val118;
+ for (int _i119 = 0; _i119 < _map116.size; ++_i119)
{
- _key107 = iprot.readString();
+ _key117 = iprot.readString();
{
- org.apache.thrift.protocol.TMap _map110 = iprot.readMapBegin();
- _val108 = new HashMap<GlobalStreamId,Long>(2*_map110.size);
- GlobalStreamId _key111;
- long _val112;
- for (int _i113 = 0; _i113 < _map110.size; ++_i113)
+ org.apache.thrift.protocol.TMap _map120 = iprot.readMapBegin();
+ _val118 = new HashMap<GlobalStreamId,Long>(2*_map120.size);
+ GlobalStreamId _key121;
+ long _val122;
+ for (int _i123 = 0; _i123 < _map120.size; ++_i123)
{
- _key111 = new GlobalStreamId();
- _key111.read(iprot);
- _val112 = iprot.readI64();
- _val108.put(_key111, _val112);
+ _key121 = new GlobalStreamId();
+ _key121.read(iprot);
+ _val122 = iprot.readI64();
+ _val118.put(_key121, _val122);
}
iprot.readMapEnd();
}
- struct.failed.put(_key107, _val108);
+ struct.failed.put(_key117, _val118);
}
iprot.readMapEnd();
}
@@ -947,28 +947,28 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._
case 3: // PROCESS_MS_AVG
if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
{
- org.apache.thrift.protocol.TMap _map114 = iprot.readMapBegin();
- struct.process_ms_avg = new HashMap<String,Map<GlobalStreamId,Double>>(2*_map114.size);
- String _key115;
- Map<GlobalStreamId,Double> _val116;
- for (int _i117 = 0; _i117 < _map114.size; ++_i117)
+ org.apache.thrift.protocol.TMap _map124 = iprot.readMapBegin();
+ struct.process_ms_avg = new HashMap<String,Map<GlobalStreamId,Double>>(2*_map124.size);
+ String _key125;
+ Map<GlobalStreamId,Double> _val126;
+ for (int _i127 = 0; _i127 < _map124.size; ++_i127)
{
- _key115 = iprot.readString();
+ _key125 = iprot.readString();
{
- org.apache.thrift.protocol.TMap _map118 = iprot.readMapBegin();
- _val116 = new HashMap<GlobalStreamId,Double>(2*_map118.size);
- GlobalStreamId _key119;
- double _val120;
- for (int _i121 = 0; _i121 < _map118.size; ++_i121)
+ org.apache.thrift.protocol.TMap _map128 = iprot.readMapBegin();
+ _val126 = new HashMap<GlobalStreamId,Double>(2*_map128.size);
+ GlobalStreamId _key129;
+ double _val130;
+ for (int _i131 = 0; _i131 < _map128.size; ++_i131)
{
- _key119 = new GlobalStreamId();
- _key119.read(iprot);
- _val120 = iprot.readDouble();
- _val116.put(_key119, _val120);
+ _key129 = new GlobalStreamId();
+ _key129.read(iprot);
+ _val130 = iprot.readDouble();
+ _val126.put(_key129, _val130);
}
iprot.readMapEnd();
}
- struct.process_ms_avg.put(_key115, _val116);
+ struct.process_ms_avg.put(_key125, _val126);
}
iprot.readMapEnd();
}
@@ -980,28 +980,28 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._
case 4: // EXECUTED
if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
{
- org.apache.thrift.protocol.TMap _map122 = iprot.readMapBegin();
- struct.executed = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map122.size);
- String _key123;
- Map<GlobalStreamId,Long> _val124;
- for (int _i125 = 0; _i125 < _map122.size; ++_i125)
+ org.apache.thrift.protocol.TMap _map132 = iprot.readMapBegin();
+ struct.executed = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map132.size);
+ String _key133;
+ Map<GlobalStreamId,Long> _val134;
+ for (int _i135 = 0; _i135 < _map132.size; ++_i135)
{
- _key123 = iprot.readString();
+ _key133 = iprot.readString();
{
- org.apache.thrift.protocol.TMap _map126 = iprot.readMapBegin();
- _val124 = new HashMap<GlobalStreamId,Long>(2*_map126.size);
- GlobalStreamId _key127;
- long _val128;
- for (int _i129 = 0; _i129 < _map126.size; ++_i129)
+ org.apache.thrift.protocol.TMap _map136 = iprot.readMapBegin();
+ _val134 = new HashMap<GlobalStreamId,Long>(2*_map136.size);
+ GlobalStreamId _key137;
+ long _val138;
+ for (int _i139 = 0; _i139 < _map136.size; ++_i139)
{
- _key127 = new GlobalStreamId();
- _key127.read(iprot);
- _val128 = iprot.readI64();
- _val124.put(_key127, _val128);
+ _key137 = new GlobalStreamId();
+ _key137.read(iprot);
+ _val138 = iprot.readI64();
+ _val134.put(_key137, _val138);
}
iprot.readMapEnd();
}
- struct.executed.put(_key123, _val124);
+ struct.executed.put(_key133, _val134);
}
iprot.readMapEnd();
}
@@ -1013,28 +1013,28 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._
case 5: // EXECUTE_MS_AVG
if (schemeField.type == org.apache.thrift.protocol.TType.MAP) {
{
- org.apache.thrift.protocol.TMap _map130 = iprot.readMapBegin();
- struct.execute_ms_avg = new HashMap<String,Map<GlobalStreamId,Double>>(2*_map130.size);
- String _key131;
- Map<GlobalStreamId,Double> _val132;
- for (int _i133 = 0; _i133 < _map130.size; ++_i133)
+ org.apache.thrift.protocol.TMap _map140 = iprot.readMapBegin();
+ struct.execute_ms_avg = new HashMap<String,Map<GlobalStreamId,Double>>(2*_map140.size);
+ String _key141;
+ Map<GlobalStreamId,Double> _val142;
+ for (int _i143 = 0; _i143 < _map140.size; ++_i143)
{
- _key131 = iprot.readString();
+ _key141 = iprot.readString();
{
- org.apache.thrift.protocol.TMap _map134 = iprot.readMapBegin();
- _val132 = new HashMap<GlobalStreamId,Double>(2*_map134.size);
- GlobalStreamId _key135;
- double _val136;
- for (int _i137 = 0; _i137 < _map134.size; ++_i137)
+ org.apache.thrift.protocol.TMap _map144 = iprot.readMapBegin();
+ _val142 = new HashMap<GlobalStreamId,Double>(2*_map144.size);
+ GlobalStreamId _key145;
+ double _val146;
+ for (int _i147 = 0; _i147 < _map144.size; ++_i147)
{
- _key135 = new GlobalStreamId();
- _key135.read(iprot);
- _val136 = iprot.readDouble();
- _val132.put(_key135, _val136);
+ _key145 = new GlobalStreamId();
+ _key145.read(iprot);
+ _val146 = iprot.readDouble();
+ _val142.put(_key145, _val146);
}
iprot.readMapEnd();
}
- struct.execute_ms_avg.put(_key131, _val132);
+ struct.execute_ms_avg.put(_key141, _val142);
}
iprot.readMapEnd();
}
@@ -1060,15 +1060,15 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._
oprot.writeFieldBegin(ACKED_FIELD_DESC);
{
oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, struct.acked.size()));
- for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter138 : struct.acked.entrySet())
+ for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter148 : struct.acked.entrySet())
{
- oprot.writeString(_iter138.getKey());
+ oprot.writeString(_iter148.getKey());
{
- oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, _iter138.getValue().size()));
- for (Map.Entry<GlobalStreamId, Long> _iter139 : _iter138.getValue().entrySet())
+ oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, _iter148.getValue().size()));
+ for (Map.Entry<GlobalStreamId, Long> _iter149 : _iter148.getValue().entrySet())
{
- _iter139.getKey().write(oprot);
- oprot.writeI64(_iter139.getValue());
+ _iter149.getKey().write(oprot);
+ oprot.writeI64(_iter149.getValue());
}
oprot.writeMapEnd();
}
@@ -1081,15 +1081,15 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._
oprot.writeFieldBegin(FAILED_FIELD_DESC);
{
oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, struct.failed.size()));
- for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter140 : struct.failed.entrySet())
+ for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter150 : struct.failed.entrySet())
{
- oprot.writeString(_iter140.getKey());
+ oprot.writeString(_iter150.getKey());
{
- oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, _iter140.getValue().size()));
- for (Map.Entry<GlobalStreamId, Long> _iter141 : _iter140.getValue().entrySet())
+ oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, _iter150.getValue().size()));
+ for (Map.Entry<GlobalStreamId, Long> _iter151 : _iter150.getValue().entrySet())
{
- _iter141.getKey().write(oprot);
- oprot.writeI64(_iter141.getValue());
+ _iter151.getKey().write(oprot);
+ oprot.writeI64(_iter151.getValue());
}
oprot.writeMapEnd();
}
@@ -1102,15 +1102,15 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._
oprot.writeFieldBegin(PROCESS_MS_AVG_FIELD_DESC);
{
oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, struct.process_ms_avg.size()));
- for (Map.Entry<String, Map<GlobalStreamId,Double>> _iter142 : struct.process_ms_avg.entrySet())
+ for (Map.Entry<String, Map<GlobalStreamId,Double>> _iter152 : struct.process_ms_avg.entrySet())
{
- oprot.writeString(_iter142.getKey());
+ oprot.writeString(_iter152.getKey());
{
- oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.DOUBLE, _iter142.getValue().size()));
- for (Map.Entry<GlobalStreamId, Double> _iter143 : _iter142.getValue().entrySet())
+ oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.DOUBLE, _iter152.getValue().size()));
+ for (Map.Entry<GlobalStreamId, Double> _iter153 : _iter152.getValue().entrySet())
{
- _iter143.getKey().write(oprot);
- oprot.writeDouble(_iter143.getValue());
+ _iter153.getKey().write(oprot);
+ oprot.writeDouble(_iter153.getValue());
}
oprot.writeMapEnd();
}
@@ -1123,15 +1123,15 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._
oprot.writeFieldBegin(EXECUTED_FIELD_DESC);
{
oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, struct.executed.size()));
- for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter144 : struct.executed.entrySet())
+ for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter154 : struct.executed.entrySet())
{
- oprot.writeString(_iter144.getKey());
+ oprot.writeString(_iter154.getKey());
{
- oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, _iter144.getValue().size()));
- for (Map.Entry<GlobalStreamId, Long> _iter145 : _iter144.getValue().entrySet())
+ oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, _iter154.getValue().size()));
+ for (Map.Entry<GlobalStreamId, Long> _iter155 : _iter154.getValue().entrySet())
{
- _iter145.getKey().write(oprot);
- oprot.writeI64(_iter145.getValue());
+ _iter155.getKey().write(oprot);
+ oprot.writeI64(_iter155.getValue());
}
oprot.writeMapEnd();
}
@@ -1144,15 +1144,15 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._
oprot.writeFieldBegin(EXECUTE_MS_AVG_FIELD_DESC);
{
oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, struct.execute_ms_avg.size()));
- for (Map.Entry<String, Map<GlobalStreamId,Double>> _iter146 : struct.execute_ms_avg.entrySet())
+ for (Map.Entry<String, Map<GlobalStreamId,Double>> _iter156 : struct.execute_ms_avg.entrySet())
{
- oprot.writeString(_iter146.getKey());
+ oprot.writeString(_iter156.getKey());
{
- oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.DOUBLE, _iter146.getValue().size()));
- for (Map.Entry<GlobalStreamId, Double> _iter147 : _iter146.getValue().entrySet())
+ oprot.writeMapBegin(new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.DOUBLE, _iter156.getValue().size()));
+ for (Map.Entry<GlobalStreamId, Double> _iter157 : _iter156.getValue().entrySet())
{
- _iter147.getKey().write(oprot);
- oprot.writeDouble(_iter147.getValue());
+ _iter157.getKey().write(oprot);
+ oprot.writeDouble(_iter157.getValue());
}
oprot.writeMapEnd();
}
@@ -1180,75 +1180,75 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._
TTupleProtocol oprot = (TTupleProtocol) prot;
{
oprot.writeI32(struct.acked.size());
- for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter148 : struct.acked.entrySet())
+ for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter158 : struct.acked.entrySet())
{
- oprot.writeString(_iter148.getKey());
+ oprot.writeString(_iter158.getKey());
{
- oprot.writeI32(_iter148.getValue().size());
- for (Map.Entry<GlobalStreamId, Long> _iter149 : _iter148.getValue().entrySet())
+ oprot.writeI32(_iter158.getValue().size());
+ for (Map.Entry<GlobalStreamId, Long> _iter159 : _iter158.getValue().entrySet())
{
- _iter149.getKey().write(oprot);
- oprot.writeI64(_iter149.getValue());
+ _iter159.getKey().write(oprot);
+ oprot.writeI64(_iter159.getValue());
}
}
}
}
{
oprot.writeI32(struct.failed.size());
- for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter150 : struct.failed.entrySet())
+ for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter160 : struct.failed.entrySet())
{
- oprot.writeString(_iter150.getKey());
+ oprot.writeString(_iter160.getKey());
{
- oprot.writeI32(_iter150.getValue().size());
- for (Map.Entry<GlobalStreamId, Long> _iter151 : _iter150.getValue().entrySet())
+ oprot.writeI32(_iter160.getValue().size());
+ for (Map.Entry<GlobalStreamId, Long> _iter161 : _iter160.getValue().entrySet())
{
- _iter151.getKey().write(oprot);
- oprot.writeI64(_iter151.getValue());
+ _iter161.getKey().write(oprot);
+ oprot.writeI64(_iter161.getValue());
}
}
}
}
{
oprot.writeI32(struct.process_ms_avg.size());
- for (Map.Entry<String, Map<GlobalStreamId,Double>> _iter152 : struct.process_ms_avg.entrySet())
+ for (Map.Entry<String, Map<GlobalStreamId,Double>> _iter162 : struct.process_ms_avg.entrySet())
{
- oprot.writeString(_iter152.getKey());
+ oprot.writeString(_iter162.getKey());
{
- oprot.writeI32(_iter152.getValue().size());
- for (Map.Entry<GlobalStreamId, Double> _iter153 : _iter152.getValue().entrySet())
+ oprot.writeI32(_iter162.getValue().size());
+ for (Map.Entry<GlobalStreamId, Double> _iter163 : _iter162.getValue().entrySet())
{
- _iter153.getKey().write(oprot);
- oprot.writeDouble(_iter153.getValue());
+ _iter163.getKey().write(oprot);
+ oprot.writeDouble(_iter163.getValue());
}
}
}
}
{
oprot.writeI32(struct.executed.size());
- for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter154 : struct.executed.entrySet())
+ for (Map.Entry<String, Map<GlobalStreamId,Long>> _iter164 : struct.executed.entrySet())
{
- oprot.writeString(_iter154.getKey());
+ oprot.writeString(_iter164.getKey());
{
- oprot.writeI32(_iter154.getValue().size());
- for (Map.Entry<GlobalStreamId, Long> _iter155 : _iter154.getValue().entrySet())
+ oprot.writeI32(_iter164.getValue().size());
+ for (Map.Entry<GlobalStreamId, Long> _iter165 : _iter164.getValue().entrySet())
{
- _iter155.getKey().write(oprot);
- oprot.writeI64(_iter155.getValue());
+ _iter165.getKey().write(oprot);
+ oprot.writeI64(_iter165.getValue());
}
}
}
}
{
oprot.writeI32(struct.execute_ms_avg.size());
- for (Map.Entry<String, Map<GlobalStreamId,Double>> _iter156 : struct.execute_ms_avg.entrySet())
+ for (Map.Entry<String, Map<GlobalStreamId,Double>> _iter166 : struct.execute_ms_avg.entrySet())
{
- oprot.writeString(_iter156.getKey());
+ oprot.writeString(_iter166.getKey());
{
- oprot.writeI32(_iter156.getValue().size());
- for (Map.Entry<GlobalStreamId, Double> _iter157 : _iter156.getValue().entrySet())
+ oprot.writeI32(_iter166.getValue().size());
+ for (Map.Entry<GlobalStreamId, Double> _iter167 : _iter166.getValue().entrySet())
{
- _iter157.getKey().write(oprot);
- oprot.writeDouble(_iter157.getValue());
+ _iter167.getKey().write(oprot);
+ oprot.writeDouble(_iter167.getValue());
}
}
}
@@ -1259,127 +1259,127 @@ public class BoltStats implements org.apache.thrift.TBase<BoltStats, BoltStats._
public void read(org.apache.thrift.protocol.TProtocol prot, BoltStats struct) throws org.apache.thrift.TException {
TTupleProtocol iprot = (TTupleProtocol) prot;
{
- org.apache.thrift.protocol.TMap _map158 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, iprot.readI32());
- struct.acked = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map158.size);
- String _key159;
- Map<GlobalStreamId,Long> _val160;
- for (int _i161 = 0; _i161 < _map158.size; ++_i161)
+ org.apache.thrift.protocol.TMap _map168 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, iprot.readI32());
+ struct.acked = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map168.size);
+ String _key169;
+ Map<GlobalStreamId,Long> _val170;
+ for (int _i171 = 0; _i171 < _map168.size; ++_i171)
{
- _key159 = iprot.readString();
+ _key169 = iprot.readString();
{
- org.apache.thrift.protocol.TMap _map162 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, iprot.readI32());
- _val160 = new HashMap<GlobalStreamId,Long>(2*_map162.size);
- GlobalStreamId _key163;
- long _val164;
- for (int _i165 = 0; _i165 < _map162.size; ++_i165)
+ org.apache.thrift.protocol.TMap _map172 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, iprot.readI32());
+ _val170 = new HashMap<GlobalStreamId,Long>(2*_map172.size);
+ GlobalStreamId _key173;
+ long _val174;
+ for (int _i175 = 0; _i175 < _map172.size; ++_i175)
{
- _key163 = new GlobalStreamId();
- _key163.read(iprot);
- _val164 = iprot.readI64();
- _val160.put(_key163, _val164);
+ _key173 = new GlobalStreamId();
+ _key173.read(iprot);
+ _val174 = iprot.readI64();
+ _val170.put(_key173, _val174);
}
}
- struct.acked.put(_key159, _val160);
+ struct.acked.put(_key169, _val170);
}
}
struct.set_acked_isSet(true);
{
- org.apache.thrift.protocol.TMap _map166 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, iprot.readI32());
- struct.failed = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map166.size);
- String _key167;
- Map<GlobalStreamId,Long> _val168;
- for (int _i169 = 0; _i169 < _map166.size; ++_i169)
+ org.apache.thrift.protocol.TMap _map176 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, iprot.readI32());
+ struct.failed = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map176.size);
+ String _key177;
+ Map<GlobalStreamId,Long> _val178;
+ for (int _i179 = 0; _i179 < _map176.size; ++_i179)
{
- _key167 = iprot.readString();
+ _key177 = iprot.readString();
{
- org.apache.thrift.protocol.TMap _map170 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, iprot.readI32());
- _val168 = new HashMap<GlobalStreamId,Long>(2*_map170.size);
- GlobalStreamId _key171;
- long _val172;
- for (int _i173 = 0; _i173 < _map170.size; ++_i173)
+ org.apache.thrift.protocol.TMap _map180 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, iprot.readI32());
+ _val178 = new HashMap<GlobalStreamId,Long>(2*_map180.size);
+ GlobalStreamId _key181;
+ long _val182;
+ for (int _i183 = 0; _i183 < _map180.size; ++_i183)
{
- _key171 = new GlobalStreamId();
- _key171.read(iprot);
- _val172 = iprot.readI64();
- _val168.put(_key171, _val172);
+ _key181 = new GlobalStreamId();
+ _key181.read(iprot);
+ _val182 = iprot.readI64();
+ _val178.put(_key181, _val182);
}
}
- struct.failed.put(_key167, _val168);
+ struct.failed.put(_key177, _val178);
}
}
struct.set_failed_isSet(true);
{
- org.apache.thrift.protocol.TMap _map174 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, iprot.readI32());
- struct.process_ms_avg = new HashMap<String,Map<GlobalStreamId,Double>>(2*_map174.size);
- String _key175;
- Map<GlobalStreamId,Double> _val176;
- for (int _i177 = 0; _i177 < _map174.size; ++_i177)
+ org.apache.thrift.protocol.TMap _map184 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, iprot.readI32());
+ struct.process_ms_avg = new HashMap<String,Map<GlobalStreamId,Double>>(2*_map184.size);
+ String _key185;
+ Map<GlobalStreamId,Double> _val186;
+ for (int _i187 = 0; _i187 < _map184.size; ++_i187)
{
- _key175 = iprot.readString();
+ _key185 = iprot.readString();
{
- org.apache.thrift.protocol.TMap _map178 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.DOUBLE, iprot.readI32());
- _val176 = new HashMap<GlobalStreamId,Double>(2*_map178.size);
- GlobalStreamId _key179;
- double _val180;
- for (int _i181 = 0; _i181 < _map178.size; ++_i181)
+ org.apache.thrift.protocol.TMap _map188 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.DOUBLE, iprot.readI32());
+ _val186 = new HashMap<GlobalStreamId,Double>(2*_map188.size);
+ GlobalStreamId _key189;
+ double _val190;
+ for (int _i191 = 0; _i191 < _map188.size; ++_i191)
{
- _key179 = new GlobalStreamId();
- _key179.read(iprot);
- _val180 = iprot.readDouble();
- _val176.put(_key179, _val180);
+ _key189 = new GlobalStreamId();
+ _key189.read(iprot);
+ _val190 = iprot.readDouble();
+ _val186.put(_key189, _val190);
}
}
- struct.process_ms_avg.put(_key175, _val176);
+ struct.process_ms_avg.put(_key185, _val186);
}
}
struct.set_process_ms_avg_isSet(true);
{
- org.apache.thrift.protocol.TMap _map182 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, iprot.readI32());
- struct.executed = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map182.size);
- String _key183;
- Map<GlobalStreamId,Long> _val184;
- for (int _i185 = 0; _i185 < _map182.size; ++_i185)
+ org.apache.thrift.protocol.TMap _map192 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, iprot.readI32());
+ struct.executed = new HashMap<String,Map<GlobalStreamId,Long>>(2*_map192.size);
+ String _key193;
+ Map<GlobalStreamId,Long> _val194;
+ for (int _i195 = 0; _i195 < _map192.size; ++_i195)
{
- _key183 = iprot.readString();
+ _key193 = iprot.readString();
{
- org.apache.thrift.protocol.TMap _map186 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, iprot.readI32());
- _val184 = new HashMap<GlobalStreamId,Long>(2*_map186.size);
- GlobalStreamId _key187;
- long _val188;
- for (int _i189 = 0; _i189 < _map186.size; ++_i189)
+ org.apache.thrift.protocol.TMap _map196 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.I64, iprot.readI32());
+ _val194 = new HashMap<GlobalStreamId,Long>(2*_map196.size);
+ GlobalStreamId _key197;
+ long _val198;
+ for (int _i199 = 0; _i199 < _map196.size; ++_i199)
{
- _key187 = new GlobalStreamId();
- _key187.read(iprot);
- _val188 = iprot.readI64();
- _val184.put(_key187, _val188);
+ _key197 = new GlobalStreamId();
+ _key197.read(iprot);
+ _val198 = iprot.readI64();
+ _val194.put(_key197, _val198);
}
}
- struct.executed.put(_key183, _val184);
+ struct.executed.put(_key193, _val194);
}
}
struct.set_executed_isSet(true);
{
- org.apache.thrift.protocol.TMap _map190 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, iprot.readI32());
- struct.execute_ms_avg = new HashMap<String,Map<GlobalStreamId,Double>>(2*_map190.size);
- String _key191;
- Map<GlobalStreamId,Double> _val192;
- for (int _i193 = 0; _i193 < _map190.size; ++_i193)
+ org.apache.thrift.protocol.TMap _map200 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRING, org.apache.thrift.protocol.TType.MAP, iprot.readI32());
+ struct.execute_ms_avg = new HashMap<String,Map<GlobalStreamId,Double>>(2*_map200.size);
+ String _key201;
+ Map<GlobalStreamId,Double> _val202;
+ for (int _i203 = 0; _i203 < _map200.size; ++_i203)
{
- _key191 = iprot.readString();
+ _key201 = iprot.readString();
{
- org.apache.thrift.protocol.TMap _map194 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.DOUBLE, iprot.readI32());
- _val192 = new HashMap<GlobalStreamId,Double>(2*_map194.size);
- GlobalStreamId _key195;
- double _val196;
- for (int _i197 = 0; _i197 < _map194.size; ++_i197)
+ org.apache.thrift.protocol.TMap _map204 = new org.apache.thrift.protocol.TMap(org.apache.thrift.protocol.TType.STRUCT, org.apache.thrift.protocol.TType.DOUBLE, iprot.readI32());
+ _val202 = new HashMap<GlobalStreamId,Double>(2*_map204.size);
+ GlobalStreamId _key205;
+ double _val206;
+ for (int _i207 = 0; _i207 < _map204.size; ++_i207)
{
- _key195 = new GlobalStreamId();
- _key195.read(iprot);
- _val196 = iprot.readDouble();
- _val192.put(_key195, _val196);
+ _key205 = new GlobalStreamId();
+ _key205.read(iprot);
+ _val206 = iprot.readDouble();
+ _val202.put(_key205, _val206);
}
}
- struct.execute_ms_avg.put(_key191, _val192);
+ struct.execute_ms_avg.put(_key201, _val202);
}
}
struct.set_execute_ms_avg_isSet(true);