You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/03/15 18:44:35 UTC

[02/30] storm git commit: port backtype.storm.stats to java

port backtype.storm.stats to java


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/afd2d525
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/afd2d525
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/afd2d525

Branch: refs/heads/master
Commit: afd2d525be396c6f430e6a4a13cd1f237496a473
Parents: 11232b5
Author: 卫乐 <we...@taobao.com>
Authored: Wed Feb 24 21:06:25 2016 +0800
Committer: 卫乐 <we...@taobao.com>
Committed: Wed Feb 24 21:06:25 2016 +0800

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/converter.clj      |   25 +-
 .../org/apache/storm/daemon/builtin_metrics.clj |   33 +-
 .../clj/org/apache/storm/daemon/executor.clj    |   23 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  |   18 +-
 .../src/clj/org/apache/storm/daemon/task.clj    |   11 +-
 storm-core/src/clj/org/apache/storm/stats.clj   | 1567 ------------------
 storm-core/src/clj/org/apache/storm/ui/core.clj |   57 +-
 .../test/clj/org/apache/storm/nimbus_test.clj   |    8 +-
 8 files changed, 84 insertions(+), 1658 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/afd2d525/storm-core/src/clj/org/apache/storm/converter.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/converter.clj b/storm-core/src/clj/org/apache/storm/converter.clj
index 5599d28..6e9eeb8 100644
--- a/storm-core/src/clj/org/apache/storm/converter.clj
+++ b/storm-core/src/clj/org/apache/storm/converter.clj
@@ -17,8 +17,9 @@
   (:import [org.apache.storm.generated SupervisorInfo NodeInfo Assignment WorkerResources
             StormBase TopologyStatus ClusterWorkerHeartbeat ExecutorInfo ErrorInfo Credentials RebalanceOptions KillOptions
             TopologyActionOptions DebugOptions ProfileRequest]
-           [org.apache.storm.utils Utils])
-  (:use [org.apache.storm util stats log])
+           [org.apache.storm.utils Utils]
+           [org.apache.storm.stats StatsUtil])
+  (:use [org.apache.storm util log])
   (:require [org.apache.storm.daemon [common :as common]]))
 
 (defn thriftify-supervisor-info [supervisor-info]
@@ -213,26 +214,10 @@
       (convert-to-symbol-from-status (.get_prev_status storm-base))
       (map-val clojurify-debugoptions (.get_component_debug storm-base)))))
 
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn thriftify-stats [stats]
-  (if stats
-    (map-val thriftify-executor-stats
-      (map-key #(ExecutorInfo. (int (first %1)) (int (last %1)))
-        stats))
-    {}))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn clojurify-stats [stats]
-  (if stats
-    (map-val clojurify-executor-stats
-      (map-key (fn [x] (list (.get_task_start x) (.get_task_end x)))
-        stats))
-    {}))
-
 (defn clojurify-zk-worker-hb [^ClusterWorkerHeartbeat worker-hb]
   (if worker-hb
     {:storm-id (.get_storm_id worker-hb)
-     :executor-stats (clojurify-stats (into {} (.get_executor_stats worker-hb)))
+     :executor-stats (clojurify-structure (StatsUtil/clojurifyStats (into {} (.get_executor_stats worker-hb))))
      :uptime (.get_uptime_secs worker-hb)
      :time-secs (.get_time_secs worker-hb)
      }
@@ -243,7 +228,7 @@
     (doto (ClusterWorkerHeartbeat.)
       (.set_uptime_secs (:uptime worker-hb))
       (.set_storm_id (:storm-id worker-hb))
-      (.set_executor_stats (thriftify-stats (filter second (:executor-stats worker-hb))))
+      (.set_executor_stats (StatsUtil/thriftifyStats (filter second (:executor-stats worker-hb))))
       (.set_time_secs (:time-secs worker-hb)))))
 
 (defn clojurify-error [^ErrorInfo error]

http://git-wip-us.apache.org/repos/asf/storm/blob/afd2d525/storm-core/src/clj/org/apache/storm/daemon/builtin_metrics.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/builtin_metrics.clj b/storm-core/src/clj/org/apache/storm/daemon/builtin_metrics.clj
index 14d0132..caa3b71 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/builtin_metrics.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/builtin_metrics.clj
@@ -16,8 +16,7 @@
 (ns org.apache.storm.daemon.builtin-metrics
   (:import [org.apache.storm.metric.api CountMetric StateMetric IMetric IStatefulObject])
   (:import [org.apache.storm.metric.internal MultiCountStatAndMetric MultiLatencyStatAndMetric])
-  (:import [org.apache.storm Config])
-  (:use [org.apache.storm.stats]))
+  (:import [org.apache.storm Config]))
 
 (defrecord BuiltinSpoutMetrics [^MultiCountStatAndMetric ack-count
                                 ^MultiLatencyStatAndMetric complete-latency
@@ -38,18 +37,18 @@
 
 (defn make-data [executor-type stats]
   (condp = executor-type
-    :spout (BuiltinSpoutMetrics. (stats-acked stats)
-                                 (stats-complete-latencies stats)
-                                 (stats-failed stats)
-                                 (stats-emitted stats)
-                                 (stats-transferred stats))
-    :bolt (BuiltinBoltMetrics. (stats-acked stats)
-                               (stats-process-latencies stats)
-                               (stats-failed stats)
-                               (stats-executed stats)
-                               (stats-execute-latencies stats)
-                               (stats-emitted stats)
-                               (stats-transferred stats))))
+    :spout (BuiltinSpoutMetrics. (.getAcked stats)
+                                 (.getCompleteLatencies stats)
+                                 (.getFailed stats)
+                                 (.getEmitted stats)
+                                 (.getTransferred stats))
+    :bolt (BuiltinBoltMetrics. (.getAcked stats)
+                               (.getProcessLatencies stats)
+                               (.getFailed stats)
+                               (.getExecuted stats)
+                               (.getExecuteLatencies stats)
+                               (.getEmitted stats)
+                               (.getTransferred stats))))
 
 (defn make-spout-throttling-data []
   (SpoutThrottlingMetrics. (CountMetric.)
@@ -89,10 +88,10 @@
                      (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)))))
 
 (defn skipped-max-spout! [^SpoutThrottlingMetrics m stats]
-  (-> m .skipped-max-spout (.incrBy (stats-rate stats))))
+  (-> m .skipped-max-spout (.incrBy (.getRate stats))))
 
 (defn skipped-throttle! [^SpoutThrottlingMetrics m stats]
-  (-> m .skipped-throttle (.incrBy (stats-rate stats))))
+  (-> m .skipped-throttle (.incrBy (.getRate stats))))
 
 (defn skipped-inactive! [^SpoutThrottlingMetrics m stats]
-  (-> m .skipped-inactive (.incrBy (stats-rate stats))))
+  (-> m .skipped-inactive (.incrBy (.getRate stats))))

http://git-wip-us.apache.org/repos/asf/storm/blob/afd2d525/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 92cc003..bca03df 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -16,8 +16,9 @@
 (ns org.apache.storm.daemon.executor
   (:use [org.apache.storm.daemon common])
   (:import [org.apache.storm.generated Grouping Grouping$_Fields]
-           [java.io Serializable])
-  (:use [org.apache.storm util config log stats])
+           [java.io Serializable]
+           [org.apache.storm.stats StatsUtil])
+  (:use [org.apache.storm util config log])
   (:import [java.util List Random HashMap ArrayList LinkedList Map])
   (:import [org.apache.storm ICredentialsListener Thrift])
   (:import [org.apache.storm.hooks ITaskHook])
@@ -41,7 +42,7 @@
            [org.json.simple JSONValue]
            [com.lmax.disruptor.dsl ProducerType]
            [org.apache.storm StormTimer])
-  (:require [org.apache.storm [cluster :as cluster] [stats :as stats]])
+  (:require [org.apache.storm [cluster :as cluster]])
   (:require [org.apache.storm.daemon [task :as task]])
   (:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics])
   (:require [clojure.set :as set]))
@@ -407,7 +408,7 @@
     (reify
       RunningExecutor
       (render-stats [this]
-        (stats/render-stats! (:stats executor-data)))
+        (clojurify-structure (StatsUtil/renderStats (:stats executor-data))))
       (get-executor-id [this]
         executor-id)
       (credentials-changed [this creds]
@@ -447,7 +448,7 @@
     (.fail spout msg-id)
     (task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta))
     (when time-delta
-      (stats/spout-failed-tuple! (:stats executor-data) (:stream tuple-info) time-delta))))
+      (StatsUtil/spoutFailedTuple (:stats executor-data) (:stream tuple-info) time-delta))))
 
 (defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id]
   (let [storm-conf (:storm-conf executor-data)
@@ -458,7 +459,7 @@
     (.ack spout msg-id)
     (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta))
     (when time-delta
-      (stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta))))
+      (StatsUtil/spoutAckedTuple (:stats executor-data) (:stream tuple-info) time-delta))))
 
 (defn mk-task-receiver [executor-data tuple-action-fn]
   (let [task-ids (:task-ids executor-data)
@@ -739,7 +740,7 @@
 
                                   (task/apply-hooks user-context .boltExecute (BoltExecuteInfo. tuple task-id delta))
                                   (when delta
-                                    (stats/bolt-execute-tuple! executor-stats
+                                    (StatsUtil/boltExecuteTuple executor-stats
                                                                (.getSourceComponent tuple)
                                                                (.getSourceStreamId tuple)
                                                                delta)))))))
@@ -812,7 +813,7 @@
                                                 (log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple))
                                               (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
                                               (when delta
-                                                (stats/bolt-acked-tuple! executor-stats
+                                                (StatsUtil/boltAckedTuple executor-stats
                                                                          (.getSourceComponent tuple)
                                                                          (.getSourceStreamId tuple)
                                                                          delta))))
@@ -827,7 +828,7 @@
                                                 (log-message "BOLT fail TASK: " task-id " TIME: " delta " TUPLE: " tuple))
                                               (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta))
                                               (when delta
-                                                (stats/bolt-failed-tuple! executor-stats
+                                                (StatsUtil/boltFailedTuple executor-stats
                                                                           (.getSourceComponent tuple)
                                                                           (.getSourceStreamId tuple)
                                                                           delta))))
@@ -862,7 +863,7 @@
 
 ;; TODO: refactor this to be part of an executor-specific map
 (defmethod mk-executor-stats :spout [_ rate]
-  (stats/mk-spout-stats rate))
+  (StatsUtil/mkSpoutStats rate))
 
 (defmethod mk-executor-stats :bolt [_ rate]
-  (stats/mk-bolt-stats rate))
+  (StatsUtil/mkBoltStats rate))

