You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2018/01/22 17:41:21 UTC
[15/38] storm git commit: STORM-2153 New Metrics Reporting API
STORM-2153 New Metrics Reporting API
* address missing sampling rate: increment the new counters by the stats sampling rate instead of by 1
* rename fields from *-meter to *-counter, because we use Counter instead of Meter
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/00a382b0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/00a382b0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/00a382b0
Branch: refs/heads/1.x-branch
Commit: 00a382b017c1e29863ac4d9a4449086ef79384e4
Parents: 85dbacd
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Nov 30 10:38:27 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Nov 30 10:41:13 2017 +0900
----------------------------------------------------------------------
.../clj/org/apache/storm/daemon/executor.clj | 22 ++++------
.../src/clj/org/apache/storm/daemon/task.clj | 8 ++--
storm-core/src/clj/org/apache/storm/stats.clj | 43 ++++++++++++--------
3 files changed, 39 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/00a382b0/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 720bfa7..0aca4bd 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -257,7 +257,7 @@
:batch-transfer-queue batch-transfer->worker
:transfer-fn (mk-executor-transfer-fn batch-transfer->worker storm-conf)
:suicide-fn (:suicide-fn worker)
- :storm-cluster-state (cluster/mk-storm-cluster-state (:cluster-state worker)
+ :storm-cluster-state (cluster/mk-storm-cluster-state (:cluster-state worker)
:acls (Utils/getWorkerACL storm-conf)
:context (ClusterStateContext. DaemonType/WORKER))
:type executor-type
@@ -280,8 +280,8 @@
(log-message "Got interrupted excpetion shutting thread down...")
((:suicide-fn <>))))
:sampler (mk-stats-sampler storm-conf)
- :failed-meter (StormMetricRegistry/counter "failed" worker-context component-id)
- :acked-meter (StormMetricRegistry/counter "acked" worker-context component-id)
+ :failed-counter (StormMetricRegistry/counter "failed" worker-context component-id)
+ :acked-counter (StormMetricRegistry/counter "acked" worker-context component-id)
:spout-throttling-metrics (if (= executor-type :spout)
(builtin-metrics/make-spout-throttling-data)
nil)
@@ -437,27 +437,23 @@
(defn- fail-spout-msg [executor-data task-data msg-id tuple-info time-delta reason id debug?]
(let [^ISpout spout (:object task-data)
storm-conf (:storm-conf executor-data)
- task-id (:task-id task-data)
- failed-meter (:failed-meter executor-data)]
+ task-id (:task-id task-data)]
;;TODO: need to throttle these when there's lots of failures
(when debug?
(log-message "SPOUT Failing " id ": " tuple-info " REASON: " reason " MSG-ID: " msg-id))
(.fail spout msg-id)
(task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta))
(when time-delta
- (.inc ^Counter failed-meter)
- (stats/spout-failed-tuple! (:stats executor-data) (:stream tuple-info) time-delta))))
+ (stats/spout-failed-tuple! (:stats executor-data) (:failed-counter executor-data) (:stream tuple-info) time-delta))))
(defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id debug?]
(let [^ISpout spout (:object task-data)
- task-id (:task-id task-data)
- acked-meter (:acked-meter executor-data)]
+ task-id (:task-id task-data)]
(when debug? (log-message "SPOUT Acking message " id " " msg-id))
(.ack spout msg-id)
(task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta))
(when time-delta
- (.inc ^Counter acked-meter)
- (stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta))))
+ (stats/spout-acked-tuple! (:stats executor-data) (:acked-counter executor-data) (:stream tuple-info) time-delta))))
(defn mk-task-receiver [executor-data tuple-action-fn]
(let [task-ids (:task-ids executor-data)
@@ -825,8 +821,8 @@
(log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple))
(task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
(when (<= 0 delta)
- (.inc ^Counter (:acked-meter (:executor-data task-data)))
(stats/bolt-acked-tuple! executor-stats
+ (:acked-counter (:executor-data task-data))
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
delta))))
@@ -841,8 +837,8 @@
(log-message "BOLT fail TASK: " task-id " TIME: " delta " TUPLE: " tuple))
(task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta))
(when (<= 0 delta)
- (.inc ^Counter (:failed-meter (:executor-data task-data)))
(stats/bolt-failed-tuple! executor-stats
+ (:failed-counter (:executor-data task-data))
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
delta))))
http://git-wip-us.apache.org/repos/asf/storm/blob/00a382b0/storm-core/src/clj/org/apache/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj
index c43d20d..a2f6c54 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/task.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj
@@ -131,7 +131,7 @@
user-context (:user-context task-data)
executor-stats (:stats executor-data)
debug? (= true (storm-conf TOPOLOGY-DEBUG))
- ^Counter emitted-meter (StormMetricRegistry/counter "emitted" worker-context component-id)]
+ ^Counter emitted-counter (StormMetricRegistry/counter "emitted" worker-context component-id)]
(fn ([^Integer out-task-id ^String stream ^List values]
(when debug?
@@ -144,8 +144,7 @@
(throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping")))
(apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id]))
(when (emit-sampler)
- (.inc ^Counter emitted-meter)
- (stats/emitted-tuple! executor-stats stream)
+ (stats/emitted-tuple! executor-stats emitted-counter stream)
(if out-task-id
(stats/transferred-tuples! executor-stats stream 1)))
(if out-task-id [out-task-id])
@@ -165,8 +164,7 @@
)))
(apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks))
(when (emit-sampler)
- (.inc ^Counter emitted-meter)
- (stats/emitted-tuple! executor-stats stream)
+ (stats/emitted-tuple! executor-stats emitted-counter stream)
(stats/transferred-tuples! executor-stats stream (count out-tasks)))
out-tasks)))
))
http://git-wip-us.apache.org/repos/asf/storm/blob/00a382b0/storm-core/src/clj/org/apache/storm/stats.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/stats.clj b/storm-core/src/clj/org/apache/storm/stats.clj
index 17d0219..41aaf04 100644
--- a/storm-core/src/clj/org/apache/storm/stats.clj
+++ b/storm-core/src/clj/org/apache/storm/stats.clj
@@ -26,7 +26,8 @@
WorkerResources])
(:import [org.apache.storm.utils Utils])
(:import [org.apache.storm.scheduler WorkerSlot])
- (:import [org.apache.storm.metric.internal MultiCountStatAndMetric MultiLatencyStatAndMetric])
+ (:import [org.apache.storm.metric.internal MultiCountStatAndMetric MultiLatencyStatAndMetric]
+ (com.codahale.metrics Counter))
(:use [org.apache.storm log util])
(:use [clojure.math.numeric-tower :only [ceil]]))
@@ -117,9 +118,11 @@
`(:complete-latencies ~stats))
(defn emitted-tuple!
- [stats stream]
- (let [^MultiCountStatAndMetric emitted (stats-emitted stats)]
- (.incBy emitted ^Object stream ^long (stats-rate stats))))
+ [stats ^Counter emitted-counter stream]
+ (let [^MultiCountStatAndMetric emitted (stats-emitted stats)
+ ^long rate (stats-rate stats)]
+ (.incBy emitted ^Object stream rate)
+ (.inc emitted-counter rate)))
(defn transferred-tuples!
[stats stream amt]
@@ -135,30 +138,38 @@
(.record exec-lat key latency-ms)))
(defn bolt-acked-tuple!
- [^BoltExecutorStats stats component stream latency-ms]
+ [^BoltExecutorStats stats ^Counter acked-counter component stream latency-ms]
(let [key [component stream]
^MultiCountStatAndMetric acked (stats-acked stats)
- ^MultiLatencyStatAndMetric process-lat (stats-process-latencies stats)]
- (.incBy acked key (stats-rate stats))
+ ^MultiLatencyStatAndMetric process-lat (stats-process-latencies stats)
+ ^long rate (stats-rate stats)]
+ (.incBy acked key rate)
+ (.inc acked-counter rate)
(.record process-lat key latency-ms)))
(defn bolt-failed-tuple!
- [^BoltExecutorStats stats component stream latency-ms]
+ [^BoltExecutorStats stats ^Counter failed-counter component stream latency-ms]
(let [key [component stream]
- ^MultiCountStatAndMetric failed (stats-failed stats)]
- (.incBy failed key (stats-rate stats))))
+ ^MultiCountStatAndMetric failed (stats-failed stats)
+ ^long rate (stats-rate stats)]
+ (.incBy failed key rate)
+ (.inc failed-counter rate)))
(defn spout-acked-tuple!
- [^SpoutExecutorStats stats stream latency-ms]
+ [^SpoutExecutorStats stats ^Counter acked-counter stream latency-ms]
(let [^MultiCountStatAndMetric acked (stats-acked stats)
- ^MultiLatencyStatAndMetric complete-latencies (stats-complete-latencies stats)]
- (.incBy acked stream (stats-rate stats))
+ ^MultiLatencyStatAndMetric complete-latencies (stats-complete-latencies stats)
+ ^long rate (stats-rate stats)]
+ (.incBy acked stream rate)
+ (.inc acked-counter rate)
(.record complete-latencies stream latency-ms)))
(defn spout-failed-tuple!
- [^SpoutExecutorStats stats stream latency-ms]
- (let [^MultiCountStatAndMetric failed (stats-failed stats)]
- (.incBy failed stream (stats-rate stats))))
+ [^SpoutExecutorStats stats ^Counter failed-counter stream latency-ms]
+ (let [^MultiCountStatAndMetric failed (stats-failed stats)
+ ^long rate (stats-rate stats)]
+ (.incBy failed stream rate)
+ (.inc failed-counter rate)))
(defn- close-stat! [stat]
(.close stat))