You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/03/15 18:44:35 UTC
[02/30] storm git commit: port backtype.storm.stats to java
port backtype.storm.stats to java
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/afd2d525
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/afd2d525
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/afd2d525
Branch: refs/heads/master
Commit: afd2d525be396c6f430e6a4a13cd1f237496a473
Parents: 11232b5
Author: 卫乐 <we...@taobao.com>
Authored: Wed Feb 24 21:06:25 2016 +0800
Committer: 卫乐 <we...@taobao.com>
Committed: Wed Feb 24 21:06:25 2016 +0800
----------------------------------------------------------------------
.../src/clj/org/apache/storm/converter.clj | 25 +-
.../org/apache/storm/daemon/builtin_metrics.clj | 33 +-
.../clj/org/apache/storm/daemon/executor.clj | 23 +-
.../src/clj/org/apache/storm/daemon/nimbus.clj | 18 +-
.../src/clj/org/apache/storm/daemon/task.clj | 11 +-
storm-core/src/clj/org/apache/storm/stats.clj | 1567 ------------------
storm-core/src/clj/org/apache/storm/ui/core.clj | 57 +-
.../test/clj/org/apache/storm/nimbus_test.clj | 8 +-
8 files changed, 84 insertions(+), 1658 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/afd2d525/storm-core/src/clj/org/apache/storm/converter.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/converter.clj b/storm-core/src/clj/org/apache/storm/converter.clj
index 5599d28..6e9eeb8 100644
--- a/storm-core/src/clj/org/apache/storm/converter.clj
+++ b/storm-core/src/clj/org/apache/storm/converter.clj
@@ -17,8 +17,9 @@
(:import [org.apache.storm.generated SupervisorInfo NodeInfo Assignment WorkerResources
StormBase TopologyStatus ClusterWorkerHeartbeat ExecutorInfo ErrorInfo Credentials RebalanceOptions KillOptions
TopologyActionOptions DebugOptions ProfileRequest]
- [org.apache.storm.utils Utils])
- (:use [org.apache.storm util stats log])
+ [org.apache.storm.utils Utils]
+ [org.apache.storm.stats StatsUtil])
+ (:use [org.apache.storm util log])
(:require [org.apache.storm.daemon [common :as common]]))
(defn thriftify-supervisor-info [supervisor-info]
@@ -213,26 +214,10 @@
(convert-to-symbol-from-status (.get_prev_status storm-base))
(map-val clojurify-debugoptions (.get_component_debug storm-base)))))
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn thriftify-stats [stats]
- (if stats
- (map-val thriftify-executor-stats
- (map-key #(ExecutorInfo. (int (first %1)) (int (last %1)))
- stats))
- {}))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn clojurify-stats [stats]
- (if stats
- (map-val clojurify-executor-stats
- (map-key (fn [x] (list (.get_task_start x) (.get_task_end x)))
- stats))
- {}))
-
(defn clojurify-zk-worker-hb [^ClusterWorkerHeartbeat worker-hb]
(if worker-hb
{:storm-id (.get_storm_id worker-hb)
- :executor-stats (clojurify-stats (into {} (.get_executor_stats worker-hb)))
+ :executor-stats (clojurify-structure (StatsUtil/clojurifyStats (into {} (.get_executor_stats worker-hb))))
:uptime (.get_uptime_secs worker-hb)
:time-secs (.get_time_secs worker-hb)
}
@@ -243,7 +228,7 @@
(doto (ClusterWorkerHeartbeat.)
(.set_uptime_secs (:uptime worker-hb))
(.set_storm_id (:storm-id worker-hb))
- (.set_executor_stats (thriftify-stats (filter second (:executor-stats worker-hb))))
+ (.set_executor_stats (StatsUtil/thriftifyStats (filter second (:executor-stats worker-hb))))
(.set_time_secs (:time-secs worker-hb)))))
(defn clojurify-error [^ErrorInfo error]
http://git-wip-us.apache.org/repos/asf/storm/blob/afd2d525/storm-core/src/clj/org/apache/storm/daemon/builtin_metrics.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/builtin_metrics.clj b/storm-core/src/clj/org/apache/storm/daemon/builtin_metrics.clj
index 14d0132..caa3b71 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/builtin_metrics.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/builtin_metrics.clj
@@ -16,8 +16,7 @@
(ns org.apache.storm.daemon.builtin-metrics
(:import [org.apache.storm.metric.api CountMetric StateMetric IMetric IStatefulObject])
(:import [org.apache.storm.metric.internal MultiCountStatAndMetric MultiLatencyStatAndMetric])
- (:import [org.apache.storm Config])
- (:use [org.apache.storm.stats]))
+ (:import [org.apache.storm Config]))
(defrecord BuiltinSpoutMetrics [^MultiCountStatAndMetric ack-count
^MultiLatencyStatAndMetric complete-latency
@@ -38,18 +37,18 @@
(defn make-data [executor-type stats]
(condp = executor-type
- :spout (BuiltinSpoutMetrics. (stats-acked stats)
- (stats-complete-latencies stats)
- (stats-failed stats)
- (stats-emitted stats)
- (stats-transferred stats))
- :bolt (BuiltinBoltMetrics. (stats-acked stats)
- (stats-process-latencies stats)
- (stats-failed stats)
- (stats-executed stats)
- (stats-execute-latencies stats)
- (stats-emitted stats)
- (stats-transferred stats))))
+ :spout (BuiltinSpoutMetrics. (.getAcked stats)
+ (.getCompleteLatencies stats)
+ (.getFailed stats)
+ (.getEmitted stats)
+ (.getTransferred stats))
+ :bolt (BuiltinBoltMetrics. (.getAcked stats)
+ (.getProcessLatencies stats)
+ (.getFailed stats)
+ (.getExecuted stats)
+ (.getExecuteLatencies stats)
+ (.getEmitted stats)
+ (.getTransferred stats))))
(defn make-spout-throttling-data []
(SpoutThrottlingMetrics. (CountMetric.)
@@ -89,10 +88,10 @@
(int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)))))
(defn skipped-max-spout! [^SpoutThrottlingMetrics m stats]
- (-> m .skipped-max-spout (.incrBy (stats-rate stats))))
+ (-> m .skipped-max-spout (.incrBy (.getRate stats))))
(defn skipped-throttle! [^SpoutThrottlingMetrics m stats]
- (-> m .skipped-throttle (.incrBy (stats-rate stats))))
+ (-> m .skipped-throttle (.incrBy (.getRate stats))))
(defn skipped-inactive! [^SpoutThrottlingMetrics m stats]
- (-> m .skipped-inactive (.incrBy (stats-rate stats))))
+ (-> m .skipped-inactive (.incrBy (.getRate stats))))
http://git-wip-us.apache.org/repos/asf/storm/blob/afd2d525/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 92cc003..bca03df 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -16,8 +16,9 @@
(ns org.apache.storm.daemon.executor
(:use [org.apache.storm.daemon common])
(:import [org.apache.storm.generated Grouping Grouping$_Fields]
- [java.io Serializable])
- (:use [org.apache.storm util config log stats])
+ [java.io Serializable]
+ [org.apache.storm.stats StatsUtil])
+ (:use [org.apache.storm util config log])
(:import [java.util List Random HashMap ArrayList LinkedList Map])
(:import [org.apache.storm ICredentialsListener Thrift])
(:import [org.apache.storm.hooks ITaskHook])
@@ -41,7 +42,7 @@
[org.json.simple JSONValue]
[com.lmax.disruptor.dsl ProducerType]
[org.apache.storm StormTimer])
- (:require [org.apache.storm [cluster :as cluster] [stats :as stats]])
+ (:require [org.apache.storm [cluster :as cluster]])
(:require [org.apache.storm.daemon [task :as task]])
(:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics])
(:require [clojure.set :as set]))
@@ -407,7 +408,7 @@
(reify
RunningExecutor
(render-stats [this]
- (stats/render-stats! (:stats executor-data)))
+ (clojurify-structure (StatsUtil/renderStats (:stats executor-data))))
(get-executor-id [this]
executor-id)
(credentials-changed [this creds]
@@ -447,7 +448,7 @@
(.fail spout msg-id)
(task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta))
(when time-delta
- (stats/spout-failed-tuple! (:stats executor-data) (:stream tuple-info) time-delta))))
+ (StatsUtil/spoutFailedTuple (:stats executor-data) (:stream tuple-info) time-delta))))
(defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id]
(let [storm-conf (:storm-conf executor-data)
@@ -458,7 +459,7 @@
(.ack spout msg-id)
(task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta))
(when time-delta
- (stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta))))
+ (StatsUtil/spoutAckedTuple (:stats executor-data) (:stream tuple-info) time-delta))))
(defn mk-task-receiver [executor-data tuple-action-fn]
(let [task-ids (:task-ids executor-data)
@@ -739,7 +740,7 @@
(task/apply-hooks user-context .boltExecute (BoltExecuteInfo. tuple task-id delta))
(when delta
- (stats/bolt-execute-tuple! executor-stats
+ (StatsUtil/boltExecuteTuple executor-stats
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
delta)))))))
@@ -812,7 +813,7 @@
(log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple))
(task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
(when delta
- (stats/bolt-acked-tuple! executor-stats
+ (StatsUtil/boltAckedTuple executor-stats
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
delta))))
@@ -827,7 +828,7 @@
(log-message "BOLT fail TASK: " task-id " TIME: " delta " TUPLE: " tuple))
(task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta))
(when delta
- (stats/bolt-failed-tuple! executor-stats
+ (StatsUtil/boltFailedTuple executor-stats
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
delta))))
@@ -862,7 +863,7 @@
;; TODO: refactor this to be part of an executor-specific map
(defmethod mk-executor-stats :spout [_ rate]
- (stats/mk-spout-stats rate))
+ (StatsUtil/mkSpoutStats rate))
(defmethod mk-executor-stats :bolt [_ rate]
- (stats/mk-bolt-stats rate))
+ (StatsUtil/mkBoltStats rate))
http://git-wip-us.apache.org/repos/asf/storm/blob/afd2d525/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index 28a6fb8..992a864 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -14,7 +14,8 @@
;; See the License for the specific language governing permissions and
;; limitations under the License.
(ns org.apache.storm.daemon.nimbus
- (:import [org.apache.thrift.server THsHaServer THsHaServer$Args])
+ (:import [org.apache.thrift.server THsHaServer THsHaServer$Args]
+ [org.apache.storm.stats StatsUtil])
(:import [org.apache.storm.generated KeyNotFoundException])
(:import [org.apache.storm.blobstore LocalFsBlobStore])
(:import [org.apache.thrift.protocol TBinaryProtocol TBinaryProtocol$Factory])
@@ -52,8 +53,7 @@
(:import [org.apache.storm.cluster ClusterStateContext DaemonType])
(:use [org.apache.storm util config log zookeeper])
(:require [org.apache.storm [cluster :as cluster]
- [converter :as converter]
- [stats :as stats]])
+ [converter :as converter]])
(:require [clojure.set :as set])
(:import [org.apache.storm.daemon.common StormBase Assignment])
(:import [org.apache.storm.zookeeper Zookeeper])
@@ -1668,7 +1668,7 @@
executor->host+port (map-val (fn [[node port]]
[(node->host node) port])
executor->node+port)
- nodeinfos (stats/extract-nodeinfos-from-hb-for-comp executor->host+port task->component false component_id)
+ nodeinfos (clojurify-structure (StatsUtil/extractNodeInfosFromHbForComp executor->host+port task->component false component_id))
all-pending-actions-for-topology (.get-topology-profile-requests storm-cluster-state id true)
latest-profile-actions (remove nil? (map (fn [nodeInfo]
(->> all-pending-actions-for-topology
@@ -1912,7 +1912,7 @@
heartbeat (get beats executor)
stats (:stats heartbeat)
stats (if stats
- (stats/thriftify-executor-stats stats))]
+ (StatsUtil/thriftifyExecutorStats stats))]
(doto
(ExecutorSummary. (thriftify-executor-id executor)
(-> executor first task->component)
@@ -2106,14 +2106,14 @@
last-err-fn (partial get-last-error
(:storm-cluster-state info)
topo-id)
- topo-page-info (stats/agg-topo-execs-stats topo-id
+ ;;TODO: add last-error-fn to aggTopoExecsStats method
+ topo-page-info (StatsUtil/aggTopoExecsStats topo-id
exec->node+port
(:task->component info)
(:beats info)
(:topology info)
window
- include-sys?
- last-err-fn)]
+ include-sys?)]
(when-let [owner (:owner (:base info))]
(.set_owner topo-page-info owner))
(when-let [sched-status (.get @(:id->sched-status nimbus) topo-id)]
@@ -2154,7 +2154,7 @@
executor->host+port (map-val (fn [[node port]]
[(node->host node) port])
executor->node+port)
- comp-page-info (stats/agg-comp-execs-stats executor->host+port
+ comp-page-info (StatsUtil/aggCompExecsStats executor->host+port
(:task->component info)
(:beats info)
window
http://git-wip-us.apache.org/repos/asf/storm/blob/afd2d525/storm-core/src/clj/org/apache/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj
index 77abdec..c9f6828 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/task.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj
@@ -26,10 +26,9 @@
(:import [org.apache.storm.utils Utils ConfigUtils])
(:import [org.apache.storm.generated ShellComponent JavaObject])
(:import [org.apache.storm.spout ShellSpout])
+ (:import [org.apache.storm.stats StatsUtil])
(:import [java.util Collection List ArrayList])
(:import [org.apache.storm Thrift])
- (:require [org.apache.storm
- [stats :as stats]])
(:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics]))
(defn mk-topology-context-builder [worker executor-data topology]
@@ -141,9 +140,9 @@
(throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping")))
(apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id]))
(when (emit-sampler)
- (stats/emitted-tuple! executor-stats stream)
+ (StatsUtil/emittedTuple executor-stats stream)
(if out-task-id
- (stats/transferred-tuples! executor-stats stream 1)))
+ (StatsUtil/transferredTuples executor-stats stream, 1)))
(if out-task-id [out-task-id])
))
([^String stream ^List values]
@@ -163,8 +162,8 @@
)))
(apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks))
(when (emit-sampler)
- (stats/emitted-tuple! executor-stats stream)
- (stats/transferred-tuples! executor-stats stream (count out-tasks)))
+ (StatsUtil/emittedTuple executor-stats stream)
+ (StatsUtil/transferredTuples executor-stats stream (count out-tasks)))
out-tasks)))
))
http://git-wip-us.apache.org/repos/asf/storm/blob/afd2d525/storm-core/src/clj/org/apache/storm/stats.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/stats.clj b/storm-core/src/clj/org/apache/storm/stats.clj
deleted file mode 100644
index 8b37fc3..0000000
--- a/storm-core/src/clj/org/apache/storm/stats.clj
+++ /dev/null
@@ -1,1567 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements. See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership. The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License. You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.stats
- (:import [org.apache.storm.generated Nimbus Nimbus$Processor Nimbus$Iface StormTopology ShellComponent
- NotAliveException AlreadyAliveException InvalidTopologyException GlobalStreamId
- ClusterSummary TopologyInfo TopologySummary ExecutorInfo ExecutorSummary ExecutorStats
- ExecutorSpecificStats SpoutStats BoltStats ErrorInfo
- SupervisorSummary CommonAggregateStats ComponentAggregateStats
- ComponentPageInfo ComponentType BoltAggregateStats
- ExecutorAggregateStats SpecificAggregateStats
- SpoutAggregateStats TopologyPageInfo TopologyStats])
- (:import [org.apache.storm.utils Utils])
- (:import [org.apache.storm.metric.internal MultiCountStatAndMetric MultiLatencyStatAndMetric]
- [java.util Collection])
- (:use [org.apache.storm log util])
- (:use [clojure.math.numeric-tower :only [ceil]]))
-
-(def TEN-MIN-IN-SECONDS (* 10 60))
-
-(def COMMON-FIELDS [:emitted :transferred])
-(defrecord CommonStats [^MultiCountStatAndMetric emitted
- ^MultiCountStatAndMetric transferred
- rate])
-
-(def BOLT-FIELDS [:acked :failed :process-latencies :executed :execute-latencies])
-;;acked and failed count individual tuples
-(defrecord BoltExecutorStats [^CommonStats common
- ^MultiCountStatAndMetric acked
- ^MultiCountStatAndMetric failed
- ^MultiLatencyStatAndMetric process-latencies
- ^MultiCountStatAndMetric executed
- ^MultiLatencyStatAndMetric execute-latencies])
-
-(def SPOUT-FIELDS [:acked :failed :complete-latencies])
-;;acked and failed count tuple completion
-(defrecord SpoutExecutorStats [^CommonStats common
- ^MultiCountStatAndMetric acked
- ^MultiCountStatAndMetric failed
- ^MultiLatencyStatAndMetric complete-latencies])
-
-(def NUM-STAT-BUCKETS 20)
-
-(defn- div
- "Perform floating point division on the arguments."
- [f & rest]
- (apply / (double f) rest))
-
-(defn- mk-common-stats
- [rate]
- (CommonStats.
- (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
- (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
- rate))
-
-(defn mk-bolt-stats
- [rate]
- (BoltExecutorStats.
- (mk-common-stats rate)
- (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
- (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
- (MultiLatencyStatAndMetric. NUM-STAT-BUCKETS)
- (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
- (MultiLatencyStatAndMetric. NUM-STAT-BUCKETS)))
-
-(defn mk-spout-stats
- [rate]
- (SpoutExecutorStats.
- (mk-common-stats rate)
- (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
- (MultiCountStatAndMetric. NUM-STAT-BUCKETS)
- (MultiLatencyStatAndMetric. NUM-STAT-BUCKETS)))
-
-(defmacro stats-rate
- [stats]
- `(-> ~stats :common :rate))
-
-(defmacro stats-emitted
- [stats]
- `(-> ~stats :common :emitted))
-
-(defmacro stats-transferred
- [stats]
- `(-> ~stats :common :transferred))
-
-(defmacro stats-executed
- [stats]
- `(:executed ~stats))
-
-(defmacro stats-acked
- [stats]
- `(:acked ~stats))
-
-(defmacro stats-failed
- [stats]
- `(:failed ~stats))
-
-(defmacro stats-execute-latencies
- [stats]
- `(:execute-latencies ~stats))
-
-(defmacro stats-process-latencies
- [stats]
- `(:process-latencies ~stats))
-
-(defmacro stats-complete-latencies
- [stats]
- `(:complete-latencies ~stats))
-
-(defn emitted-tuple!
- [stats stream]
- (.incBy ^MultiCountStatAndMetric (stats-emitted stats) ^Object stream ^long (stats-rate stats)))
-
-(defn transferred-tuples!
- [stats stream amt]
- (.incBy ^MultiCountStatAndMetric (stats-transferred stats) ^Object stream ^long (* (stats-rate stats) amt)))
-
-(defn bolt-execute-tuple!
- [^BoltExecutorStats stats component stream latency-ms]
- (let [key [component stream]
- ^MultiCountStatAndMetric executed (stats-executed stats)
- ^MultiLatencyStatAndMetric exec-lat (stats-execute-latencies stats)]
- (.incBy executed key (stats-rate stats))
- (.record exec-lat key latency-ms)))
-
-(defn bolt-acked-tuple!
- [^BoltExecutorStats stats component stream latency-ms]
- (let [key [component stream]
- ^MultiCountStatAndMetric acked (stats-acked stats)
- ^MultiLatencyStatAndMetric process-lat (stats-process-latencies stats)]
- (.incBy acked key (stats-rate stats))
- (.record process-lat key latency-ms)))
-
-(defn bolt-failed-tuple!
- [^BoltExecutorStats stats component stream latency-ms]
- (let [key [component stream]
- ^MultiCountStatAndMetric failed (stats-failed stats)]
- (.incBy failed key (stats-rate stats))))
-
-(defn spout-acked-tuple!
- [^SpoutExecutorStats stats stream latency-ms]
- (.incBy ^MultiCountStatAndMetric (stats-acked stats) stream (stats-rate stats))
- (.record ^MultiLatencyStatAndMetric (stats-complete-latencies stats) stream latency-ms))
-
-(defn spout-failed-tuple!
- [^SpoutExecutorStats stats stream latency-ms]
- (.incBy ^MultiCountStatAndMetric (stats-failed stats) stream (stats-rate stats)))
-
-(defn- cleanup-stat! [stat]
- (.close stat))
-
-(defn- cleanup-common-stats!
- [^CommonStats stats]
- (doseq [f COMMON-FIELDS]
- (cleanup-stat! (f stats))))
-
-(defn cleanup-bolt-stats!
- [^BoltExecutorStats stats]
- (cleanup-common-stats! (:common stats))
- (doseq [f BOLT-FIELDS]
- (cleanup-stat! (f stats))))
-
-(defn cleanup-spout-stats!
- [^SpoutExecutorStats stats]
- (cleanup-common-stats! (:common stats))
- (doseq [f SPOUT-FIELDS]
- (cleanup-stat! (f stats))))
-
-(defn- value-stats
- [stats fields]
- (into {} (dofor [f fields]
- [f (if (instance? MultiCountStatAndMetric (f stats))
- (.getTimeCounts ^MultiCountStatAndMetric (f stats))
- (.getTimeLatAvg ^MultiLatencyStatAndMetric (f stats)))])))
-
-(defn- value-common-stats
- [^CommonStats stats]
- (merge
- (value-stats stats COMMON-FIELDS)
- {:rate (:rate stats)}))
-
-(defn value-bolt-stats!
- [^BoltExecutorStats stats]
- (cleanup-bolt-stats! stats)
- (merge (value-common-stats (:common stats))
- (value-stats stats BOLT-FIELDS)
- {:type :bolt}))
-
-(defn value-spout-stats!
- [^SpoutExecutorStats stats]
- (cleanup-spout-stats! stats)
- (merge (value-common-stats (:common stats))
- (value-stats stats SPOUT-FIELDS)
- {:type :spout}))
-
-(defn- class-selector
- [obj & args]
- (class obj))
-
-(defmulti render-stats! class-selector)
-
-(defmethod render-stats! SpoutExecutorStats
- [stats]
- (value-spout-stats! stats))
-
-(defmethod render-stats! BoltExecutorStats
- [stats]
- (value-bolt-stats! stats))
-
-(defmulti thriftify-specific-stats :type)
-(defmulti clojurify-specific-stats class-selector)
-
-(defn window-set-converter
- ([stats key-fn first-key-fun]
- (into {}
- (for [[k v] stats]
- ;apply the first-key-fun only to first key.
- [(first-key-fun k)
- (into {} (for [[k2 v2] v]
- [(key-fn k2) v2]))])))
- ([stats first-key-fun]
- (window-set-converter stats identity first-key-fun)))
-
-(defn to-global-stream-id
- [[component stream]]
- (GlobalStreamId. component stream))
-
-(defn from-global-stream-id [global-stream-id]
- [(.get_componentId global-stream-id) (.get_streamId global-stream-id)])
-
-(defmethod clojurify-specific-stats BoltStats [^BoltStats stats]
- [(window-set-converter (.get_acked stats) from-global-stream-id identity)
- (window-set-converter (.get_failed stats) from-global-stream-id identity)
- (window-set-converter (.get_process_ms_avg stats) from-global-stream-id identity)
- (window-set-converter (.get_executed stats) from-global-stream-id identity)
- (window-set-converter (.get_execute_ms_avg stats) from-global-stream-id identity)])
-
-(defmethod clojurify-specific-stats SpoutStats [^SpoutStats stats]
- [(.get_acked stats)
- (.get_failed stats)
- (.get_complete_ms_avg stats)])
-
-
-(defn clojurify-executor-stats
- [^ExecutorStats stats]
- (let [ specific-stats (.get_specific stats)
- is_bolt? (.is_set_bolt specific-stats)
- specific-stats (if is_bolt? (.get_bolt specific-stats) (.get_spout specific-stats))
- specific-stats (clojurify-specific-stats specific-stats)
- common-stats (CommonStats. (.get_emitted stats)
- (.get_transferred stats)
- (.get_rate stats))]
- (if is_bolt?
- ; worker heart beat does not store the BoltExecutorStats or SpoutExecutorStats , instead it stores the result returned by render-stats!
- ; which flattens the BoltExecutorStats/SpoutExecutorStats by extracting values from all atoms and merging all values inside :common to top
- ;level map we are pretty much doing the same here.
- (dissoc (merge common-stats {:type :bolt} (apply ->BoltExecutorStats (into [nil] specific-stats))) :common)
- (dissoc (merge common-stats {:type :spout} (apply ->SpoutExecutorStats (into [nil] specific-stats))) :common)
- )))
-
-(defmethod thriftify-specific-stats :bolt
- [stats]
- (ExecutorSpecificStats/bolt
- (BoltStats.
- (window-set-converter (:acked stats) to-global-stream-id str)
- (window-set-converter (:failed stats) to-global-stream-id str)
- (window-set-converter (:process-latencies stats) to-global-stream-id str)
- (window-set-converter (:executed stats) to-global-stream-id str)
- (window-set-converter (:execute-latencies stats) to-global-stream-id str))))
-
-(defmethod thriftify-specific-stats :spout
- [stats]
- (ExecutorSpecificStats/spout
- (SpoutStats. (window-set-converter (:acked stats) str)
- (window-set-converter (:failed stats) str)
- (window-set-converter (:complete-latencies stats) str))))
-
-(defn thriftify-executor-stats
- [stats]
- (let [specific-stats (thriftify-specific-stats stats)
- rate (:rate stats)]
- (ExecutorStats. (window-set-converter (:emitted stats) str)
- (window-set-converter (:transferred stats) str)
- specific-stats
- rate)))
-
-(defn valid-number?
- "Returns true if x is a number that is not NaN or Infinity, false otherwise"
- [x]
- (and (number? x)
- (not (Double/isNaN x))
- (not (Double/isInfinite x))))
-
-(defn apply-default
- [f defaulting-fn & args]
- (apply f (map defaulting-fn args)))
-
-(defn apply-or-0
- [f & args]
- (apply apply-default
- f
- #(if (valid-number? %) % 0)
- args))
-
-(defn sum-or-0
- [& args]
- (apply apply-or-0 + args))
-
-(defn product-or-0
- [& args]
- (apply apply-or-0 * args))
-
-(defn max-or-0
- [& args]
- (apply apply-or-0 max args))
-
-(defn- agg-bolt-lat-and-count
- "Aggregates number executed, process latency, and execute latency across all
- streams."
- [idk->exec-avg idk->proc-avg idk->num-executed]
- (letfn [(weight-avg [[id avg]]
- (let [num-e (get idk->num-executed id)]
- (product-or-0 avg num-e)))]
- {:executeLatencyTotal (reduce + (map weight-avg idk->exec-avg))
- :processLatencyTotal (reduce + (map weight-avg idk->proc-avg))
- :executed (reduce + (vals idk->num-executed))}))
-
-(defn- agg-spout-lat-and-count
- "Aggregates number acked and complete latencies across all streams."
- [sid->comp-avg sid->num-acked]
- (letfn [(weight-avg [[id avg]]
- (product-or-0 avg (get sid->num-acked id)))]
- {:completeLatencyTotal (reduce + (map weight-avg sid->comp-avg))
- :acked (reduce + (vals sid->num-acked))}))
-
-(defn add-pairs
- ([] [0 0])
- ([[a1 a2] [b1 b2]]
- [(+ a1 b1) (+ a2 b2)]))
-
-(defn mk-include-sys-fn
- [include-sys?]
- (if include-sys?
- (fn [_] true)
- (fn [stream] (and (string? stream) (not (Utils/isSystemId stream))))))
-
-;TODO: when translating this function, you should replace the filter-val with a proper for loop + if condition HERE
-(defn mk-include-sys-filter
- "Returns a function that includes or excludes map entries whose keys are
- system ids."
- [include-sys?]
- (if include-sys?
- identity
- (partial filter-key (mk-include-sys-fn false))))
-
-(defn- agg-bolt-streams-lat-and-count
- "Aggregates number executed and process & execute latencies."
- [idk->exec-avg idk->proc-avg idk->executed]
- (letfn [(weight-avg [id avg]
- (let [num-e (idk->executed id)]
- (product-or-0 avg num-e)))]
- (into {}
- (for [k (keys idk->exec-avg)]
- [k {:executeLatencyTotal (weight-avg k (get idk->exec-avg k))
- :processLatencyTotal (weight-avg k (get idk->proc-avg k))
- :executed (idk->executed k)}]))))
-
-(defn- agg-spout-streams-lat-and-count
- "Aggregates number acked and complete latencies."
- [idk->comp-avg idk->acked]
- (letfn [(weight-avg [id avg]
- (let [num-e (get idk->acked id)]
- (product-or-0 avg num-e)))]
- (into {}
- (for [k (keys idk->comp-avg)]
- [k {:completeLatencyTotal (weight-avg k (get idk->comp-avg k))
- :acked (get idk->acked k)}]))))
-
-(defn swap-map-order
- "For a nested map, rearrange data such that the top-level keys become the
- nested map's keys and vice versa.
- Example:
- {:a {:X :banana, :Y :pear}, :b {:X :apple, :Y :orange}}
- -> {:Y {:a :pear, :b :orange}, :X {:a :banana, :b :apple}}"
- [m]
- (apply merge-with
- merge
- (map (fn [[k v]]
- (into {}
- (for [[k2 v2] v]
- [k2 {k v2}])))
- m)))
-
-(defn- compute-agg-capacity
- "Computes the capacity metric for one executor given its heartbeat data and
- uptime."
- [m uptime]
- (when uptime
- (->>
- ;; For each stream, create weighted averages and counts.
- (merge-with (fn weighted-avg+count-fn
- [avg cnt]
- [(* avg cnt) cnt])
- (get (:execute-latencies m) (str TEN-MIN-IN-SECONDS))
- (get (:executed m) (str TEN-MIN-IN-SECONDS)))
- vals ;; Ignore the stream ids.
- (reduce add-pairs
- [0. 0]) ;; Combine weighted averages and counts.
- ((fn [[weighted-avg cnt]]
- (div weighted-avg (* 1000 (min uptime TEN-MIN-IN-SECONDS))))))))
-
-(defn agg-pre-merge-comp-page-bolt
- [{exec-id :exec-id
- host :host
- port :port
- uptime :uptime
- comp-id :comp-id
- num-tasks :num-tasks
- statk->w->sid->num :stats}
- window
- include-sys?]
- ;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
- (let [str-key (partial map-key str)
- handle-sys-components-fn (mk-include-sys-filter include-sys?)]
- {:executor-id exec-id,
- :host host,
- :port port,
- :uptime uptime,
- :num-executors 1,
- :num-tasks num-tasks,
- :capacity (compute-agg-capacity statk->w->sid->num uptime)
- :cid+sid->input-stats
- (merge-with
- merge
- (swap-map-order
- {:acked (-> statk->w->sid->num
- :acked
- str-key
- (get window))
- :failed (-> statk->w->sid->num
- :failed
- str-key
- (get window))})
- (agg-bolt-streams-lat-and-count (-> statk->w->sid->num
- :execute-latencies
- str-key
- (get window))
- (-> statk->w->sid->num
- :process-latencies
- str-key
- (get window))
- (-> statk->w->sid->num
- :executed
- str-key
- (get window)))),
- :sid->output-stats
- (swap-map-order
- {:emitted (-> statk->w->sid->num
- :emitted
- str-key
- (get window)
- handle-sys-components-fn)
- :transferred (-> statk->w->sid->num
- :transferred
- str-key
- (get window)
- handle-sys-components-fn)})}))
-
-(defn agg-pre-merge-comp-page-spout
- [{exec-id :exec-id
- host :host
- port :port
- uptime :uptime
- comp-id :comp-id
- num-tasks :num-tasks
- statk->w->sid->num :stats}
- window
- include-sys?]
- ;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
- (let [str-key (partial map-key str)
- handle-sys-components-fn (mk-include-sys-filter include-sys?)]
- {:executor-id exec-id,
- :host host,
- :port port,
- :uptime uptime,
- :num-executors 1,
- :num-tasks num-tasks,
- :sid->output-stats
- (merge-with
- merge
- (agg-spout-streams-lat-and-count (-> statk->w->sid->num
- :complete-latencies
- str-key
- (get window))
- (-> statk->w->sid->num
- :acked
- str-key
- (get window)))
- (swap-map-order
- {:acked (-> statk->w->sid->num
- :acked
- str-key
- (get window))
- :failed (-> statk->w->sid->num
- :failed
- str-key
- (get window))
- :emitted (-> statk->w->sid->num
- :emitted
- str-key
- (get window)
- handle-sys-components-fn)
- :transferred (-> statk->w->sid->num
- :transferred
- str-key
- (get window)
- handle-sys-components-fn)}))}))
-
-(defn agg-pre-merge-topo-page-bolt
-  "Builds the pre-merge topology-page entry for one bolt executor: a
-  single-entry map from the component id to its stats for the given window,
-  with per-stream counts summed into one number per stat."
-  [{:keys [comp-id num-tasks uptime]
-    all-stats :stats}
-   window
-   include-sys?]
-  ;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-  (let [sys-filter (mk-include-sys-filter include-sys?)
-        ;; Per-stream values of one stat key for the requested window.
-        for-window (fn [stat-key]
-                     (get (map-key str (get all-stats stat-key)) window))
-        total (fn [sid->num] (reduce + (vals sid->num)))]
-    {comp-id
-     (merge
-       (agg-bolt-lat-and-count (for-window :execute-latencies)
-                               (for-window :process-latencies)
-                               (for-window :executed))
-       {:num-executors 1
-        :num-tasks num-tasks
-        :emitted (total (sys-filter (for-window :emitted)))
-        :transferred (total (sys-filter (for-window :transferred)))
-        :capacity (compute-agg-capacity all-stats uptime)
-        :acked (total (for-window :acked))
-        :failed (total (for-window :failed))})}))
-
-(defn agg-pre-merge-topo-page-spout
-  "Builds the pre-merge topology-page entry for one spout executor: a
-  single-entry map from the component id to its stats for the given window,
-  with per-stream counts summed into one number per stat."
-  [{:keys [comp-id num-tasks]
-    all-stats :stats}
-   window
-   include-sys?]
-  ;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-  (let [sys-filter (mk-include-sys-filter include-sys?)
-        ;; Per-stream values of one stat key for the requested window.
-        for-window (fn [stat-key]
-                     (get (map-key str (get all-stats stat-key)) window))
-        total (fn [sid->num] (reduce + (vals sid->num)))]
-    {comp-id
-     (merge
-       (agg-spout-lat-and-count (for-window :complete-latencies)
-                                (for-window :acked))
-       {:num-executors 1
-        :num-tasks num-tasks
-        :emitted (total (sys-filter (for-window :emitted)))
-        :transferred (total (sys-filter (for-window :transferred)))
-        :failed (total (for-window :failed))})}))
-
-(defn merge-agg-comp-stats-comp-page-bolt
-  "Folds one bolt executor's component-page stats into the accumulated stats.
-  Stream-level maps are merged with sum-or-0, and a per-executor summary
-  (counts plus average latencies) is appended to :executor-stats."
-  [{acc-in :cid+sid->input-stats
-    acc-out :sid->output-stats
-    :as acc-bolt-stats}
-   {bolt-in :cid+sid->input-stats
-    bolt-out :sid->output-stats
-    :as bolt-stats}]
-  (let [merge-streams (partial merge-with (partial merge-with sum-or-0))
-        ;; Sum of one stat key over every stream of this executor.
-        stream-total (fn [sid->stats k]
-                       (apply sum-or-0 (map k (vals sid->stats))))
-        executed (stream-total bolt-in :executed)
-        latencies (if (and executed (pos? executed))
-                    {:execute-latency
-                     (div (stream-total bolt-in :executeLatencyTotal) executed)
-                     :process-latency
-                     (div (stream-total bolt-in :processLatencyTotal) executed)}
-                    {:execute-latency nil
-                     :process-latency nil})
-        this-executor (merge
-                        (select-keys bolt-stats
-                                     [:executor-id :uptime :host :port :capacity])
-                        {:emitted (stream-total bolt-out :emitted)
-                         :transferred (stream-total bolt-out :transferred)
-                         :acked (stream-total bolt-in :acked)
-                         :failed (stream-total bolt-in :failed)
-                         :executed executed}
-                        latencies)]
-    {:num-executors (inc (or (:num-executors acc-bolt-stats) 0))
-     :num-tasks (sum-or-0 (:num-tasks acc-bolt-stats) (:num-tasks bolt-stats))
-     :sid->output-stats (merge-streams acc-out bolt-out)
-     :cid+sid->input-stats (merge-streams acc-in bolt-in)
-     :executor-stats (conj (:executor-stats acc-bolt-stats) this-executor)}))
-
-(defn merge-agg-comp-stats-comp-page-spout
-  "Folds one spout executor's component-page stats into the accumulated
-  stats. Output-stream maps are merged with sum-or-0, and a per-executor
-  summary (counts plus complete latency) is appended to :executor-stats."
-  [{acc-out :sid->output-stats
-    :as acc-spout-stats}
-   {spout-out :sid->output-stats
-    :as spout-stats}]
-  (let [;; Sum of one stat key over every output stream of this executor.
-        stream-total (fn [k]
-                       (apply sum-or-0 (map k (vals spout-out))))
-        acked (stream-total :acked)
-        this-executor (merge
-                        (select-keys spout-stats [:executor-id :uptime :host :port])
-                        {:emitted (stream-total :emitted)
-                         :transferred (stream-total :transferred)
-                         :acked acked
-                         :failed (stream-total :failed)}
-                        {:complete-latency
-                         (when (and acked (pos? acked))
-                           (div (stream-total :completeLatencyTotal) acked))})]
-    {:num-executors (inc (or (:num-executors acc-spout-stats) 0))
-     :num-tasks (sum-or-0 (:num-tasks acc-spout-stats) (:num-tasks spout-stats))
-     :sid->output-stats (merge-with (partial merge-with sum-or-0)
-                                    acc-out
-                                    spout-out)
-     :executor-stats (conj (:executor-stats acc-spout-stats) this-executor)}))
-
-(defn merge-agg-comp-stats-topo-page-bolt
-  "Merges one bolt executor's topology-page stats into the accumulated stats
-  for its component: counts are summed, capacity keeps the maximum."
-  [acc-bolt-stats bolt-stats]
-  (let [summed (fn [k] (sum-or-0 (k acc-bolt-stats) (k bolt-stats)))]
-    {:num-executors (inc (or (:num-executors acc-bolt-stats) 0))
-     :num-tasks (summed :num-tasks)
-     :emitted (summed :emitted)
-     :transferred (summed :transferred)
-     :capacity (max-or-0 (:capacity acc-bolt-stats) (:capacity bolt-stats))
-     ;; Latency totals are summed here rather than averaged at every step;
-     ;; the averages come later from dividing the total by the count.
-     :executeLatencyTotal (summed :executeLatencyTotal)
-     :processLatencyTotal (summed :processLatencyTotal)
-     :executed (summed :executed)
-     :acked (summed :acked)
-     :failed (summed :failed)}))
-
-(defn merge-agg-comp-stats-topo-page-spout
-  "Merges one spout executor's topology-page stats into the accumulated
-  stats for its component by summing each count."
-  [acc-spout-stats spout-stats]
-  (let [summed (fn [k] (sum-or-0 (k acc-spout-stats) (k spout-stats)))]
-    {:num-executors (inc (or (:num-executors acc-spout-stats) 0))
-     :num-tasks (summed :num-tasks)
-     :emitted (summed :emitted)
-     :transferred (summed :transferred)
-     ;; The latency total is summed here rather than averaged at every step;
-     ;; the average comes later from dividing the total by the count.
-     :completeLatencyTotal (summed :completeLatencyTotal)
-     :acked (summed :acked)
-     :failed (summed :failed)}))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn aggregate-count-streams
-  "Replaces every value of stats (itself a map of counts) with the sum of
-  that map's values."
-  [stats]
-  (map-val (fn [sid->count] (reduce + (vals sid->count)))
-           stats))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn- agg-topo-exec-stats*
-  "A helper function that does the common work to aggregate stats of one
-  executor with the given map for the topology page.
-
-  pre-merge-fn is called as (pre-merge-fn new-data window include-sys?) and
-  its result is merged, value by value with merge-fn, into the entry of the
-  accumulator stored under comp-key. Returns the updated accumulator with
-  :type set to the executor's stats type."
-  [window
-   include-sys?
-   {:keys [workers-set
-           bolt-id->stats
-           spout-id->stats
-           window->emitted
-           window->transferred
-           window->comp-lat-wgt-avg
-           window->acked
-           window->failed] :as acc-stats}
-   {:keys [stats] :as new-data}
-   pre-merge-fn
-   merge-fn
-   comp-key]
-  (let [cid->statk->num (pre-merge-fn new-data window include-sys?)
-        {w->compLatWgtAvg :completeLatencyTotal
-         w->acked :acked}
-        (if (:complete-latencies stats)
-          (swap-map-order
-            (into {}
-                  (for [w (keys (:acked stats))]
-                    [w (agg-spout-lat-and-count
-                         (get (:complete-latencies stats) w)
-                         (get (:acked stats) w))])))
-          ;; No complete latencies: only the acked counts are available.
-          ;; The key has to be :acked (it used to be :acks) so that the
-          ;; destructuring above actually binds w->acked.
-          {:completeLatencyTotal nil
-           :acked (aggregate-count-streams (:acked stats))})
-        handle-sys-components-fn (mk-include-sys-filter include-sys?)]
-    (assoc {:workers-set (conj workers-set
-                               [(:host new-data) (:port new-data)])
-            :bolt-id->stats bolt-id->stats
-            :spout-id->stats spout-id->stats
-            :window->emitted (->> (:emitted stats)
-                                  (map-val handle-sys-components-fn)
-                                  aggregate-count-streams
-                                  (merge-with + window->emitted))
-            :window->transferred (->> (:transferred stats)
-                                      (map-val handle-sys-components-fn)
-                                      aggregate-count-streams
-                                      (merge-with + window->transferred))
-            :window->comp-lat-wgt-avg (merge-with +
-                                                  window->comp-lat-wgt-avg
-                                                  w->compLatWgtAvg)
-            ;; Acked and failed are only accumulated for spouts.
-            :window->acked (if (= :spout (:type stats))
-                             (merge-with + window->acked w->acked)
-                             window->acked)
-            :window->failed (if (= :spout (:type stats))
-                              (->> (:failed stats)
-                                   aggregate-count-streams
-                                   (merge-with + window->failed))
-                              window->failed)}
-           comp-key (merge-with merge-fn
-                                (acc-stats comp-key)
-                                cid->statk->num)
-           :type (:type stats))))
-
-(defmulti agg-topo-exec-stats
-  "Combines the aggregate stats of one executor with the given map, selecting
-  the appropriate window and including system components as specified.
-  Dispatches on the :type of the last argument."
-  (fn [& args] (-> args last :type)))
-
-;; Bolt executors are merged into :bolt-id->stats.
-(defmethod agg-topo-exec-stats :bolt
-  [window include-sys? acc-stats new-data]
-  (agg-topo-exec-stats* window include-sys? acc-stats new-data
-                        agg-pre-merge-topo-page-bolt
-                        merge-agg-comp-stats-topo-page-bolt
-                        :bolt-id->stats))
-
-;; Spout executors are merged into :spout-id->stats.
-(defmethod agg-topo-exec-stats :spout
-  [window include-sys? acc-stats new-data]
-  (agg-topo-exec-stats* window include-sys? acc-stats new-data
-                        agg-pre-merge-topo-page-spout
-                        merge-agg-comp-stats-topo-page-spout
-                        :spout-id->stats))
-
-;; Any other type leaves the accumulator untouched.
-(defmethod agg-topo-exec-stats :default
-  [_ _ acc-stats _]
-  acc-stats)
-
-(defn get-last-error
-  "Returns an ErrorInfo built from the cluster state's last error for the
-  component, or nil when there is none."
-  [storm-cluster-state storm-id component-id]
-  (when-let [err (.last-error storm-cluster-state storm-id component-id)]
-    (ErrorInfo. (:error err) (:time-secs err))))
-
-(defn component-type
-  "Returns the component type (either :bolt or :spout) for a given
-  topology and component id. Returns nil if not found. System ids are
-  reported as :bolt."
-  [^StormTopology topology id]
-  (let [bolt-specs (.get_bolts topology)
-        spout-specs (.get_spouts topology)]
-    (if (or (Utils/isSystemId id) (.containsKey bolt-specs id))
-      :bolt
-      (when (.containsKey spout-specs id)
-        :spout))))
-
-(defn extract-nodeinfos-from-hb-for-comp
-  "Returns the distinct {:host :port} maps of the executors whose component
-  (looked up from the executor's first task) matches comp-id, or of all
-  executors when comp-id is nil. System components are skipped unless
-  include-sys? is true."
-  ([exec->host+port task->component include-sys? comp-id]
-   (let [wanted? (fn [[[first-task] _]]
-                   (let [id (task->component first-task)]
-                     (and (or (nil? comp-id) (= comp-id id))
-                          (or include-sys? (not (Utils/isSystemId id))))))]
-     (->> exec->host+port
-          (filter wanted?)
-          (map (fn [[_ [host port]]]
-                 {:host host
-                  :port port}))
-          distinct))))
-
-(defn extract-data-from-hb
-  "Returns a lazy seq with one map per executor (id, component, task count,
-  host, port, uptime, stats and type) built from its heartbeat. Executors are
-  limited to comp-id when it is non-nil, and system components are skipped
-  unless include-sys? is true."
-  ([exec->host+port task->component beats include-sys? topology]
-   (extract-data-from-hb exec->host+port
-                         task->component
-                         beats
-                         include-sys?
-                         topology
-                         nil))
-  ([exec->host+port task->component beats include-sys? topology comp-id]
-   (for [[[first-task last-task :as executor] [host port]] exec->host+port
-         :let [hb (beats executor)
-               id (task->component first-task)
-               hb-stats (:stats hb)]
-         :when (and (or (nil? comp-id) (= comp-id id))
-                    (or include-sys? (not (Utils/isSystemId id))))]
-     {:exec-id executor
-      :comp-id id
-      :num-tasks (count (range first-task (inc last-task)))
-      :host host
-      :port port
-      :uptime (:uptime hb)
-      :stats hb-stats
-      ;; Fall back to the topology definition when the heartbeat has no type.
-      :type (or (:type hb-stats)
-                (component-type topology id))})))
-
-(defn aggregate-topo-stats
-  "Reduces the per-executor data into one topology-page accumulator by
-  applying agg-topo-exec-stats to each element, starting from empty maps."
-  [window include-sys? data]
-  (reduce (fn [acc exec-data]
-            (agg-topo-exec-stats window include-sys? acc exec-data))
-          {:workers-set #{}
-           :bolt-id->stats {}
-           :spout-id->stats {}
-           :window->emitted {}
-           :window->transferred {}
-           :window->comp-lat-wgt-avg {}
-           :window->acked {}
-           :window->failed {}}
-          data))
-
-(defn- compute-weighted-averages-per-window
-  "For every window under wgt-avg-key, divides the weighted total by the
-  matching value under divisor-key. Windows whose divisor is missing or not
-  positive are left out; result keys are the window converted with str."
-  [acc-data wgt-avg-key divisor-key]
-  (let [divisors (divisor-key acc-data)]
-    (reduce (fn [out [window wgt-avg]]
-              (let [divisor (divisors window)]
-                (if (and divisor (pos? divisor))
-                  (assoc out (str window) (div wgt-avg divisor))
-                  out)))
-            {}
-            (wgt-avg-key acc-data))))
-
-(defn- post-aggregate-topo-stats
-  "Turns the accumulated topology stats into the final topology-page map:
-  adds task/worker/executor counts, replaces the latency totals of every
-  component with average latencies, attaches each component's last error
-  (via last-err-fn) and converts the window keys to strings."
-  [task->component exec->node+port last-err-fn acc-data]
-  {:num-tasks (count task->component)
-   :num-workers (count (:workers-set acc-data))
-   :num-executors (count exec->node+port)
-   :bolt-id->stats
-   (into {} (for [[id m] (:bolt-id->stats acc-data)
-                  :let [executed (:executed m)]]
-              [id (-> m
-                      (assoc :execute-latency
-                             (if (and executed (pos? executed))
-                               (div (or (:executeLatencyTotal m) 0)
-                                    executed)
-                               0)
-                             :process-latency
-                             (if (and executed (pos? executed))
-                               (div (or (:processLatencyTotal m) 0)
-                                    executed)
-                               0))
-                      (dissoc :executeLatencyTotal
-                              :processLatencyTotal)
-                      (assoc :lastError (last-err-fn id)))]))
-   :spout-id->stats
-   (into {} (for [[id m] (:spout-id->stats acc-data)
-                  :let [acked (:acked m)]]
-              [id (-> m
-                      (assoc :complete-latency
-                             (if (and acked (pos? acked))
-                               ;; Guard against a missing total, as the bolt
-                               ;; branch above does for its latency totals.
-                               (div (or (:completeLatencyTotal m) 0)
-                                    acked)
-                               0))
-                      (dissoc :completeLatencyTotal)
-                      (assoc :lastError (last-err-fn id)))]))
-   ;TODO: when translating this function, you should replace the map-key calls below with proper for loops
-   :window->emitted (map-key str (:window->emitted acc-data))
-   :window->transferred (map-key str (:window->transferred acc-data))
-   :window->complete-latency
-   (compute-weighted-averages-per-window acc-data
-                                         :window->comp-lat-wgt-avg
-                                         :window->acked)
-   :window->acked (map-key str (:window->acked acc-data))
-   :window->failed (map-key str (:window->failed acc-data))})
-
-(defn- thriftify-common-agg-stats
-  "Copies the common counters that are present in statk->num into a new
-  CommonAggregateStats and stores it on s."
-  [^ComponentAggregateStats s
-   {:keys [num-tasks
-           emitted
-           transferred
-           acked
-           failed
-           num-executors] :as statk->num}]
-  (let [common (CommonAggregateStats.)]
-    ;; Only set the fields that have a value.
-    (when num-executors (.set_num_executors common num-executors))
-    (when num-tasks (.set_num_tasks common num-tasks))
-    (when emitted (.set_emitted common emitted))
-    (when transferred (.set_transferred common transferred))
-    (when acked (.set_acked common acked))
-    (when failed (.set_failed common failed))
-    (.set_common_stats s common)))
-
-(defn thriftify-bolt-agg-stats
-  "Builds a ComponentAggregateStats of type BOLT from the stats map, setting
-  only the fields that have a value."
-  [statk->num]
-  (let [bolt-specific (BoltAggregateStats.)
-        agg (ComponentAggregateStats.)]
-    (when-let [v (:execute-latency statk->num)]
-      (.set_execute_latency_ms bolt-specific v))
-    (when-let [v (:process-latency statk->num)]
-      (.set_process_latency_ms bolt-specific v))
-    (when-let [v (:executed statk->num)]
-      (.set_executed bolt-specific v))
-    (when-let [v (:capacity statk->num)]
-      (.set_capacity bolt-specific v))
-    (.set_type agg ComponentType/BOLT)
-    (when-let [err (:lastError statk->num)]
-      (.set_last_error agg err))
-    (thriftify-common-agg-stats agg statk->num)
-    (.set_specific_stats agg (SpecificAggregateStats/bolt bolt-specific))
-    agg))
-
-(defn thriftify-spout-agg-stats
-  "Builds a ComponentAggregateStats of type SPOUT from the stats map, setting
-  only the fields that have a value."
-  [statk->num]
-  (let [spout-specific (SpoutAggregateStats.)
-        agg (ComponentAggregateStats.)]
-    (when-let [v (:complete-latency statk->num)]
-      (.set_complete_latency_ms spout-specific v))
-    (.set_type agg ComponentType/SPOUT)
-    (when-let [err (:lastError statk->num)]
-      (.set_last_error agg err))
-    (thriftify-common-agg-stats agg statk->num)
-    (.set_specific_stats agg (SpecificAggregateStats/spout spout-specific))
-    agg))
-
-(defn thriftify-topo-page-data
-  "Converts the post-aggregated topology-page map into a TopologyPageInfo
-  for the given topology id."
-  [topology-id data]
-  (let [;; id -> Thrift stats, tagging every stats map with its type first.
-        thriftify-all (fn [id->stats comp-type thriftify-fn]
-                        (into {}
-                              (for [[id m] id->stats]
-                                [id (thriftify-fn (assoc m :type comp-type))])))
-        spout-agg-stats (thriftify-all (:spout-id->stats data)
-                                       :spout
-                                       thriftify-spout-agg-stats)
-        bolt-agg-stats (thriftify-all (:bolt-id->stats data)
-                                      :bolt
-                                      thriftify-bolt-agg-stats)
-        topology-stats (doto (TopologyStats.)
-                         (.set_window_to_emitted (:window->emitted data))
-                         (.set_window_to_transferred (:window->transferred data))
-                         (.set_window_to_complete_latencies_ms
-                           (:window->complete-latency data))
-                         (.set_window_to_acked (:window->acked data))
-                         (.set_window_to_failed (:window->failed data)))]
-    (doto (TopologyPageInfo. topology-id)
-      (.set_num_tasks (:num-tasks data))
-      (.set_num_workers (:num-workers data))
-      (.set_num_executors (:num-executors data))
-      (.set_id_to_spout_agg_stats spout-agg-stats)
-      (.set_id_to_bolt_agg_stats bolt-agg-stats)
-      (.set_topology_stats topology-stats))))
-
-(defn agg-topo-execs-stats
-  "Aggregate various executor statistics for a topology from the given
-  heartbeats."
-  [topology-id
-   exec->node+port
-   task->component
-   beats
-   topology
-   window
-   include-sys?
-   last-err-fn]
-  (let [;; Lazy, so each executor is still only iterated over one time.
-        exec-data (extract-data-from-hb exec->node+port
-                                        task->component
-                                        beats
-                                        include-sys?
-                                        topology)
-        acc-data (aggregate-topo-stats window include-sys? exec-data)
-        page-data (post-aggregate-topo-stats task->component
-                                             exec->node+port
-                                             last-err-fn
-                                             acc-data)]
-    (thriftify-topo-page-data topology-id page-data)))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn- agg-bolt-exec-win-stats
-  "A helper function that aggregates windowed stats from one bolt executor."
-  [acc-stats new-stats include-sys?]
-  (let [executed-by-window (:executed new-stats)
-        per-window (into {}
-                         (for [w (keys executed-by-window)]
-                           [w (agg-bolt-lat-and-count
-                                (get (:execute-latencies new-stats) w)
-                                (get (:process-latencies new-stats) w)
-                                (get executed-by-window w))]))
-        {exec-lat-totals :executeLatencyTotal
-         proc-lat-totals :processLatencyTotal
-         executed-totals :executed} (swap-map-order per-window)
-        sys-filter (mk-include-sys-filter include-sys?)
-        ;; Add already-aggregated per-window numbers to the accumulator entry.
-        add-totals (fn [acc-key totals]
-                     (merge-with + (acc-key acc-stats) totals))
-        ;; Sum the streams of each window, then add to the accumulator entry.
-        add-counts (fn [acc-key w->sid->num]
-                     (add-totals acc-key (aggregate-count-streams w->sid->num)))
-        ;; Same as add-counts, after filtering system streams per window.
-        add-filtered (fn [acc-key w->sid->num]
-                       (add-counts acc-key (map-val sys-filter w->sid->num)))]
-    {:window->emitted (add-filtered :window->emitted (:emitted new-stats))
-     :window->transferred (add-filtered :window->transferred (:transferred new-stats))
-     :window->exec-lat-wgt-avg (add-totals :window->exec-lat-wgt-avg exec-lat-totals)
-     :window->proc-lat-wgt-avg (add-totals :window->proc-lat-wgt-avg proc-lat-totals)
-     :window->executed (add-totals :window->executed executed-totals)
-     :window->acked (add-counts :window->acked (:acked new-stats))
-     :window->failed (add-counts :window->failed (:failed new-stats))}))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn- agg-spout-exec-win-stats
-  "A helper function that aggregates windowed stats from one spout executor."
-  [acc-stats new-stats include-sys?]
-  (let [acked-by-window (:acked new-stats)
-        per-window (into {}
-                         (for [w (keys acked-by-window)]
-                           [w (agg-spout-lat-and-count
-                                (get (:complete-latencies new-stats) w)
-                                (get acked-by-window w))]))
-        ;; The per-window :acked produced here is not used; acked is summed
-        ;; from the raw counts below.
-        {comp-lat-totals :completeLatencyTotal} (swap-map-order per-window)
-        sys-filter (mk-include-sys-filter include-sys?)
-        ;; Add already-aggregated per-window numbers to the accumulator entry.
-        add-totals (fn [acc-key totals]
-                     (merge-with + (acc-key acc-stats) totals))
-        ;; Sum the streams of each window, then add to the accumulator entry.
-        add-counts (fn [acc-key w->sid->num]
-                     (add-totals acc-key (aggregate-count-streams w->sid->num)))
-        ;; Same as add-counts, after filtering system streams per window.
-        add-filtered (fn [acc-key w->sid->num]
-                       (add-counts acc-key (map-val sys-filter w->sid->num)))]
-    {:window->emitted (add-filtered :window->emitted (:emitted new-stats))
-     :window->transferred (add-filtered :window->transferred (:transferred new-stats))
-     :window->comp-lat-wgt-avg (add-totals :window->comp-lat-wgt-avg comp-lat-totals)
-     :window->acked (add-counts :window->acked acked-by-window)
-     :window->failed (add-counts :window->failed (:failed new-stats))}))
-
-(defmulti agg-comp-exec-stats
-  "Combines the aggregate stats of one executor with the given map, selecting
-  the appropriate window and including system components as specified.
-  Dispatches on the :type of the accumulator (third argument)."
-  (fn [_window _include-sys? acc-stats _new-data] (:type acc-stats)))
-
-(defmethod agg-comp-exec-stats :bolt
-  [window include-sys? acc-stats new-data]
-  (let [merged (merge-agg-comp-stats-comp-page-bolt
-                 (:stats acc-stats)
-                 (agg-pre-merge-comp-page-bolt new-data window include-sys?))]
-    (-> (agg-bolt-exec-win-stats acc-stats (:stats new-data) include-sys?)
-        (assoc :stats merged
-               :type :bolt))))
-
-(defmethod agg-comp-exec-stats :spout
-  [window include-sys? acc-stats new-data]
-  (let [merged (merge-agg-comp-stats-comp-page-spout
-                 (:stats acc-stats)
-                 (agg-pre-merge-comp-page-spout new-data window include-sys?))]
-    (-> (agg-spout-exec-win-stats acc-stats (:stats new-data) include-sys?)
-        (assoc :stats merged
-               :type :spout))))
-
-(defn- aggregate-comp-stats*
-  "Reduces data with agg-comp-exec-stats (for the given window and
-  include-sys? flag), starting from init-val."
-  [window include-sys? data init-val]
-  (reduce (partial agg-comp-exec-stats window include-sys?)
-          init-val
-          data))
-
-;; Dispatches on the :type of the first element of the last argument.
-(defmulti aggregate-comp-stats
-  (fn [& args] (:type (first (last args)))))
-
-(defmethod aggregate-comp-stats :bolt
-  [& args]
-  (let [empty-bolt-acc {:type :bolt
-                        :cid+sid->input-stats {}
-                        :sid->output-stats {}
-                        :executor-stats []
-                        :window->emitted {}
-                        :window->transferred {}
-                        :window->exec-lat-wgt-avg {}
-                        :window->executed {}
-                        :window->proc-lat-wgt-avg {}
-                        :window->acked {}
-                        :window->failed {}}]
-    ;; The accumulator is passed as the extra, trailing argument.
-    (apply aggregate-comp-stats* (concat args [empty-bolt-acc]))))
-
-(defmethod aggregate-comp-stats :spout
-  [& args]
-  (let [empty-spout-acc {:type :spout
-                         :sid->output-stats {}
-                         :executor-stats []
-                         :window->emitted {}
-                         :window->transferred {}
-                         :window->comp-lat-wgt-avg {}
-                         :window->acked {}
-                         :window->failed {}}]
-    ;; The accumulator is passed as the extra, trailing argument.
-    (apply aggregate-comp-stats* (concat args [empty-spout-acc]))))
-
-(defmethod aggregate-comp-stats :default
-  [& _]
-  {})
-
-;; Dispatches on the :type of the accumulated data (third argument).
-(defmulti post-aggregate-comp-stats
-  (fn [_ _ acc-data] (:type acc-data)))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defmethod post-aggregate-comp-stats :bolt
-  [task->component
-   exec->host+port
-   {{i-stats :cid+sid->input-stats
-     o-stats :sid->output-stats
-     num-tasks :num-tasks
-     num-executors :num-executors} :stats
-    comp-type :type :as acc-data}]
-  (let [;; Replace the latency totals of one input-stream map with averages
-        ;; (0 when nothing was executed).
-        with-latencies
-        (fn [m]
-          (let [executed (:executed m)
-                executed? (and executed (pos? executed))
-                avg-of (fn [total-key]
-                         (if executed?
-                           (div (or (total-key m) 0) executed)
-                           0))]
-            (-> m
-                (assoc :execute-latency (avg-of :executeLatencyTotal)
-                       :process-latency (avg-of :processLatencyTotal))
-                (dissoc :executeLatencyTotal
-                        :processLatencyTotal))))
-        ;; Window map under k with its keys converted to strings.
-        ;TODO: when translating this function, you should replace the map-key with a proper for loop HERE
-        str-windows (fn [k] (map-key str (k acc-data)))]
-    {:type comp-type
-     :num-tasks num-tasks
-     :num-executors num-executors
-     :cid+sid->input-stats (map-val with-latencies i-stats)
-     :sid->output-stats o-stats
-     :executor-stats (:executor-stats (:stats acc-data))
-     :window->emitted (str-windows :window->emitted)
-     :window->transferred (str-windows :window->transferred)
-     :window->execute-latency
-     (compute-weighted-averages-per-window acc-data
-                                           :window->exec-lat-wgt-avg
-                                           :window->executed)
-     :window->executed (str-windows :window->executed)
-     :window->process-latency
-     (compute-weighted-averages-per-window acc-data
-                                           :window->proc-lat-wgt-avg
-                                           :window->executed)
-     :window->acked (str-windows :window->acked)
-     :window->failed (str-windows :window->failed)}))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defmethod post-aggregate-comp-stats :spout
-  [task->component
-   exec->host+port
-   {{o-stats :sid->output-stats
-     num-tasks :num-tasks
-     num-executors :num-executors} :stats
-    comp-type :type :as acc-data}]
-  (let [;; Replace the latency total of one output-stream map with the
-        ;; average complete latency (0 when nothing was acked).
-        with-latency
-        (fn [m]
-          (let [acked (:acked m)
-                latency (if (and acked (pos? acked))
-                          (div (or (:completeLatencyTotal m) 0) acked)
-                          0)]
-            (-> m
-                (assoc :complete-latency latency)
-                (dissoc :completeLatencyTotal))))
-        ;; Window map under k with its keys converted to strings.
-        ;TODO: when translating this function, you should replace the map-key with a proper for loop HERE
-        str-windows (fn [k] (map-key str (k acc-data)))]
-    {:type comp-type
-     :num-tasks num-tasks
-     :num-executors num-executors
-     :sid->output-stats (map-val with-latency o-stats)
-     :executor-stats (:executor-stats (:stats acc-data))
-     :window->emitted (str-windows :window->emitted)
-     :window->transferred (str-windows :window->transferred)
-     :window->complete-latency
-     (compute-weighted-averages-per-window acc-data
-                                           :window->comp-lat-wgt-avg
-                                           :window->acked)
-     :window->acked (str-windows :window->acked)
-     :window->failed (str-windows :window->failed)}))
-
-(defmethod post-aggregate-comp-stats :default
-  [& _]
-  {})
-
-(defn thriftify-exec-agg-stats
-  "Builds an ExecutorAggregateStats for one executor: its summary (executor
-  info, component id, host, port, uptime defaulting to 0) plus its stats
-  converted with the bolt or spout thriftify function chosen by comp-type."
-  [comp-id comp-type {:keys [executor-id host port uptime] :as stats}]
-  (let [exec-info (apply (fn [start-task end-task]
-                           (ExecutorInfo. start-task end-task))
-                         executor-id)]
-    (doto (ExecutorAggregateStats.)
-      (.set_exec_summary (ExecutorSummary. exec-info
-                                           comp-id
-                                           host
-                                           port
-                                           (or uptime 0)))
-      (.set_stats ((case comp-type
-                     :bolt thriftify-bolt-agg-stats
-                     :spout thriftify-spout-agg-stats)
-                   stats)))))
-
-(defn- thriftify-bolt-input-stats
-  "Returns a map whose keys are the input keys converted with
-  to-global-stream-id and whose values are converted with
-  thriftify-bolt-agg-stats."
-  [cid+sid->input-stats]
-  (into {}
-        (map (fn [[cid+sid input-stats]]
-               [(to-global-stream-id cid+sid)
-                (thriftify-bolt-agg-stats input-stats)])
-             cid+sid->input-stats)))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn- thriftify-bolt-output-stats
-  "Converts every output-stream stats value with thriftify-bolt-agg-stats."
-  [sid->output-stats]
-  (->> sid->output-stats
-       (map-val thriftify-bolt-agg-stats)))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn- thriftify-spout-output-stats
-  "Converts every output-stream stats value with thriftify-spout-agg-stats."
-  [sid->output-stats]
-  (->> sid->output-stats
-       (map-val thriftify-spout-agg-stats)))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn thriftify-comp-page-data
-  "Converts the post-aggregated component-page map into a ComponentPageInfo.
-  The component's type is looked up in the topology; it throws when the
-  component is neither a bolt nor a spout there."
-  [topo-id topology comp-id data]
-  (let [common-windows {:emitted (:window->emitted data)
-                        :transferred (:window->transferred data)
-                        :acked (:window->acked data)
-                        :failed (:window->failed data)}
-        typed-windows (case (:type data)
-                        :bolt {:execute-latency (:window->execute-latency data)
-                               :process-latency (:window->process-latency data)
-                               :executed (:window->executed data)}
-                        :spout {:complete-latency
-                                (:window->complete-latency data)}
-                        {}) ; default
-        window->raw-stats (swap-map-order (merge common-windows typed-windows))
-        executors (:executor-stats data)
-        outputs (:sid->output-stats data)
-        [comp-type exec-stats window->stats gsid->input-stats sid->output-stats]
-        (case (component-type topology comp-id)
-          :bolt [ComponentType/BOLT
-                 (map (partial thriftify-exec-agg-stats comp-id :bolt) executors)
-                 (map-val thriftify-bolt-agg-stats window->raw-stats)
-                 (thriftify-bolt-input-stats (:cid+sid->input-stats data))
-                 (thriftify-bolt-output-stats outputs)]
-          :spout [ComponentType/SPOUT
-                  (map (partial thriftify-exec-agg-stats comp-id :spout) executors)
-                  (map-val thriftify-spout-agg-stats window->raw-stats)
-                  nil ;; spouts do not have input stats
-                  (thriftify-spout-output-stats outputs)])
-        page (doto (ComponentPageInfo. comp-id comp-type)
-               (.set_topology_id topo-id)
-               (.set_topology_name nil)
-               (.set_window_to_stats window->stats)
-               (.set_sid_to_output_stats sid->output-stats)
-               (.set_exec_stats exec-stats))]
-    ;; Optional fields are only set when a value is available.
-    (when-let [n (:num-executors data)]
-      (.set_num_executors page n))
-    (when-let [n (:num-tasks data)]
-      (.set_num_tasks page n))
-    (when gsid->input-stats
-      (.set_gsid_to_input_stats page gsid->input-stats))
-    page))
-
-(defn agg-comp-execs-stats
-  "Aggregate various executor statistics for a component from the given
-  heartbeats."
-  [exec->host+port
-   task->component
-   beats
-   window
-   include-sys?
-   topology-id
-   topology
-   component-id]
-  (let [;; Lazy, so each executor is still only iterated over one time.
-        exec-data (extract-data-from-hb exec->host+port
-                                        task->component
-                                        beats
-                                        include-sys?
-                                        topology
-                                        component-id)
-        acc-data (aggregate-comp-stats window include-sys? exec-data)
-        page-data (post-aggregate-comp-stats task->component
-                                             exec->host+port
-                                             acc-data)]
-    (thriftify-comp-page-data topology-id topology component-id page-data)))
-
-(defn expand-averages
-  "For every slice and stream in counts, returns a [total count] pair where
-  total is the count multiplied by the matching average from avg."
-  [avg counts]
-  (let [avg (clojurify-structure avg)
-        counts (clojurify-structure counts)
-        expand-slice (fn [slice streams]
-                       (into {}
-                             (map (fn [[stream c]]
-                                    [stream
-                                     [(* c (get-in avg [slice stream]))
-                                      c]])
-                                  streams)))]
-    (into {}
-          (map (fn [[slice streams]]
-                 [slice (expand-slice slice streams)])
-               counts))))
-
-(defn expand-averages-seq
-  "Expands each (averages, counts) pair with expand-averages and merges the
-  results, combining the pairs of matching slice/stream with add-pairs."
-  [average-seq counts-seq]
-  (apply merge-with
-         (fn [streams-a streams-b]
-           (merge-with add-pairs streams-a streams-b))
-         (map expand-averages average-seq counts-seq)))
-
-(defn- val-avg
-  "Takes a [total count] pair and returns total / count as a double, or 0
-  when the count equals 0."
-  [[total cnt]]
-  (if-not (= cnt 0)
-    (double (/ total cnt))
-    0))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn aggregate-averages
-  "Merges the expanded averages of all inputs and turns every [total count]
-  pair back into an average."
-  [average-seq counts-seq]
-  (map-val (partial map-val val-avg)
-           (expand-averages-seq average-seq counts-seq)))
-
-;TODO: when translating this function, you should replace the map-val with a proper for loop HERE
-(defn aggregate-avg-streams
-  "Collapses the streams of every slice into a single average: the expanded
-  [total count] pairs of a slice are added up with add-pairs and then
-  averaged."
-  [avg counts]
-  (let [slice->pairs (expand-averages avg counts)
-        slice->pair (map-val (fn [streams] (reduce add-pairs (vals streams)))
-                             slice->pairs)]
-    (map-val val-avg slice->pair)))
-
-;TODO: when translating this function, you should replace the filter-key with a proper for loop + if condition HERE
-(defn pre-process
-  "Returns stream-summary with the per-window :emitted and :transferred maps
-  filtered by key, using the predicate from mk-include-sys-fn."
-  [stream-summary include-sys?]
-  (let [keep? (mk-include-sys-fn include-sys?)
-        filter-windows (fn [w->stats]
-                         (into {}
-                               (for [[window stat] w->stats]
-                                 [window (filter-key keep? stat)])))]
-    (assoc stream-summary
-           :emitted (filter-windows (:emitted stream-summary))
-           :transferred (filter-windows (:transferred stream-summary)))))
-
-(defn aggregate-counts
-  "Merges a seq of nested count maps (after clojurify-structure), adding up
-  the counts found under the same pair of keys."
-  [counts-seq]
-  (apply merge-with
-         (fn [inner-a inner-b]
-           (merge-with + inner-a inner-b))
-         (map clojurify-structure counts-seq)))
-
-(defn aggregate-common-stats
-  "Aggregates the emitted and transferred counts of all given ExecutorStats."
-  [stats-seq]
-  (let [counts-of (fn [getter] (aggregate-counts (map getter stats-seq)))]
-    {:emitted (counts-of (fn [^ExecutorStats s] (.get_emitted s)))
-     :transferred (counts-of (fn [^ExecutorStats s] (.get_transferred s)))}))
-
-(defn- collectify
-  "Returns obj unchanged when it is sequential or a java.util.Collection;
-  otherwise wraps it in a one-element vector."
-  [obj]
-  (if-not (or (sequential? obj) (instance? Collection obj))
-    [obj]
-    obj))
-
-(defn aggregate-bolt-stats
-  "Aggregates the stats of one or more bolt ExecutorStats: filtered common
-  counts plus acked, failed and executed counts and the process/execute
-  latency averages."
-  [stats-seq include-sys?]
-  (let [stats-seq (collectify stats-seq)
-        acked (map #(.. ^ExecutorStats % get_specific get_bolt get_acked)
-                   stats-seq)
-        failed (map #(.. ^ExecutorStats % get_specific get_bolt get_failed)
-                    stats-seq)
-        executed (map #(.. ^ExecutorStats % get_specific get_bolt get_executed)
-                      stats-seq)
-        process-avgs (map #(.. ^ExecutorStats % get_specific get_bolt get_process_ms_avg)
-                          stats-seq)
-        execute-avgs (map #(.. ^ExecutorStats % get_specific get_bolt get_execute_ms_avg)
-                          stats-seq)]
-    (merge (pre-process (aggregate-common-stats stats-seq) include-sys?)
-           {:acked (aggregate-counts acked)
-            :failed (aggregate-counts failed)
-            :executed (aggregate-counts executed)
-            ;; Process latency is weighted by acked, execute latency by executed.
-            :process-latencies (aggregate-averages process-avgs acked)
-            :execute-latencies (aggregate-averages execute-avgs executed)})))
-
-(defn aggregate-spout-stats
-  "Aggregates the stats of one or more spout ExecutorStats: filtered common
-  counts plus acked and failed counts and the complete latency averages."
-  [stats-seq include-sys?]
-  (let [stats-seq (collectify stats-seq)
-        acked (map #(.. ^ExecutorStats % get_specific get_spout get_acked)
-                   stats-seq)
-        failed (map #(.. ^ExecutorStats % get_specific get_spout get_failed)
-                    stats-seq)
-        complete-avgs (map #(.. ^ExecutorStats % get_specific get_spout get_complete_ms_avg)
-                           stats-seq)]
-    (merge (pre-process (aggregate-common-stats stats-seq) include-sys?)
-           {:acked (aggregate-counts acked)
-            :failed (aggregate-counts failed)
-            ;; Complete latency is weighted by the acked counts.
-            :complete-latencies (aggregate-averages complete-avgs acked)})))
-
-(defn get-filled-stats
-  "Returns the stats of the given executor summaries, leaving out the
-  summaries whose stats are nil."
-  [summs]
-  (filter not-nil?
-          (map (fn [^ExecutorSummary summ] (.get_stats summ))
-               summs)))
-
-(defn aggregate-spout-streams
-  "Collapses the per-stream spout stats into one value per window: counts
-  are summed, complete latencies are averaged using the acked counts."
-  [stats]
-  (let [counts (fn [k] (aggregate-count-streams (k stats)))]
-    {:acked (counts :acked)
-     :failed (counts :failed)
-     :emitted (counts :emitted)
-     :transferred (counts :transferred)
-     :complete-latencies (aggregate-avg-streams (:complete-latencies stats)
-                                                (:acked stats))}))
-
-(defn spout-streams-stats
-  "Aggregates the non-nil stats of the given spout executor summaries and
-  collapses their streams."
-  [summs include-sys?]
-  (-> (get-filled-stats summs)
-      (aggregate-spout-stats include-sys?)
-      aggregate-spout-streams))
-
-(defn aggregate-bolt-streams
-  "Collapses the per-stream bolt stats into one value per window: counts are
-  summed, process latencies are averaged using the acked counts and execute
-  latencies using the executed counts."
-  [stats]
-  (let [counts (fn [k] (aggregate-count-streams (k stats)))]
-    {:acked (counts :acked)
-     :failed (counts :failed)
-     :emitted (counts :emitted)
-     :transferred (counts :transferred)
-     :process-latencies (aggregate-avg-streams (:process-latencies stats)
-                                               (:acked stats))
-     :executed (counts :executed)
-     :execute-latencies (aggregate-avg-streams (:execute-latencies stats)
-                                               (:executed stats))}))
-
-(defn compute-executor-capacity
-  "Computes (executed * execute latency) / (1000 * window) for one executor
-  from its ten-minute stats, where window is the executor's uptime capped at
-  TEN-MIN-IN-SECONDS. Returns nil when the window is not positive."
-  [^ExecutorSummary e]
-  (let [raw-stats (.get_stats e)
-        ten-min-stats (when raw-stats
-                        (get (swap-map-order
-                               (aggregate-bolt-streams
-                                 (aggregate-bolt-stats raw-stats true)))
-                             (str TEN-MIN-IN-SECONDS)))
-        uptime (Utils/nullToZero (.get_uptime_secs e))
-        window (if (< uptime TEN-MIN-IN-SECONDS) uptime TEN-MIN-IN-SECONDS)
-        executed (Utils/nullToZero (:executed ten-min-stats))
-        latency (Utils/nullToZero (:execute-latencies ten-min-stats))]
-    (when (> window 0)
-      (div (* executed latency) (* 1000 window)))))
-
-(defn bolt-streams-stats
-  "Aggregates the non-nil stats of the given bolt executor summaries and
-  collapses their streams."
-  [summs include-sys?]
-  (-> (get-filled-stats summs)
-      (aggregate-bolt-stats include-sys?)
-      aggregate-bolt-streams))
-
-(defn total-aggregate-stats
-  "Combines the aggregated spout stats with the emitted and transferred
-  counts of the aggregated bolt stats, adding values window by window."
-  [spout-summs bolt-summs include-sys?]
-  (let [spout-totals (-> (get-filled-stats spout-summs)
-                         (aggregate-spout-stats include-sys?)
-                         aggregate-spout-streams)
-        bolt-totals (-> (get-filled-stats bolt-summs)
-                        (aggregate-bolt-stats include-sys?)
-                        aggregate-bolt-streams)
-        ;; Include only keys that will be used. We want to count acked and
-        ;; failed only for the "tuple trees," so we do not include those keys
-        ;; from the bolt executors.
-        bolt-contribution (select-keys bolt-totals [:emitted :transferred])]
-    (merge-with (fn [window->a window->b]
-                  (merge-with + window->a window->b))
-                bolt-contribution
-                spout-totals)))
-
-(defn error-subset
-  "Returns at most the first 200 characters of error-str as a string."
-  [error-str]
-  (->> error-str
-       (take 200)
-       (apply str)))
-
-(defn most-recent-error
-  "Returns the (truncated) text of the error with the greatest error time,
-  or the empty string when there are no errors."
-  [errors-list]
-  (if-let [^ErrorInfo latest (last (sort-by #(.get_error_time_secs ^ErrorInfo %)
-                                            errors-list))]
-    (error-subset (.get_error latest))
-    ""))
-
-(defn float-str
-  "Formats n as a float with three decimals; a nil or false n gives \"0\"."
-  [n]
-  (if-not n
-    "0"
-    (format "%.3f" (float n))))
-
-(defn compute-bolt-capacity
-  "Returns the largest capacity among the given executors, treating a nil
-  capacity as zero. Returns 0 when executors is empty, since max cannot be
-  applied to no arguments."
-  [executors]
-  (let [capacities (->> executors
-                        (map compute-executor-capacity)
-                        (map #(Utils/nullToZero %)))]
-    (if (seq capacities)
-      (apply max capacities)
-      0)))