http://git-wip-us.apache.org/repos/asf/storm/blob/afd2d525/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 28a6fb8..992a864 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -14,7 +14,8 @@
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
 (ns org.apache.storm.daemon.nimbus
-  (:import [org.apache.thrift.server THsHaServer THsHaServer$Args])
+  (:import [org.apache.thrift.server THsHaServer THsHaServer$Args]
+           [org.apache.storm.stats StatsUtil])
   (:import [org.apache.storm.generated KeyNotFoundException])
   (:import [org.apache.storm.blobstore LocalFsBlobStore])
   (:import [org.apache.thrift.protocol TBinaryProtocol TBinaryProtocol$Factory])
@@ -52,8 +53,7 @@
   (:import [org.apache.storm.cluster ClusterStateContext DaemonType])
   (:use [org.apache.storm util config log zookeeper])
   (:require [org.apache.storm [cluster :as cluster]
-                            [converter :as converter]
-                            [stats :as stats]])
+                            [converter :as converter]])
   (:require [clojure.set :as set])
   (:import [org.apache.storm.daemon.common StormBase Assignment])
   (:import [org.apache.storm.zookeeper Zookeeper])
@@ -1668,7 +1668,7 @@
               executor->host+port (map-val (fn [[node port]]
                                              [(node->host node) port])
                                     executor->node+port)
-              nodeinfos (stats/extract-nodeinfos-from-hb-for-comp executor->host+port task->component false component_id)
+              nodeinfos (clojurify-structure (StatsUtil/extractNodeInfosFromHbForComp executor->host+port task->component false component_id))
               all-pending-actions-for-topology (.get-topology-profile-requests storm-cluster-state id true)
               latest-profile-actions (remove nil? (map (fn [nodeInfo]
                                                          (->> all-pending-actions-for-topology
@@ -1912,7 +1912,7 @@
                                               heartbeat (get beats executor)
                                               stats (:stats heartbeat)
                                               stats (if stats
-                                                      (stats/thriftify-executor-stats stats))]
+                                                      (StatsUtil/thriftifyExecutorStats stats))]
                                           (doto
                                               (ExecutorSummary. (thriftify-executor-id executor)
                                                                 (-> executor first task->component)
@@ -2106,14 +2106,14 @@
               last-err-fn (partial get-last-error
                                    (:storm-cluster-state info)
                                    topo-id)
-              topo-page-info (stats/agg-topo-execs-stats topo-id
+              ;;TODO: add last-error-fn to aggTopoExecsStats method
+              topo-page-info (StatsUtil/aggTopoExecsStats topo-id
                                                          exec->node+port
                                                          (:task->component info)
                                                          (:beats info)
                                                          (:topology info)
                                                          window
-                                                         include-sys?
-                                                         last-err-fn)]
+                                                         include-sys?)]
           (when-let [owner (:owner (:base info))]
             (.set_owner topo-page-info owner))
           (when-let [sched-status (.get @(:id->sched-status nimbus) topo-id)]
@@ -2154,7 +2154,7 @@
               executor->host+port (map-val (fn [[node port]]
                                              [(node->host node) port])
                                            executor->node+port)
-              comp-page-info (stats/agg-comp-execs-stats executor->host+port
+              comp-page-info (StatsUtil/aggCompExecsStats executor->host+port
                                                          (:task->component info)
                                                          (:beats info)
                                                          window

http://git-wip-us.apache.org/repos/asf/storm/blob/afd2d525/storm-core/src/clj/org/apache/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj
index 77abdec..c9f6828 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/task.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj
@@ -26,10 +26,9 @@
   (:import [org.apache.storm.utils Utils ConfigUtils])
   (:import [org.apache.storm.generated ShellComponent JavaObject])
   (:import [org.apache.storm.spout ShellSpout])
+  (:import [org.apache.storm.stats StatsUtil])
   (:import [java.util Collection List ArrayList])
   (:import [org.apache.storm Thrift])
-  (:require [org.apache.storm
-             [stats :as stats]])
   (:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics]))
 
 (defn mk-topology-context-builder [worker executor-data topology]
@@ -141,9 +140,9 @@
               (throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping")))                          
             (apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id]))
             (when (emit-sampler)
-              (stats/emitted-tuple! executor-stats stream)
+              (StatsUtil/emittedTuple executor-stats stream)
               (if out-task-id
-                (stats/transferred-tuples! executor-stats stream 1)))
+                (StatsUtil/transferredTuples executor-stats stream, 1)))
             (if out-task-id [out-task-id])
             ))
         ([^String stream ^List values]
@@ -163,8 +162,8 @@
                    )))
              (apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks))
              (when (emit-sampler)
-               (stats/emitted-tuple! executor-stats stream)
-               (stats/transferred-tuples! executor-stats stream (count out-tasks)))
+               (StatsUtil/emittedTuple executor-stats stream)
+               (StatsUtil/transferredTuples executor-stats stream (count out-tasks)))
              out-tasks)))
     ))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/afd2d525/storm-core/src/clj/org/apache/storm/stats.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/stats.clj b/storm-core/src/clj/org/apache/storm/stats.clj
deleted file mode 100644
index 8b37fc3..0000000
--- a/storm-core/src/clj/org/apache/storm/stats.clj
+++ /dev/null
@@ -1,1567 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.stats
-  (:import [org.apache.storm.generated Nimbus Nimbus$Processor Nimbus$Iface StormTopology ShellComponent
-            NotAliveException AlreadyAliveException InvalidTopologyException GlobalStreamId
-            ClusterSummary TopologyInfo TopologySummary ExecutorInfo ExecutorSummary ExecutorStats
-            ExecutorSpecificStats SpoutStats BoltStats ErrorInfo
-            SupervisorSummary CommonAggregateStats ComponentAggregateStats
-            ComponentPageInfo ComponentType BoltAggregateStats
-            ExecutorAggregateStats SpecificAggregateStats
-            SpoutAggregateStats TopologyPageInfo TopologyStats])
-  (:import [org.apache.storm.utils Utils])
-  (:import [org.apache.storm.metric.internal MultiCountStatAndMetric MultiLatencyStatAndMetric]
-           [java.util Collection])
-  (:use [org.apache.storm log util])
-  (:use [clojure.math.numeric-tower :only [ceil]]))
-
-(def TEN-MIN-IN-SECONDS (* 10 60))
-
-(def COMMON-FIELDS [:emitted :transferred])
-(defrecord CommonStats [^MultiCountStatAndMetric emitted
-                        ^MultiCountStatAndMetric transferred
-                        rate])
-
-(def BOLT-FIELDS [:acked :failed :process-latencies :executed :execute-latencies])
-;;acked and failed count individual tuples
-(defrecord BoltExecutorStats [^CommonStats common
-                              ^MultiCountStatAndMetric acked
-                              ^MultiCountStatAndMetric failed
-                              ^MultiLatencyStatAndMetric process-latencies
-                              ^MultiCountStatAndMetric executed
-                              ^MultiLatencyStatAndMetric execute-latencies])
-
-(def SPOUT-FIELDS [:acked :failed :complete-latencies])
-;;acked and failed count tuple completion
-(defrecord SpoutExecutorStats [^CommonStats common
-                               ^MultiCountStatAndMetric acked
-                               ^MultiCountStatAndMetric failed
-                               ^MultiLatencyStatAndMetric complete-latencies])
-
-(def NUM-STAT-BUCKETS 20)
-
-(defn- div
-  "Perform floating point division on the arguments."
-  [f & rest]
-  (apply / (double f) rest))
-
-(defn- mk-common-stats
-  [rate]
-  (CommonStats.
-    (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
-    (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
-    rate))
-
-(defn mk-bolt-stats
-  [rate]
-  (BoltExecutorStats.
-    (mk-common-stats rate)
-    (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
-    (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
-    (MultiLatencyStatAndMetric. NUM-STAT-BUCKETS)
-    (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
-    (MultiLatencyStatAndMetric. NUM-STAT-BUCKETS)))
-
-(defn mk-spout-stats
-  [rate]
-  (SpoutExecutorStats.
-    (mk-common-stats rate)
-    (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
-    (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
-    (MultiLatencyStatAndMetric. NUM-STAT-BUCKETS)))
-
-(defmacro stats-rate
-  [stats]
-  `(-> ~stats :common :rate))
-
-(defmacro stats-emitted
-  [stats]
-  `(-> ~stats :common :emitted))
-
-(defmacro stats-transferred
-  [stats]
-  `(-> ~stats :common :transferred))
-
-(defmacro stats-executed
-  [stats]
-  `(:executed ~stats))
-
-(defmacro stats-acked
-  [stats]
-  `(:acked ~stats))
-
-(defmacro stats-failed
-  [stats]
-  `(:failed ~stats))
-
-(defmacro stats-execute-latencies
-  [stats]
-  `(:execute-latencies ~stats))
-
-(defmacro stats-process-latencies
-  [stats]
-  `(:process-latencies ~stats))
-
-(defmacro stats-complete-latencies
-  [stats]
-  `(:complete-latencies ~stats))
-
-(defn emitted-tuple!
-  [stats stream]
-  (.incBy ^MultiCountStatAndMetric (stats-emitted stats) ^Object stream ^long (stats-rate stats)))
-
-(defn transferred-tuples!
-  [stats stream amt]
-  (.incBy ^MultiCountStatAndMetric (stats-transferred stats) ^Object stream ^long (* (stats-rate stats) amt)))
-
-(defn bolt-execute-tuple!
-  [^BoltExecutorStats stats component stream latency-ms]
-  (let [key [component stream]
-        ^MultiCountStatAndMetric executed (stats-executed stats)
-        ^MultiLatencyStatAndMetric exec-lat (stats-execute-latencies stats)]
-    (.incBy executed key (stats-rate stats))
-    (.record exec-lat key latency-ms)))
-
-(defn bolt-acked-tuple!
-  [^BoltExecutorStats stats component stream latency-ms]
-  (let [key [component stream]
-        ^MultiCountStatAndMetric acked (stats-acked stats)
-        ^MultiLatencyStatAndMetric process-lat (stats-process-latencies stats)]
-    (.incBy acked key (stats-rate stats))
-    (.record process-lat key latency-ms)))
-
-(defn bolt-failed-tuple!
-  [^BoltExecutorStats stats component stream latency-ms]
-  (let [key [component stream]
-        ^MultiCountStatAndMetric failed (stats-failed stats)]
-    (.incBy failed key (stats-rate stats))))
-
-(defn spout-acked-tuple!
-  [^SpoutExecutorStats stats stream latency-ms]
-  (.incBy ^MultiCountStatAndMetric (stats-acked stats) stream (stats-rate stats))
-  (.record ^MultiLatencyStatAndMetric (stats-complete-latencies stats) stream latency-ms))
-
-(defn spout-failed-tuple!
-  [^SpoutExecutorStats stats stream latency-ms]
-  (.incBy ^MultiCountStatAndMetric (stats-failed stats) stream (stats-rate stats)))
-
-(defn- cleanup-stat! [stat]
-  (.close stat))
-
-(defn- cleanup-common-stats!
-  [^CommonStats stats]
-  (doseq [f COMMON-FIELDS]
-    (cleanup-stat! (f stats))))
-
-(defn cleanup-bolt-stats!
-  [^BoltExecutorStats stats]
-  (cleanup-common-stats! (:common stats))
-  (doseq [f BOLT-FIELDS]
-    (cleanup-stat! (f stats))))
-
-(defn cleanup-spout-stats!
-  [^SpoutExecutorStats stats]
-  (cleanup-common-stats! (:common stats))
-  (doseq [f SPOUT-FIELDS]
-    (cleanup-stat! (f stats))))
-
-(defn- value-stats
-  [stats fields]
-  (into {} (dofor [f fields]
-                  [f (if (instance? MultiCountStatAndMetric (f stats))
-                         (.getTimeCounts ^MultiCountStatAndMetric (f stats))
-                         (.getTimeLatAvg ^MultiLatencyStatAndMetric (f stats)))])))
-
-(defn- value-common-stats
-  [^CommonStats stats]
-  (merge
-    (value-stats stats COMMON-FIELDS)
-    {:rate (:rate stats)}))
-
-(defn value-bolt-stats!
-  [^BoltExecutorStats stats]
-  (cleanup-bolt-stats! stats)
-  (merge (value-common-stats (:common stats))
-         (value-stats stats BOLT-FIELDS)
-         {:type :bolt}))
-
-(defn value-spout-stats!
-  [^SpoutExecutorStats stats]
-  (cleanup-spout-stats! stats)
-  (merge (value-common-stats (:common stats))
-         (value-stats stats SPOUT-FIELDS)
-         {:type :spout}))
-
-(defn- class-selector
-  [obj & args]
-  (class obj))
-
-(defmulti render-stats! class-selector)
-
-(defmethod render-stats! SpoutExecutorStats
-  [stats]
-  (value-spout-stats! stats))
-
-(defmethod render-stats! BoltExecutorStats
-  [stats]
-  (value-bolt-stats! stats))
-
-(defmulti thriftify-specific-stats :type)
-(defmulti clojurify-specific-stats class-selector)
-
-(defn window-set-converter
-  ([stats key-fn first-key-fun]
-    (into {}
-      (for [[k v] stats]
-        ;apply the first-key-fun only to first key.
-        [(first-key-fun k)
-         (into {} (for [[k2 v2] v]
-                    [(key-fn k2) v2]))])))
-  ([stats first-key-fun]
-    (window-set-converter stats identity first-key-fun)))
-
-(defn to-global-stream-id
-  [[component stream]]
-  (GlobalStreamId. component stream))
-
-(defn from-global-stream-id [global-stream-id]
-  [(.get_componentId global-stream-id) (.get_streamId global-stream-id)])
-
-(defmethod clojurify-specific-stats BoltStats [^BoltStats stats]
-  [(window-set-converter (.get_acked stats) from-global-stream-id identity)
-   (window-set-converter (.get_failed stats) from-global-stream-id identity)
-   (window-set-converter (.get_process_ms_avg stats) from-global-stream-id identity)
-   (window-set-converter (.get_executed stats) from-global-stream-id identity)
-   (window-set-converter (.get_execute_ms_avg stats) from-global-stream-id identity)])
-
-(defmethod clojurify-specific-stats SpoutStats [^SpoutStats stats]
-  [(.get_acked stats)
-   (.get_failed stats)
-   (.get_complete_ms_avg stats)])
-
-
-(defn clojurify-executor-stats
-  [^ExecutorStats stats]
-  (let [ specific-stats (.get_specific stats)
-         is_bolt? (.is_set_bolt specific-stats)
-         specific-stats (if is_bolt? (.get_bolt specific-stats) (.get_spout specific-stats))
-         specific-stats (clojurify-specific-stats specific-stats)
-         common-stats (CommonStats. (.get_emitted stats)
-                                    (.get_transferred stats)
-                                    (.get_rate stats))]
-    (if is_bolt?
-      ; worker heart beat does not store the BoltExecutorStats or SpoutExecutorStats , instead it stores the result returned by render-stats!
-      ; which flattens the BoltExecutorStats/SpoutExecutorStats by extracting values from all atoms and merging all values inside :common to top
-      ;level map we are pretty much doing the same here.
-      (dissoc (merge common-stats {:type :bolt}  (apply ->BoltExecutorStats (into [nil] specific-stats))) :common)
-      (dissoc (merge common-stats {:type :spout} (apply ->SpoutExecutorStats (into [nil] specific-stats))) :common)
-      )))
-
-(defmethod thriftify-specific-stats :bolt
-  [stats]
-  (ExecutorSpecificStats/bolt
-    (BoltStats.
-      (window-set-converter (:acked stats) to-global-stream-id str)
-      (window-set-converter (:failed stats) to-global-stream-id str)
-      (window-set-converter (:process-latencies stats) to-global-stream-id str)
-      (window-set-converter (:executed stats) to-global-stream-id str)
-      (window-set-converter (:execute-latencies stats) to-global-stream-id str))))
-
-(defmethod thriftify-specific-stats :spout
-  [stats]
-  (ExecutorSpecificStats/spout
-    (SpoutStats. (window-set-converter (:acked stats) str)
-      (window-set-converter (:failed stats) str)
-      (window-set-converter (:complete-latencies stats) str))))
-
-(defn thriftify-executor-stats
-  [stats]
-  (let [specific-stats (thriftify-specific-stats stats)
-        rate (:rate stats)]
-    (ExecutorStats. (window-set-converter (:emitted stats) str)
-      (window-set-converter (:transferred stats) str)
-      specific-stats
-      rate)))
-
-(defn valid-number?
-  "Returns true if x is a number that is not NaN or Infinity, false otherwise"
-  [x]
-  (and (number? x)
-       (not (Double/isNaN x))
-       (not (Double/isInfinite x))))
-
-(defn apply-default
-  [f defaulting-fn & args]
-  (apply f (map defaulting-fn args)))
-
-(defn apply-or-0
-  [f & args]
-  (apply apply-default
-         f
-         #(if (valid-number? %) % 0)
-         args))
-
-(defn sum-or-0
-  [& args]
-  (apply apply-or-0 + args))
-
-(defn product-or-0
-  [& args]
-  (apply apply-or-0 * args))
-
-(defn max-or-0
-  [& args]
-  (apply apply-or-0 max args))
-
-(defn- agg-bolt-lat-and-count
-  "Aggregates number executed, process latency, and execute latency across all
-  streams."
-  [idk->exec-avg idk->proc-avg idk->num-executed]
-  (letfn [(weight-avg [[id avg]]
-            (let [num-e (get idk->num-executed id)]
-              (product-or-0 avg num-e)))]
-    {:executeLatencyTotal (reduce + (map weight-avg idk->exec-avg))
-     :processLatencyTotal (reduce + (map weight-avg idk->proc-avg))
-     :executed (reduce + (vals idk->num-executed))}))
-
-(defn- agg-spout-lat-and-count
-  "Collapses per-stream spout figures into totals across all streams: each
-  complete-latency average is multiplied by that stream's acked count
-  (product-or-0) and summed; the acked counts are summed as well."
-  [sid->comp-avg sid->num-acked]
-  (let [weighted (for [[sid avg] sid->comp-avg]
-                   (product-or-0 avg (get sid->num-acked sid)))]
-    {:completeLatencyTotal (reduce + weighted)
-     :acked (reduce + (vals sid->num-acked))}))
-
-(defn add-pairs
-  "Position-wise sum of two 2-element pairs. With no arguments returns [0 0]."
-  ([] [0 0])
-  ([left right]
-   (let [[l1 l2] left
-         [r1 r2] right]
-     [(+ l1 r1) (+ l2 r2)])))
-
-(defn mk-include-sys-fn
-  "Returns a one-argument predicate over stream ids. When include-sys? is
-  truthy it accepts everything; otherwise it accepts only strings for which
-  Utils/isSystemId is false."
-  [include-sys?]
-  (if-not include-sys?
-    (fn [stream]
-      (and (string? stream)
-           (not (Utils/isSystemId stream))))
-    (fn [_] true)))
-
-;TODO: when translating this function, you should replace the filter-key with a proper for loop + if condition HERE
-(defn mk-include-sys-filter
-  "Returns a function over a map: `identity` when include-sys? is truthy,
-  otherwise filter-key partially applied to the predicate from
-  (mk-include-sys-fn false)."
-  [include-sys?]
-  (cond
-    include-sys? identity
-    :else (partial filter-key (mk-include-sys-fn false))))
-
-(defn- agg-bolt-streams-lat-and-count
-  "Aggregates number executed and process & execute latencies per key.
-  For every key of idk->exec-avg, returns a map with the execute and process
-  latency averages weighted by that key's executed count (product-or-0), plus
-  the executed count itself.
-  Lookups into idk->executed go through `get`, so a nil idk->executed yields
-  nil counts (weighted totals of 0) instead of throwing."
-  [idk->exec-avg idk->proc-avg idk->executed]
-  (letfn [(weight-avg [id avg]
-            (product-or-0 avg (get idk->executed id)))]
-    (into {}
-      (for [k (keys idk->exec-avg)]
-        [k {:executeLatencyTotal (weight-avg k (get idk->exec-avg k))
-            :processLatencyTotal (weight-avg k (get idk->proc-avg k))
-            :executed (get idk->executed k)}]))))
-
-(defn- agg-spout-streams-lat-and-count
-  "Per key of idk->comp-avg, pairs the complete-latency average weighted by
-  that key's acked count (product-or-0) with the acked count itself."
-  [idk->comp-avg idk->acked]
-  (into {}
-        (map (fn [[k comp-avg]]
-               (let [acked (get idk->acked k)]
-                 [k {:completeLatencyTotal (product-or-0 comp-avg acked)
-                     :acked acked}]))
-             idk->comp-avg)))
-
-(defn swap-map-order
-  "Inverts the two key levels of a nested map, so inner keys become outer
-  keys and outer keys become inner keys.
-  Example:
-  {:a {:X :banana, :Y :pear}, :b {:X :apple, :Y :orange}}
-  -> {:Y {:a :pear, :b :orange}, :X {:a :banana, :b :apple}}"
-  [m]
-  (let [invert-entry (fn [[outer inner]]
-                       (into {}
-                             (map (fn [[inner-k v]] [inner-k {outer v}])
-                                  inner)))]
-    (apply merge-with merge (map invert-entry m))))
-
-(defn- compute-agg-capacity
-  "Capacity figure for one executor from its stats map `m` and `uptime`.
-  Uses the TEN-MIN-IN-SECONDS window: sums avg-execute-latency * executed over
-  the streams and divides by 1000 * min(uptime, TEN-MIN-IN-SECONDS).
-  Returns nil when uptime is nil."
-  [m uptime]
-  (when uptime
-    (let [window (str TEN-MIN-IN-SECONDS)
-          sid->avg (get (:execute-latencies m) window)
-          sid->cnt (get (:executed m) window)
-          ;; Per stream: [avg*count count]; the stream ids are dropped by vals.
-          pairs (vals (merge-with (fn weighted-avg+count-fn
-                                    [avg cnt]
-                                    [(* avg cnt) cnt])
-                                  sid->avg
-                                  sid->cnt))
-          ;; Only the weighted total feeds the result; the count is unused.
-          [weighted-total _] (reduce add-pairs [0. 0] pairs)]
-      (div weighted-total (* 1000 (min uptime TEN-MIN-IN-SECONDS))))))
-
-(defn agg-pre-merge-comp-page-bolt
-  "Reshapes one bolt executor's data into the per-executor map used by the
-  component-page merge step: identity fields, :capacity, input stats
-  (acked/failed merged with latency totals and executed) and output stats
-  (emitted/transferred after the include-sys? filter), all read from the
-  given `window`."
-  [{exec-id :exec-id
-    host :host
-    port :port
-    uptime :uptime
-    comp-id :comp-id
-    num-tasks :num-tasks
-    statk->w->sid->num :stats}
-   window
-   include-sys?]
-  ;TODO: when translating this function, you should replace the map-key with a proper for loop HERE
-  (let [filter-sys (mk-include-sys-filter include-sys?)
-        ;; Entry for `window` of one stat, after stringifying that stat's keys.
-        in-window (fn [stat-key]
-                    (get (map-key str (get statk->w->sid->num stat-key))
-                         window))
-        input-counts (swap-map-order {:acked (in-window :acked)
-                                      :failed (in-window :failed)})
-        input-latencies (agg-bolt-streams-lat-and-count
-                          (in-window :execute-latencies)
-                          (in-window :process-latencies)
-                          (in-window :executed))
-        output-counts (swap-map-order
-                        {:emitted (filter-sys (in-window :emitted))
-                         :transferred (filter-sys (in-window :transferred))})]
-    {:executor-id exec-id
-     :host host
-     :port port
-     :uptime uptime
-     :num-executors 1
-     :num-tasks num-tasks
-     :capacity (compute-agg-capacity statk->w->sid->num uptime)
-     :cid+sid->input-stats (merge-with merge input-counts input-latencies)
-     :sid->output-stats output-counts}))
-
-(defn agg-pre-merge-comp-page-spout
-  "Reshapes one spout executor's data into the per-executor map used by the
-  component-page merge step: identity fields plus per-stream output stats
-  (complete-latency totals merged with acked/failed/emitted/transferred,
-  the last two after the include-sys? filter), all read from `window`."
-  [{exec-id :exec-id
-    host :host
-    port :port
-    uptime :uptime
-    comp-id :comp-id
-    num-tasks :num-tasks
-    statk->w->sid->num :stats}
-   window
-   include-sys?]
-  ;TODO: when translating this function, you should replace the map-key with a proper for loop HERE
-  (let [filter-sys (mk-include-sys-filter include-sys?)
-        ;; Entry for `window` of one stat, after stringifying that stat's keys.
-        in-window (fn [stat-key]
-                    (get (map-key str (get statk->w->sid->num stat-key))
-                         window))
-        latencies (agg-spout-streams-lat-and-count
-                    (in-window :complete-latencies)
-                    (in-window :acked))
-        counts (swap-map-order
-                 {:acked (in-window :acked)
-                  :failed (in-window :failed)
-                  :emitted (filter-sys (in-window :emitted))
-                  :transferred (filter-sys (in-window :transferred))})]
-    {:executor-id exec-id
-     :host host
-     :port port
-     :uptime uptime
-     :num-executors 1
-     :num-tasks num-tasks
-     :sid->output-stats (merge-with merge latencies counts)}))
-
-(defn agg-pre-merge-topo-page-bolt
-  "Builds {comp-id stats} for one bolt executor for the topology page:
-  latency totals and executed count across streams, :capacity, and the
-  summed emitted/transferred (after the include-sys? filter) and
-  acked/failed counts, all read from `window`."
-  [{comp-id :comp-id
-    num-tasks :num-tasks
-    statk->w->sid->num :stats
-    uptime :uptime}
-   window
-   include-sys?]
-  ;TODO: when translating this function, you should replace the map-key with a proper for loop HERE
-  (let [filter-sys (mk-include-sys-filter include-sys?)
-        ;; Entry for `window` of one stat, after stringifying that stat's keys.
-        in-window (fn [stat-key]
-                    (get (map-key str (get statk->w->sid->num stat-key))
-                         window))
-        total (fn [sid->num] (reduce + (vals sid->num)))
-        lat-and-count (agg-bolt-lat-and-count
-                        (in-window :execute-latencies)
-                        (in-window :process-latencies)
-                        (in-window :executed))]
-    {comp-id
-     (merge
-       lat-and-count
-       {:num-executors 1
-        :num-tasks num-tasks
-        :emitted (total (filter-sys (in-window :emitted)))
-        :transferred (total (filter-sys (in-window :transferred)))
-        :capacity (compute-agg-capacity statk->w->sid->num uptime)
-        :acked (total (in-window :acked))
-        :failed (total (in-window :failed))})}))
-
-(defn agg-pre-merge-topo-page-spout
-  "Builds {comp-id stats} for one spout executor for the topology page:
-  complete-latency total and acked count across streams, plus the summed
-  emitted/transferred (after the include-sys? filter) and failed counts,
-  all read from `window`."
-  [{comp-id :comp-id
-    num-tasks :num-tasks
-    statk->w->sid->num :stats}
-   window
-   include-sys?]
-  ;TODO: when translating this function, you should replace the map-key with a proper for loop HERE
-  (let [filter-sys (mk-include-sys-filter include-sys?)
-        ;; Entry for `window` of one stat, after stringifying that stat's keys.
-        in-window (fn [stat-key]
-                    (get (map-key str (get statk->w->sid->num stat-key))
-                         window))
-        total (fn [sid->num] (reduce + (vals sid->num)))
-        lat-and-count (agg-spout-lat-and-count
-                        (in-window :complete-latencies)
-                        (in-window :acked))]
-    {comp-id
-     (merge
-       lat-and-count
-       {:num-executors 1
-        :num-tasks num-tasks
-        :emitted (total (filter-sys (in-window :emitted)))
-        :transferred (total (filter-sys (in-window :transferred)))
-        :failed (total (in-window :failed))})}))
-
-(defn merge-agg-comp-stats-comp-page-bolt
-  "Merges one bolt executor's pre-merged component-page stats into the
-  accumulator: bumps :num-executors, sums :num-tasks, sums the per-stream
-  input/output stats with sum-or-0, and conj's a per-executor summary onto
-  :executor-stats. The summary's latencies are nil unless the executor's
-  executed total is positive."
-  [{acc-in :cid+sid->input-stats
-    acc-out :sid->output-stats
-    :as acc-bolt-stats}
-   {bolt-in :cid+sid->input-stats
-    bolt-out :sid->output-stats
-    :as bolt-stats}]
-  (let [merge-streams (partial merge-with (partial merge-with sum-or-0))
-        ;; Sum of stat `k` over every stream entry of the given map.
-        stream-total (fn [sid->stats k]
-                       (apply sum-or-0 (map k (vals sid->stats))))
-        executed (stream-total bolt-in :executed)
-        any-executed? (and executed (pos? executed))
-        this-executor (merge
-                        (select-keys bolt-stats
-                                     [:executor-id :uptime :host :port :capacity])
-                        {:emitted (stream-total bolt-out :emitted)
-                         :transferred (stream-total bolt-out :transferred)
-                         :acked (stream-total bolt-in :acked)
-                         :failed (stream-total bolt-in :failed)
-                         :executed executed}
-                        {:execute-latency
-                         (when any-executed?
-                           (div (stream-total bolt-in :executeLatencyTotal)
-                                executed))
-                         :process-latency
-                         (when any-executed?
-                           (div (stream-total bolt-in :processLatencyTotal)
-                                executed))})]
-    {:num-executors (inc (or (:num-executors acc-bolt-stats) 0))
-     :num-tasks (sum-or-0 (:num-tasks acc-bolt-stats) (:num-tasks bolt-stats))
-     :sid->output-stats (merge-streams acc-out bolt-out)
-     :cid+sid->input-stats (merge-streams acc-in bolt-in)
-     :executor-stats (conj (:executor-stats acc-bolt-stats) this-executor)}))
-
-(defn merge-agg-comp-stats-comp-page-spout
-  "Merges one spout executor's pre-merged component-page stats into the
-  accumulator: bumps :num-executors, sums :num-tasks, sums the per-stream
-  output stats with sum-or-0, and conj's a per-executor summary onto
-  :executor-stats. The summary's :complete-latency is nil unless the
-  executor's acked total is positive."
-  [{acc-out :sid->output-stats
-    :as acc-spout-stats}
-   {spout-out :sid->output-stats
-    :as spout-stats}]
-  (let [;; Sum of stat `k` over every stream entry of this executor's output.
-        stream-total (fn [k]
-                       (apply sum-or-0 (map k (vals spout-out))))
-        acked (stream-total :acked)
-        this-executor (merge
-                        (select-keys spout-stats
-                                     [:executor-id :uptime :host :port])
-                        {:emitted (stream-total :emitted)
-                         :transferred (stream-total :transferred)
-                         :acked acked
-                         :failed (stream-total :failed)}
-                        {:complete-latency
-                         (when (and acked (pos? acked))
-                           (div (stream-total :completeLatencyTotal) acked))})]
-    {:num-executors (inc (or (:num-executors acc-spout-stats) 0))
-     :num-tasks (sum-or-0 (:num-tasks acc-spout-stats)
-                          (:num-tasks spout-stats))
-     :sid->output-stats (merge-with (partial merge-with sum-or-0)
-                                    acc-out
-                                    spout-out)
-     :executor-stats (conj (:executor-stats acc-spout-stats) this-executor)}))
-
-(defn merge-agg-comp-stats-topo-page-bolt
-  "Merges one bolt executor's topology-page stats into the accumulator for
-  its component: bumps :num-executors, takes the max :capacity (max-or-0)
-  and sums every other figure with sum-or-0."
-  [acc-bolt-stats bolt-stats]
-  (let [total (fn [k]
-                (sum-or-0 (get acc-bolt-stats k) (get bolt-stats k)))]
-    {:num-executors (inc (or (:num-executors acc-bolt-stats) 0))
-     :num-tasks (total :num-tasks)
-     :emitted (total :emitted)
-     :transferred (total :transferred)
-     :capacity (max-or-0 (:capacity acc-bolt-stats) (:capacity bolt-stats))
-     ;; Latency totals are only summed here; dividing by the executed count
-     ;; to get averages is left to a later step.
-     :executeLatencyTotal (total :executeLatencyTotal)
-     :processLatencyTotal (total :processLatencyTotal)
-     :executed (total :executed)
-     :acked (total :acked)
-     :failed (total :failed)}))
-
-(defn merge-agg-comp-stats-topo-page-spout
-  "Merges one spout executor's topology-page stats into the accumulator for
-  its component: bumps :num-executors and sums every other figure with
-  sum-or-0."
-  [acc-spout-stats spout-stats]
-  (let [total (fn [k]
-                (sum-or-0 (get acc-spout-stats k) (get spout-stats k)))]
-    {:num-executors (inc (or (:num-executors acc-spout-stats) 0))
-     :num-tasks (total :num-tasks)
-     :emitted (total :emitted)
-     :transferred (total :transferred)
-     ;; The latency total is only summed here; dividing by the acked count
-     ;; to get the average is left to a later step.
-     :completeLatencyTotal (total :completeLatencyTotal)
-     :acked (total :acked)
-     :failed (total :failed)}))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn aggregate-count-streams
-  "Runs map-val over `stats` with a function that sums the values of each
-  nested map."
-  [stats]
-  (let [sum-vals (fn [sid->num] (reduce + (vals sid->num)))]
-    (map-val sum-vals stats)))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn- agg-topo-exec-stats*
-  "A helper function that does the common work to aggregate stats of one
-  executor with the given map for the topology page.
-  pre-merge-fn turns new-data into a {comp-id stats} map, which is merged
-  into the accumulator entry under comp-key using merge-fn. The executor's
-  [host port] is added to :workers-set; emitted/transferred counts are summed
-  per window after the include-sys? filter; :window->acked and
-  :window->failed are only updated when the stats :type is :spout. The
-  returned map also carries the executor's :type."
-  [window
-   include-sys?
-   {:keys [workers-set
-           bolt-id->stats
-           spout-id->stats
-           window->emitted
-           window->transferred
-           window->comp-lat-wgt-avg
-           window->acked
-           window->failed] :as acc-stats}
-   {:keys [stats] :as new-data}
-   pre-merge-fn
-   merge-fn
-   comp-key]
-  (let [cid->statk->num (pre-merge-fn new-data window include-sys?)
-        {w->compLatWgtAvg :completeLatencyTotal
-         w->acked :acked}
-          (if (:complete-latencies stats)
-            (swap-map-order
-              (into {}
-                    (for [w (keys (:acked stats))]
-                         [w (agg-spout-lat-and-count
-                              (get (:complete-latencies stats) w)
-                              (get (:acked stats) w))])))
-            ;; No complete latencies: only per-window ack counts exist. The
-            ;; key has to be :acked so the destructuring above picks it up.
-            {:completeLatencyTotal nil
-             :acked (aggregate-count-streams (:acked stats))})
-        handle-sys-components-fn (mk-include-sys-filter include-sys?)]
-    (assoc {:workers-set (conj workers-set
-                               [(:host new-data) (:port new-data)])
-            :bolt-id->stats bolt-id->stats
-            :spout-id->stats spout-id->stats
-            :window->emitted (->> (:emitted stats)
-                                  (map-val handle-sys-components-fn)
-                                  aggregate-count-streams
-                                  (merge-with + window->emitted))
-            :window->transferred (->> (:transferred stats)
-                                      (map-val handle-sys-components-fn)
-                                      aggregate-count-streams
-                                      (merge-with + window->transferred))
-            :window->comp-lat-wgt-avg (merge-with +
-                                                  window->comp-lat-wgt-avg
-                                                  w->compLatWgtAvg)
-            :window->acked (if (= :spout (:type stats))
-                             (merge-with + window->acked w->acked)
-                             window->acked)
-            :window->failed (if (= :spout (:type stats))
-                              (->> (:failed stats)
-                                   aggregate-count-streams
-                                   (merge-with + window->failed))
-                              window->failed)}
-           ;; Merge this executor's {comp-id stats} into the bolt or spout map.
-           comp-key (merge-with merge-fn
-                                (acc-stats comp-key)
-                                cid->statk->num)
-           :type (:type stats))))
-
-(defmulti agg-topo-exec-stats
-  "Folds one executor's aggregate stats into the accumulator map for the
-  given window, including system components as specified. Dispatches on the
-  :type of the last argument."
-  (fn dispatch-fn [& args]
-    (-> args last :type)))
-
-;; Bolt executors: merged into :bolt-id->stats with the bolt pre-merge and
-;; merge functions.
-(defmethod agg-topo-exec-stats :bolt
-  [window include-sys? acc-stats new-data]
-  (agg-topo-exec-stats* window include-sys? acc-stats new-data
-                        agg-pre-merge-topo-page-bolt
-                        merge-agg-comp-stats-topo-page-bolt
-                        :bolt-id->stats))
-
-;; Spout executors: merged into :spout-id->stats with the spout pre-merge and
-;; merge functions.
-(defmethod agg-topo-exec-stats :spout
-  [window include-sys? acc-stats new-data]
-  (agg-topo-exec-stats* window include-sys? acc-stats new-data
-                        agg-pre-merge-topo-page-spout
-                        merge-agg-comp-stats-topo-page-spout
-                        :spout-id->stats))
-
-;; Any other :type: the accumulator is returned untouched.
-(defmethod agg-topo-exec-stats :default
-  [_window _include-sys? acc-stats _new-data]
-  acc-stats)
-
-(defn get-last-error
-  "Asks storm-cluster-state for the last error of the given component and,
-  when there is one, wraps its :error and :time-secs in an ErrorInfo.
-  Returns nil otherwise."
-  [storm-cluster-state storm-id component-id]
-  (when-let [err (.last-error storm-cluster-state storm-id component-id)]
-    (ErrorInfo. (:error err) (:time-secs err))))
-
-(defn component-type
-  "Classifies component `id` within `topology`: :bolt for system ids and for
-  ids present in the topology's bolts, :spout for ids present in its spouts,
-  nil when the id is found in neither."
-  [^StormTopology topology id]
-  (let [bolts (.get_bolts topology)
-        spouts (.get_spouts topology)]
-    (if (or (Utils/isSystemId id)
-            (.containsKey bolts id))
-      :bolt
-      (when (.containsKey spouts id)
-        :spout))))
-
-(defn extract-nodeinfos-from-hb-for-comp
-  "Distinct {:host .. :port ..} maps of the executors whose component (looked
-  up from the executor's first task) equals comp-id, or of every executor
-  when comp-id is nil. System components are dropped unless include-sys?."
-  ([exec->host+port task->component include-sys? comp-id]
-   (let [wanted? (fn [[[start] _]]
-                   (let [id (task->component start)]
-                     (and (or (nil? comp-id) (= comp-id id))
-                          (or include-sys? (not (Utils/isSystemId id))))))
-         node-info (fn [[_ [host port]]]
-                     {:host host
-                      :port port})]
-     (->> exec->host+port
-          (filter wanted?)
-          (map node-info)
-          distinct))))
-
-(defn extract-data-from-hb
-  "Lazily builds one data map per executor from exec->host+port: executor id,
-  component id (looked up from the first task), task count, host, port, and
-  the :uptime and :stats of the executor's beat. :type comes from the stats
-  when present, else from component-type. Executors are kept when comp-id is
-  nil or matches, and system components only when include-sys?. The
-  five-argument arity passes nil for comp-id."
-  ([exec->host+port task->component beats include-sys? topology]
-   (extract-data-from-hb exec->host+port task->component beats include-sys?
-                         topology nil))
-  ([exec->host+port task->component beats include-sys? topology comp-id]
-   (keep (fn [[[start end :as executor] [host port]]]
-           (let [beat (beats executor)
-                 id (task->component start)
-                 stats (:stats beat)]
-             (when (and (or (nil? comp-id) (= comp-id id))
-                        (or include-sys? (not (Utils/isSystemId id))))
-               {:exec-id executor
-                :comp-id id
-                :num-tasks (count (range start (inc end)))
-                :host host
-                :port port
-                :uptime (:uptime beat)
-                :stats stats
-                :type (or (:type stats)
-                          (component-type topology id))})))
-         exec->host+port)))
-
-(defn aggregate-topo-stats
-  "Reduces the executor data seq `data` with agg-topo-exec-stats, starting
-  from an accumulator with an empty worker set and empty stats maps."
-  [window include-sys? data]
-  (reduce (fn [acc exec-data]
-            (agg-topo-exec-stats window include-sys? acc exec-data))
-          {:workers-set #{}
-           :bolt-id->stats {}
-           :spout-id->stats {}
-           :window->emitted {}
-           :window->transferred {}
-           :window->comp-lat-wgt-avg {}
-           :window->acked {}
-           :window->failed {}}
-          data))
-
-(defn- compute-weighted-averages-per-window
-  "For each window in (wgt-avg-key acc-data), divides the weighted total by
-  that window's entry in (divisor-key acc-data). Windows whose divisor is
-  missing or not positive are left out; result keys are the window as a
-  string."
-  [acc-data wgt-avg-key divisor-key]
-  (into {}
-        (keep (fn [[window wgt-avg]]
-                (let [divisor ((divisor-key acc-data) window)]
-                  (when (and divisor (pos? divisor))
-                    [(str window) (div wgt-avg divisor)])))
-              (wgt-avg-key acc-data))))
-
-(defn- post-aggregate-topo-stats
-  "Turns the accumulated topology stats into page data: task, worker and
-  executor counts; per-bolt and per-spout stats with latency totals replaced
-  by averages (0 when the count is missing or not positive) and :lastError
-  taken from last-err-fn; and the per-window maps with stringified keys."
-  [task->component exec->node+port last-err-fn acc-data]
-  (let [;TODO: when translating this function, you should replace the map-key with a proper for loop HERE
-        str-keys (fn [k] (map-key str (get acc-data k)))
-        finish-bolt (fn [id m]
-                      (let [executed (:executed m)
-                            avg (fn [total-key]
-                                  (if (and executed (pos? executed))
-                                    (div (or (get m total-key) 0) executed)
-                                    0))]
-                        (-> m
-                            (assoc :execute-latency (avg :executeLatencyTotal)
-                                   :process-latency (avg :processLatencyTotal))
-                            (dissoc :executeLatencyTotal
-                                    :processLatencyTotal)
-                            (assoc :lastError (last-err-fn id)))))
-        finish-spout (fn [id m]
-                       (let [acked (:acked m)]
-                         (-> m
-                             (assoc :complete-latency
-                                    (if (and acked (pos? acked))
-                                      (div (:completeLatencyTotal m) acked)
-                                      0))
-                             (dissoc :completeLatencyTotal)
-                             (assoc :lastError (last-err-fn id)))))
-        finish-all (fn [finish id->stats]
-                     (into {}
-                           (map (fn [[id m]] [id (finish id m)])
-                                id->stats)))]
-    {:num-tasks (count task->component)
-     :num-workers (count (:workers-set acc-data))
-     :num-executors (count exec->node+port)
-     :bolt-id->stats (finish-all finish-bolt (:bolt-id->stats acc-data))
-     :spout-id->stats (finish-all finish-spout (:spout-id->stats acc-data))
-     :window->emitted (str-keys :window->emitted)
-     :window->transferred (str-keys :window->transferred)
-     :window->complete-latency
-       (compute-weighted-averages-per-window acc-data
-                                             :window->comp-lat-wgt-avg
-                                             :window->acked)
-     :window->acked (str-keys :window->acked)
-     :window->failed (str-keys :window->failed)}))
-
-(defn- thriftify-common-agg-stats
-  "Creates a CommonAggregateStats, sets each field whose value in statk->num
-  is truthy, and installs it on `s` via .set_common_stats."
-  [^ComponentAggregateStats s
-   {:keys [num-tasks
-           emitted
-           transferred
-           acked
-           failed
-           num-executors] :as statk->num}]
-  (let [cas (CommonAggregateStats.)]
-    (when num-executors (.set_num_executors cas num-executors))
-    (when num-tasks (.set_num_tasks cas num-tasks))
-    (when emitted (.set_emitted cas emitted))
-    (when transferred (.set_transferred cas transferred))
-    (when acked (.set_acked cas acked))
-    (when failed (.set_failed cas failed))
-    (.set_common_stats s cas)))
-
-(defn thriftify-bolt-agg-stats
-  "Builds a ComponentAggregateStats of type BOLT from statk->num: last error
-  (when present), common stats, and a BoltAggregateStats carrying whichever
-  of execute latency, process latency, executed and capacity are truthy."
-  [statk->num]
-  (let [{:keys [lastError
-                execute-latency
-                process-latency
-                executed
-                capacity]} statk->num
-        s (ComponentAggregateStats.)
-        bas (BoltAggregateStats.)]
-    (.set_type s ComponentType/BOLT)
-    (when lastError (.set_last_error s lastError))
-    (thriftify-common-agg-stats s statk->num)
-    (when execute-latency (.set_execute_latency_ms bas execute-latency))
-    (when process-latency (.set_process_latency_ms bas process-latency))
-    (when executed (.set_executed bas executed))
-    (when capacity (.set_capacity bas capacity))
-    (.set_specific_stats s (SpecificAggregateStats/bolt bas))
-    s))
-
-(defn thriftify-spout-agg-stats
-  "Builds a ComponentAggregateStats of type SPOUT from statk->num: last error
-  (when present), common stats, and a SpoutAggregateStats carrying the
-  complete latency when it is truthy."
-  [statk->num]
-  (let [{:keys [lastError
-                complete-latency]} statk->num
-        s (ComponentAggregateStats.)
-        sas (SpoutAggregateStats.)]
-    (.set_type s ComponentType/SPOUT)
-    (when lastError (.set_last_error s lastError))
-    (thriftify-common-agg-stats s statk->num)
-    (when complete-latency (.set_complete_latency_ms sas complete-latency))
-    (.set_specific_stats s (SpecificAggregateStats/spout sas))
-    s))
-
-(defn thriftify-topo-page-data
-  "Packs post-aggregated topology data into a TopologyPageInfo for
-  topology-id: the counts, the per-spout and per-bolt aggregate stats
-  (thriftified after tagging each map with its :type), and a TopologyStats
-  holding the per-window maps."
-  [topology-id data]
-  (let [{:keys [num-tasks
-                num-workers
-                num-executors
-                spout-id->stats
-                bolt-id->stats
-                window->emitted
-                window->transferred
-                window->complete-latency
-                window->acked
-                window->failed]} data
-        thriftify-each (fn [thriftify-fn comp-type id->stats]
-                         (into {}
-                               (map (fn [[id m]]
-                                      [id (thriftify-fn
-                                            (assoc m :type comp-type))])
-                                    id->stats)))
-        spout-agg-stats (thriftify-each thriftify-spout-agg-stats
-                                        :spout
-                                        spout-id->stats)
-        bolt-agg-stats (thriftify-each thriftify-bolt-agg-stats
-                                       :bolt
-                                       bolt-id->stats)
-        topology-stats (doto (TopologyStats.)
-                         (.set_window_to_emitted window->emitted)
-                         (.set_window_to_transferred window->transferred)
-                         (.set_window_to_complete_latencies_ms
-                           window->complete-latency)
-                         (.set_window_to_acked window->acked)
-                         (.set_window_to_failed window->failed))]
-    (doto (TopologyPageInfo. topology-id)
-      (.set_num_tasks num-tasks)
-      (.set_num_workers num-workers)
-      (.set_num_executors num-executors)
-      (.set_id_to_spout_agg_stats spout-agg-stats)
-      (.set_id_to_bolt_agg_stats bolt-agg-stats)
-      (.set_topology_stats topology-stats))))
-
-(defn agg-topo-execs-stats
-  "Aggregates executor statistics for a topology from the given heartbeats:
-  extracts per-executor data, reduces it for `window`, post-processes the
-  result and thriftifies it for topology-id."
-  [topology-id
-   exec->node+port
-   task->component
-   beats
-   topology
-   window
-   include-sys?
-   last-err-fn]
-  (let [;; Left as the seq returned by extract-data-from-hb; the reduction
-        ;; below is what walks it.
-        exec-data (extract-data-from-hb exec->node+port
-                                        task->component
-                                        beats
-                                        include-sys?
-                                        topology)
-        acc-data (aggregate-topo-stats window include-sys? exec-data)
-        page-data (post-aggregate-topo-stats task->component
-                                             exec->node+port
-                                             last-err-fn
-                                             acc-data)]
-    (thriftify-topo-page-data topology-id page-data)))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn- agg-bolt-exec-win-stats
-  "Adds one bolt executor's windowed stats (`new-stats`) to the per-window
-  accumulators in `acc-stats`: emitted/transferred (after the include-sys?
-  filter), weighted execute/process latency totals, executed, acked and
-  failed."
-  [acc-stats new-stats include-sys?]
-  (let [w->totals (into {}
-                        (for [w (keys (:executed new-stats))]
-                          [w (agg-bolt-lat-and-count
-                               (get (:execute-latencies new-stats) w)
-                               (get (:process-latencies new-stats) w)
-                               (get (:executed new-stats) w))]))
-        {w->execLatWgtAvg :executeLatencyTotal
-         w->procLatWgtAvg :processLatencyTotal
-         w->executed :executed} (swap-map-order w->totals)
-        filter-sys (mk-include-sys-filter include-sys?)
-        ;; Accumulator entry under acc-key plus the new per-window numbers.
-        add-to (fn [acc-key w->num]
-                 (merge-with + (get acc-stats acc-key) w->num))
-        counts (fn [stat-key]
-                 (aggregate-count-streams (get new-stats stat-key)))
-        filtered-counts (fn [stat-key]
-                          (aggregate-count-streams
-                            (map-val filter-sys (get new-stats stat-key))))]
-    {:window->emitted (add-to :window->emitted (filtered-counts :emitted))
-     :window->transferred (add-to :window->transferred
-                                  (filtered-counts :transferred))
-     :window->exec-lat-wgt-avg (add-to :window->exec-lat-wgt-avg
-                                       w->execLatWgtAvg)
-     :window->proc-lat-wgt-avg (add-to :window->proc-lat-wgt-avg
-                                       w->procLatWgtAvg)
-     :window->executed (add-to :window->executed w->executed)
-     :window->acked (add-to :window->acked (counts :acked))
-     :window->failed (add-to :window->failed (counts :failed))}))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn- agg-spout-exec-win-stats
-  "Adds one spout executor's windowed stats (`new-stats`) to the per-window
-  accumulators in `acc-stats`: emitted/transferred (after the include-sys?
-  filter), weighted complete-latency totals, acked and failed."
-  [acc-stats new-stats include-sys?]
-  (let [w->totals (into {}
-                        (for [w (keys (:acked new-stats))]
-                          [w (agg-spout-lat-and-count
-                               (get (:complete-latencies new-stats) w)
-                               (get (:acked new-stats) w))]))
-        ;; w->acked is bound as in the original destructuring but not used;
-        ;; :window->acked below is computed from aggregate-count-streams.
-        {w->compLatWgtAvg :completeLatencyTotal
-         w->acked :acked} (swap-map-order w->totals)
-        filter-sys (mk-include-sys-filter include-sys?)
-        ;; Accumulator entry under acc-key plus the new per-window numbers.
-        add-to (fn [acc-key w->num]
-                 (merge-with + (get acc-stats acc-key) w->num))
-        counts (fn [stat-key]
-                 (aggregate-count-streams (get new-stats stat-key)))
-        filtered-counts (fn [stat-key]
-                          (aggregate-count-streams
-                            (map-val filter-sys (get new-stats stat-key))))]
-    {:window->emitted (add-to :window->emitted (filtered-counts :emitted))
-     :window->transferred (add-to :window->transferred
-                                  (filtered-counts :transferred))
-     :window->comp-lat-wgt-avg (add-to :window->comp-lat-wgt-avg
-                                       w->compLatWgtAvg)
-     :window->acked (add-to :window->acked (counts :acked))
-     :window->failed (add-to :window->failed (counts :failed))}))
-
-(defmulti agg-comp-exec-stats
-  "Folds the aggregate stats of a single executor into an accumulator map,
-  for the requested window and with system components included as specified.
-  The method is selected by the :type of the accumulator (third argument)."
-  (fn [_window _include-sys? acc _new-data]
-    (:type acc)))
-
-(defmethod agg-comp-exec-stats :bolt
-  [window include-sys? acc-stats new-data]
-  ;; The windowed totals form the base map; the component-page stats are
-  ;; merged separately and stored under :stats.
-  (let [windowed (agg-bolt-exec-win-stats acc-stats (:stats new-data) include-sys?)
-        page-stats (merge-agg-comp-stats-comp-page-bolt
-                     (:stats acc-stats)
-                     (agg-pre-merge-comp-page-bolt new-data window include-sys?))]
-    (-> windowed
-        (assoc :stats page-stats)
-        (assoc :type :bolt))))
-
-(defmethod agg-comp-exec-stats :spout
-  [window include-sys? acc-stats new-data]
-  ;; The windowed totals form the base map; the component-page stats are
-  ;; merged separately and stored under :stats.
-  (let [windowed (agg-spout-exec-win-stats acc-stats (:stats new-data) include-sys?)
-        page-stats (merge-agg-comp-stats-comp-page-spout
-                     (:stats acc-stats)
-                     (agg-pre-merge-comp-page-spout new-data window include-sys?))]
-    (-> windowed
-        (assoc :stats page-stats)
-        (assoc :type :spout))))
-
-(defn- aggregate-comp-stats*
-  "Reduces data (one element per executor) into init-val by calling
-  agg-comp-exec-stats with the given window and include-sys? on every step."
-  [window include-sys? data init-val]
-  (reduce (fn [acc exec-data]
-            (agg-comp-exec-stats window include-sys? acc exec-data))
-          init-val
-          data))
-
-(defmulti aggregate-comp-stats
-  "Dispatches on the :type of the first element of the last argument."
-  (fn [& args]
-    (:type (first (last args)))))
-
-(defmethod aggregate-comp-stats :bolt
-  [& args]
-  ;; Seed accumulator for bolts: every bucket starts out empty.  It is passed
-  ;; to aggregate-comp-stats* as the extra trailing argument.
-  (let [empty-acc {:type :bolt
-                   :cid+sid->input-stats {}
-                   :sid->output-stats {}
-                   :executor-stats []
-                   :window->emitted {}
-                   :window->transferred {}
-                   :window->exec-lat-wgt-avg {}
-                   :window->executed {}
-                   :window->proc-lat-wgt-avg {}
-                   :window->acked {}
-                   :window->failed {}}]
-    (apply aggregate-comp-stats* (conj (vec args) empty-acc))))
-
-(defmethod aggregate-comp-stats :spout
-  [& args]
-  ;; Seed accumulator for spouts: every bucket starts out empty.  It is passed
-  ;; to aggregate-comp-stats* as the extra trailing argument.
-  (let [empty-acc {:type :spout
-                   :sid->output-stats {}
-                   :executor-stats []
-                   :window->emitted {}
-                   :window->transferred {}
-                   :window->comp-lat-wgt-avg {}
-                   :window->acked {}
-                   :window->failed {}}]
-    (apply aggregate-comp-stats* (conj (vec args) empty-acc))))
-
-(defmethod aggregate-comp-stats :default
-  [& _]
-  ;; No recognised :type on the input data: there is nothing to aggregate.
-  {})
-
-(defmulti post-aggregate-comp-stats
-  "Dispatches on the :type of the accumulated data (third argument)."
-  (fn [_task->component _exec->host+port acc-data]
-    (:type acc-data)))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defmethod post-aggregate-comp-stats :bolt
-  [task->component exec->host+port acc-data]
-  ;; Finalises accumulated bolt data: latency totals become averages, window
-  ;; keys become strings, and weighted per-window latencies are computed.
-  ;; task->component and exec->host+port are accepted but not used here.
-  ;; TODO (Java port): replace the map-val / map-key calls with plain loops.
-  (let [page-stats (:stats acc-data)
-        windows-as-strings (fn [k] (map-key str (get acc-data k)))
-        weighted-latency (fn [weighted-key]
-                           (compute-weighted-averages-per-window
-                             acc-data
-                             weighted-key
-                             :window->executed))
-        ;; Swaps the two *LatencyTotal entries of one input-stats map for
-        ;; averages over :executed (0 when nothing was executed).
-        finish-input-stats
-        (fn [m]
-          (let [executed (:executed m)
-                executed? (and executed (pos? executed))
-                avg-of (fn [total-key]
-                         (if executed?
-                           (div (or (get m total-key) 0) executed)
-                           0))]
-            (-> m
-                (merge {:execute-latency (avg-of :executeLatencyTotal)
-                        :process-latency (avg-of :processLatencyTotal)})
-                (dissoc :executeLatencyTotal :processLatencyTotal))))]
-    {:type (:type acc-data)
-     :num-tasks (:num-tasks page-stats)
-     :num-executors (:num-executors page-stats)
-     :cid+sid->input-stats (map-val finish-input-stats
-                                    (:cid+sid->input-stats page-stats))
-     :sid->output-stats (:sid->output-stats page-stats)
-     :executor-stats (:executor-stats page-stats)
-     :window->emitted (windows-as-strings :window->emitted)
-     :window->transferred (windows-as-strings :window->transferred)
-     :window->execute-latency (weighted-latency :window->exec-lat-wgt-avg)
-     :window->executed (windows-as-strings :window->executed)
-     :window->process-latency (weighted-latency :window->proc-lat-wgt-avg)
-     :window->acked (windows-as-strings :window->acked)
-     :window->failed (windows-as-strings :window->failed)}))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defmethod post-aggregate-comp-stats :spout
-  [task->component exec->host+port acc-data]
-  ;; Finalises accumulated spout data: the complete-latency total becomes an
-  ;; average, window keys become strings, and the weighted per-window
-  ;; complete latency is computed.
-  ;; task->component and exec->host+port are accepted but not used here.
-  ;; TODO (Java port): replace the map-val / map-key calls with plain loops.
-  (let [page-stats (:stats acc-data)
-        windows-as-strings (fn [k] (map-key str (get acc-data k)))
-        ;; Swaps :completeLatencyTotal of one output-stats map for the average
-        ;; over :acked (0 when nothing was acked).
-        finish-output-stats
-        (fn [m]
-          (let [acked (:acked m)
-                complete-latency (if (and acked (pos? acked))
-                                   (div (or (:completeLatencyTotal m) 0) acked)
-                                   0)]
-            (-> m
-                (merge {:complete-latency complete-latency})
-                (dissoc :completeLatencyTotal))))]
-    {:type (:type acc-data)
-     :num-tasks (:num-tasks page-stats)
-     :num-executors (:num-executors page-stats)
-     :sid->output-stats (map-val finish-output-stats
-                                 (:sid->output-stats page-stats))
-     :executor-stats (:executor-stats page-stats)
-     :window->emitted (windows-as-strings :window->emitted)
-     :window->transferred (windows-as-strings :window->transferred)
-     :window->complete-latency (compute-weighted-averages-per-window
-                                 acc-data
-                                 :window->comp-lat-wgt-avg
-                                 :window->acked)
-     :window->acked (windows-as-strings :window->acked)
-     :window->failed (windows-as-strings :window->failed)}))
-
-(defmethod post-aggregate-comp-stats :default
-  [& _]
-  ;; No recognised :type on the accumulated data: return an empty result.
-  {})
-
-(defn thriftify-exec-agg-stats
-  "Builds an ExecutorAggregateStats for one executor of component comp-id.
-  The executor summary comes from the :executor-id, :host, :port and :uptime
-  entries of stats (uptime defaults to 0).  The stats payload is produced by
-  the bolt or spout thriftify function chosen by comp-type (:bolt or :spout;
-  any other value throws)."
-  [comp-id comp-type {:keys [executor-id host port uptime] :as stats}]
-  (doto (ExecutorAggregateStats.)
-    (.set_exec_summary
-      (ExecutorSummary. (apply (fn [start end] (ExecutorInfo. start end))
-                               executor-id)
-                        comp-id
-                        host
-                        port
-                        (or uptime 0)))
-    (.set_stats
-      ((case comp-type
-         :bolt thriftify-bolt-agg-stats
-         :spout thriftify-spout-agg-stats)
-       stats))))
-
-(defn- thriftify-bolt-input-stats
-  "Returns a map in which each key of cid+sid->input-stats has been passed
-  through to-global-stream-id and each value through thriftify-bolt-agg-stats."
-  [cid+sid->input-stats]
-  (into {}
-        (map (fn [[cid+sid input-stats]]
-               [(to-global-stream-id cid+sid)
-                (thriftify-bolt-agg-stats input-stats)])
-             cid+sid->input-stats)))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn- thriftify-bolt-output-stats
-  "Applies thriftify-bolt-agg-stats to every value of sid->output-stats."
-  [sid->output-stats]
-  (->> sid->output-stats
-       (map-val thriftify-bolt-agg-stats)))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn- thriftify-spout-output-stats
-  "Applies thriftify-spout-agg-stats to every value of sid->output-stats."
-  [sid->output-stats]
-  (->> sid->output-stats
-       (map-val thriftify-spout-agg-stats)))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn thriftify-comp-page-data
-  "Packs the post-aggregated component data into a ComponentPageInfo.
-  The window stats gathered from data depend on (:type data), while the
-  thrift conversion depends on (component-type topology comp-id), which must
-  be :bolt or :spout (any other value throws).  num-executors, num-tasks and
-  the input stats are set only when present; spouts carry no input stats."
-  [topo-id topology comp-id data]
-  (let [shared-stats {:emitted (:window->emitted data)
-                      :transferred (:window->transferred data)
-                      :acked (:window->acked data)
-                      :failed (:window->failed data)}
-        typed-stats (case (:type data)
-                      :bolt {:execute-latency (:window->execute-latency data)
-                             :process-latency (:window->process-latency data)
-                             :executed (:window->executed data)}
-                      :spout {:complete-latency (:window->complete-latency data)}
-                      {})
-        raw-w->stats (swap-map-order (merge shared-stats typed-stats))
-        executor-stats (:executor-stats data)
-        output-stats (:sid->output-stats data)
-        [compType exec-stats w->stats gsid->input-stats sid->output-stats]
-          (case (component-type topology comp-id)
-            :bolt [ComponentType/BOLT
-                   (map #(thriftify-exec-agg-stats comp-id :bolt %) executor-stats)
-                   (map-val thriftify-bolt-agg-stats raw-w->stats)
-                   (thriftify-bolt-input-stats (:cid+sid->input-stats data))
-                   (thriftify-bolt-output-stats output-stats)]
-            :spout [ComponentType/SPOUT
-                    (map #(thriftify-exec-agg-stats comp-id :spout %) executor-stats)
-                    (map-val thriftify-spout-agg-stats raw-w->stats)
-                    nil ;; spouts do not have input stats
-                    (thriftify-spout-output-stats output-stats)])
-        ret (doto (ComponentPageInfo. comp-id compType)
-              (.set_topology_id topo-id)
-              (.set_topology_name nil)
-              (.set_window_to_stats w->stats)
-              (.set_sid_to_output_stats sid->output-stats)
-              (.set_exec_stats exec-stats))]
-    ;; Optional fields: leave them unset when the value is missing.
-    (when-let [num-executors (:num-executors data)]
-      (.set_num_executors ret num-executors))
-    (when-let [num-tasks (:num-tasks data)]
-      (.set_num_tasks ret num-tasks))
-    (when gsid->input-stats
-      (.set_gsid_to_input_stats ret gsid->input-stats))
-    ret))
-
-(defn agg-comp-execs-stats
-  "Aggregates executor statistics for one component from the given
-  heartbeats.  Runs four stages in order: extract-data-from-hb,
-  aggregate-comp-stats, post-aggregate-comp-stats, thriftify-comp-page-data."
-  [exec->host+port
-   task->component
-   beats
-   window
-   include-sys?
-   topology-id
-   topology
-   component-id]
-  ;; NOTE(review): the original comment says each executor is visited only
-  ;; once because the extracted data is lazy; confirm against
-  ;; extract-data-from-hb.
-  (let [extracted (extract-data-from-hb exec->host+port
-                                        task->component
-                                        beats
-                                        include-sys?
-                                        topology
-                                        component-id)
-        aggregated (aggregate-comp-stats window include-sys? extracted)
-        finalized (post-aggregate-comp-stats task->component
-                                             exec->host+port
-                                             aggregated)]
-    (thriftify-comp-page-data topology-id topology component-id finalized)))
-
-(defn expand-averages
-  "For every slice -> stream -> count entry of counts, returns
-  slice -> stream -> [(* count average) count], where the average is looked
-  up at the same slice and stream in avg.  Both arguments are passed through
-  clojurify-structure first."
-  [avg counts]
-  (let [avg (clojurify-structure avg)
-        counts (clojurify-structure counts)
-        expand-slice (fn [slice streams]
-                       (into {}
-                             (map (fn [[stream c]]
-                                    [stream [(* c (get-in avg [slice stream])) c]])
-                                  streams)))]
-    (into {}
-          (map (fn [[slice streams]]
-                 [slice (expand-slice slice streams)])
-               counts))))
-
-(defn expand-averages-seq
-  "Expands each average/count pair with expand-averages, then merges the
-  results slice by slice and stream by stream using add-pairs."
-  [average-seq counts-seq]
-  (let [merge-slices (fn [s1 s2] (merge-with add-pairs s1 s2))]
-    (apply merge-with
-           merge-slices
-           (map expand-averages average-seq counts-seq))))
-
-(defn- val-avg
-  "Given a [total count] pair, returns total/count as a double, or 0 when the
-  count is zero."
-  [[t c]]
-  ;; zero? also matches 0.0; (= c 0) does not, so a floating-point zero count
-  ;; used to fall through to the division and produce NaN or Infinity.
-  (if (zero? c)
-    0
-    (double (/ t c))))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn aggregate-averages
-  "Combines a sequence of average maps with the matching sequence of count
-  maps via expand-averages-seq, then turns every stream's resulting pair
-  into an average with val-avg."
-  [average-seq counts-seq]
-  (let [expanded (expand-averages-seq average-seq counts-seq)
-        average-streams (partial map-val val-avg)]
-    (map-val average-streams expanded)))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn aggregate-avg-streams
-  "Expands avg and counts with expand-averages, then for each slice reduces
-  the per-stream pairs with add-pairs and averages the result with val-avg."
-  [avg counts]
-  (let [sum-streams (fn [streams] (reduce add-pairs (vals streams)))]
-    (map-val (comp val-avg sum-streams)
-             (expand-averages avg counts))))
-
-;TODO: when translating this function, you should replace the filter-key with a proper for loop + if condition HERE
-(defn pre-process
-  "Returns stream-summary with the per-window maps under :emitted and
-  :transferred filtered by key through filter-key, using the predicate from
-  (mk-include-sys-fn include-sys?)."
-  [stream-summary include-sys?]
-  (let [keep-stream? (mk-include-sys-fn include-sys?)
-        filter-windows (fn [window->stat]
-                         (into {}
-                               (map (fn [[window stat]]
-                                      [window (filter-key keep-stream? stat)])
-                                    window->stat)))]
-    (assoc stream-summary
-           :emitted (filter-windows (:emitted stream-summary))
-           :transferred (filter-windows (:transferred stream-summary)))))
-
-(defn aggregate-counts
-  "Sums a sequence of two-level count maps: each element goes through
-  clojurify-structure, then matching inner entries are added together."
-  [counts-seq]
-  (apply merge-with
-         (partial merge-with +)
-         (map clojurify-structure counts-seq)))
-
-(defn aggregate-common-stats
-  "Aggregates the emitted and transferred counts of a sequence of
-  ExecutorStats into a map with keys :emitted and :transferred."
-  [stats-seq]
-  (let [emitted-maps (map (fn [^ExecutorStats s] (.get_emitted s)) stats-seq)
-        transferred-maps (map (fn [^ExecutorStats s] (.get_transferred s)) stats-seq)]
-    {:emitted (aggregate-counts emitted-maps)
-     :transferred (aggregate-counts transferred-maps)}))
-
-(defn- collectify
-  "Returns obj unchanged when it is sequential or a java.util.Collection;
-  otherwise wraps it in a one-element vector."
-  [obj]
-  (cond
-    (sequential? obj) obj
-    (instance? Collection obj) obj
-    :else [obj]))
-
-(defn aggregate-bolt-stats
-  "Aggregates bolt ExecutorStats (a single object or a collection) into one
-  map: the pre-processed common stats plus :acked, :failed, :executed,
-  :process-latencies (averaged with acked counts) and :execute-latencies
-  (averaged with executed counts)."
-  [stats-seq include-sys?]
-  (let [stats-seq (collectify stats-seq)
-        acked (map #(.. ^ExecutorStats % get_specific get_bolt get_acked)
-                   stats-seq)
-        failed (map #(.. ^ExecutorStats % get_specific get_bolt get_failed)
-                    stats-seq)
-        executed (map #(.. ^ExecutorStats % get_specific get_bolt get_executed)
-                      stats-seq)
-        process-ms-avg (map #(.. ^ExecutorStats % get_specific get_bolt get_process_ms_avg)
-                            stats-seq)
-        execute-ms-avg (map #(.. ^ExecutorStats % get_specific get_bolt get_execute_ms_avg)
-                            stats-seq)
-        common (pre-process (aggregate-common-stats stats-seq) include-sys?)]
-    (merge common
-           {:acked (aggregate-counts acked)
-            :failed (aggregate-counts failed)
-            :executed (aggregate-counts executed)
-            :process-latencies (aggregate-averages process-ms-avg acked)
-            :execute-latencies (aggregate-averages execute-ms-avg executed)})))
-
-(defn aggregate-spout-stats
-  "Aggregates spout ExecutorStats (a single object or a collection) into one
-  map: the pre-processed common stats plus :acked, :failed and
-  :complete-latencies (averaged with acked counts)."
-  [stats-seq include-sys?]
-  (let [stats-seq (collectify stats-seq)
-        acked (map #(.. ^ExecutorStats % get_specific get_spout get_acked)
-                   stats-seq)
-        failed (map #(.. ^ExecutorStats % get_specific get_spout get_failed)
-                    stats-seq)
-        complete-ms-avg (map #(.. ^ExecutorStats % get_specific get_spout get_complete_ms_avg)
-                             stats-seq)
-        common (pre-process (aggregate-common-stats stats-seq) include-sys?)]
-    (merge common
-           {:acked (aggregate-counts acked)
-            :failed (aggregate-counts failed)
-            :complete-latencies (aggregate-averages complete-ms-avg acked)})))
-
-(defn get-filled-stats
-  "Returns the stats objects of the given ExecutorSummary sequence, dropping
-  those rejected by not-nil?."
-  [summs]
-  (let [stats-of (fn [^ExecutorSummary summ] (.get_stats summ))]
-    (filter not-nil? (map stats-of summs))))
-
-(defn aggregate-spout-streams
-  "Collapses the per-stream level of aggregated spout stats: counts go
-  through aggregate-count-streams, and complete latencies through
-  aggregate-avg-streams using the acked counts."
-  [stats]
-  (let [acked (:acked stats)
-        count-streams (fn [k] (aggregate-count-streams (get stats k)))]
-    {:acked (aggregate-count-streams acked)
-     :failed (count-streams :failed)
-     :emitted (count-streams :emitted)
-     :transferred (count-streams :transferred)
-     :complete-latencies (aggregate-avg-streams (:complete-latencies stats)
-                                                acked)}))
-
-(defn spout-streams-stats
-  "Takes the non-nil stats from summs, aggregates them as spout stats and
-  collapses the per-stream level."
-  [summs include-sys?]
-  (-> summs
-      get-filled-stats
-      (aggregate-spout-stats include-sys?)
-      aggregate-spout-streams))
-
-(defn aggregate-bolt-streams
-  "Collapses the per-stream level of aggregated bolt stats: counts go through
-  aggregate-count-streams; process latencies are averaged using the acked
-  counts and execute latencies using the executed counts."
-  [stats]
-  (let [acked (:acked stats)
-        executed (:executed stats)
-        count-streams (fn [k] (aggregate-count-streams (get stats k)))]
-    {:acked (aggregate-count-streams acked)
-     :failed (count-streams :failed)
-     :emitted (count-streams :emitted)
-     :transferred (count-streams :transferred)
-     :process-latencies (aggregate-avg-streams (:process-latencies stats)
-                                               acked)
-     :executed (aggregate-count-streams executed)
-     :execute-latencies (aggregate-avg-streams (:execute-latencies stats)
-                                               executed)}))
-
-(defn compute-executor-capacity
-  "Returns (executed * execute-latency) / (1000 * window) for one executor,
-  taken from its stats keyed by (str TEN-MIN-IN-SECONDS).  window is the
-  executor uptime when that is below TEN-MIN-IN-SECONDS, else
-  TEN-MIN-IN-SECONDS.  Missing stats count as zero; returns nil when the
-  window is not positive."
-  [^ExecutorSummary e]
-  (let [raw-stats (.get_stats e)
-        ten-min-stats (when raw-stats
-                        (get (swap-map-order
-                               (aggregate-bolt-streams
-                                 (aggregate-bolt-stats raw-stats true)))
-                             (str TEN-MIN-IN-SECONDS)))
-        uptime (Utils/nullToZero (.get_uptime_secs e))
-        window (if (< uptime TEN-MIN-IN-SECONDS) uptime TEN-MIN-IN-SECONDS)
-        executed (Utils/nullToZero (:executed ten-min-stats))
-        latency (Utils/nullToZero (:execute-latencies ten-min-stats))]
-    (when (pos? window)
-      (div (* executed latency) (* 1000 window)))))
-
-(defn bolt-streams-stats
-  "Takes the non-nil stats from summs, aggregates them as bolt stats and
-  collapses the per-stream level."
-  [summs include-sys?]
-  (-> summs
-      get-filled-stats
-      (aggregate-bolt-stats include-sys?)
-      aggregate-bolt-streams))
-
-(defn total-aggregate-stats
-  "Adds the stream-aggregated spout stats to the :emitted and :transferred
-  stats of the bolts, summing matching inner entries."
-  [spout-summs bolt-summs include-sys?]
-  (let [spout-totals (aggregate-spout-streams
-                       (aggregate-spout-stats (get-filled-stats spout-summs)
-                                              include-sys?))
-        bolt-totals (aggregate-bolt-streams
-                      (aggregate-bolt-stats (get-filled-stats bolt-summs)
-                                            include-sys?))
-        ;; Acked and failed should be counted only for the "tuple trees", so
-        ;; those keys from the bolt executors are left out of the sum.
-        bolt-contribution (select-keys bolt-totals [:emitted :transferred])]
-    (merge-with (partial merge-with +)
-                bolt-contribution
-                spout-totals)))
-
-(defn error-subset
-  "Returns at most the first 200 characters of error-str as a string."
-  [error-str]
-  (->> error-str
-       (take 200)
-       (apply str)))
-
-(defn most-recent-error
-  "Returns the truncated (see error-subset) message of the ErrorInfo with the
-  greatest error time in errors-list, or \"\" when there is none."
-  [errors-list]
-  (let [by-time (sort-by (fn [^ErrorInfo info] (.get_error_time_secs info))
-                         errors-list)]
-    (if-let [^ErrorInfo latest (last by-time)]
-      (error-subset (.get_error latest))
-      "")))
-
-(defn float-str
-  "Formats n with three decimal places; returns \"0\" when n is nil or false."
-  [n]
-  (if n
-    ;; Coerce with double rather than float: float narrows to 32 bits, which
-    ;; drops digits for large values and throws for values outside float range.
-    (format "%.3f" (double n))
-    "0"))
-
-(defn compute-bolt-capacity
-  "Returns the largest capacity (see compute-executor-capacity) among the
-  given executors, counting a nil capacity as zero.  Returns 0 when
-  executors is empty."
-  [executors]
-  (->> executors
-       (map compute-executor-capacity)
-       (map #(Utils/nullToZero %))
-       ;; Seeding with 0 makes an empty executor list yield 0 instead of the
-       ;; arity exception thrown by (apply max).  Capacities are presumably
-       ;; never negative (counts and latencies), so the seed should not change
-       ;; a non-empty result; confirm if negative inputs are possible.
-       (reduce max 0)))