You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2018/01/22 17:41:19 UTC
[13/38] storm git commit: WIP: replace Meter to Counter
WIP: replace Meter to Counter
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e9a9f507
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e9a9f507
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e9a9f507
Branch: refs/heads/1.x-branch
Commit: e9a9f507eaeb1022615066a98b7822e829f58e0a
Parents: 99bcf68
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Mon Nov 27 17:50:04 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Nov 27 17:50:04 2017 +0900
----------------------------------------------------------------------
.../src/clj/org/apache/storm/daemon/executor.clj | 14 +++++++-------
storm-core/src/clj/org/apache/storm/daemon/task.clj | 8 ++++----
.../apache/storm/metrics2/StormMetricRegistry.java | 6 ++++++
3 files changed, 17 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/e9a9f507/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index c6f206e..94bd7af 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -36,7 +36,7 @@
(:import [org.apache.storm Config Constants])
(:import [org.apache.storm.cluster ClusterStateContext DaemonType])
(:import [org.apache.storm.metrics2 StormMetricRegistry])
- (:import [com.codahale.metrics Meter])
+ (:import [com.codahale.metrics Meter Counter])
(:import [org.apache.storm.grouping LoadAwareCustomStreamGrouping LoadAwareShuffleGrouping LoadMapping ShuffleGrouping])
(:import [java.util.concurrent ConcurrentLinkedQueue])
(:require [org.apache.storm [thrift :as thrift]
@@ -280,8 +280,8 @@
(log-message "Got interrupted excpetion shutting thread down...")
((:suicide-fn <>))))
:sampler (mk-stats-sampler storm-conf)
- :failed-meter (StormMetricRegistry/meter "failed" worker-context component-id)
- :acked-meter (StormMetricRegistry/meter "acked" worker-context component-id)
+ :failed-meter (StormMetricRegistry/counter "failed" worker-context component-id)
+ :acked-meter (StormMetricRegistry/counter "acked" worker-context component-id)
:spout-throttling-metrics (if (= executor-type :spout)
(builtin-metrics/make-spout-throttling-data)
nil)
@@ -442,7 +442,7 @@
;;TODO: need to throttle these when there's lots of failures
(when debug?
(log-message "SPOUT Failing " id ": " tuple-info " REASON: " reason " MSG-ID: " msg-id))
- (.mark failed-meter)
+ (.inc ^Counter failed-meter)
(.fail spout msg-id)
(task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta))
(when time-delta
@@ -453,7 +453,7 @@
task-id (:task-id task-data)
acked-meter (:acked-meter executor-data)]
(when debug? (log-message "SPOUT Acking message " id " " msg-id))
- (.mark acked-meter)
+ (.inc ^Counter acked-meter)
(.ack spout msg-id)
(task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta))
(when time-delta
@@ -823,7 +823,7 @@
(let [delta (tuple-time-delta! tuple)]
(when debug?
(log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple))
- (.mark ^Meter (:acked-meter (:executor-data task-data)))
+ (.inc ^Counter (:acked-meter (:executor-data task-data)))
(task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
(when (<= 0 delta)
(stats/bolt-acked-tuple! executor-stats
@@ -839,7 +839,7 @@
debug? (= true (storm-conf TOPOLOGY-DEBUG))]
(when debug?
(log-message "BOLT fail TASK: " task-id " TIME: " delta " TUPLE: " tuple))
- (.mark ^Meter (:failed-meter (:executor-data task-data)))
+ (.inc ^Counter (:failed-meter (:executor-data task-data)))
(task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta))
(when (<= 0 delta)
(stats/bolt-failed-tuple! executor-stats
http://git-wip-us.apache.org/repos/asf/storm/blob/e9a9f507/storm-core/src/clj/org/apache/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj
index 2e4df75..7162f7f 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/task.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj
@@ -28,7 +28,7 @@
(:import [org.apache.storm.generated ShellComponent JavaObject])
(:import [org.apache.storm.spout ShellSpout])
(:import [java.util Collection List ArrayList])
- (:import [com.codahale.metrics Meter])
+ (:import [com.codahale.metrics Meter Counter])
(:require [org.apache.storm
[thrift :as thrift]
[stats :as stats]])
@@ -131,10 +131,10 @@
user-context (:user-context task-data)
executor-stats (:stats executor-data)
debug? (= true (storm-conf TOPOLOGY-DEBUG))
- ^Meter emitted-meter (StormMetricRegistry/meter "emitted" worker-context component-id)]
+ ^Counter emitted-meter (StormMetricRegistry/counter "emitted" worker-context component-id)]
(fn ([^Integer out-task-id ^String stream ^List values]
- (.mark emitted-meter)
+ (.inc ^Counter emitted-meter)
(when debug?
(log-message "Emitting direct: " out-task-id "; " component-id " " stream " " values))
(let [target-component (.getComponentId worker-context out-task-id)
@@ -151,7 +151,7 @@
(if out-task-id [out-task-id])
))
([^String stream ^List values]
- (.mark emitted-meter)
+ (.inc ^Counter emitted-meter)
(when debug?
(log-message "Emitting: " component-id " " stream " " values))
(let [out-tasks (ArrayList.)]
http://git-wip-us.apache.org/repos/asf/storm/blob/e9a9f507/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index 60d4191..912d888 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -17,6 +17,7 @@
*/
package org.apache.storm.metrics2;
+import com.codahale.metrics.Counter;
import com.codahale.metrics.Meter;
import com.codahale.metrics.MetricRegistry;
import org.apache.storm.Config;
@@ -72,6 +73,11 @@ public class StormMetricRegistry {
return REGISTRY.meter(metricName);
}
+ public static Counter counter(String name, WorkerTopologyContext context, String componentId){ // Counter-returning lookup; call sites pass (name, worker context, component id)
+ String metricName = metricName(name, context.getStormId(), componentId, context.getThisWorkerPort()); // full metric name is built from the short name, the topology (storm) id, the component id and this worker's port
+ return REGISTRY.counter(metricName); // NOTE(review): presumably get-or-create semantics if REGISTRY is a Dropwizard MetricRegistry — confirm against its declaration
+ }
+
public static void start(Map<String, Object> stormConfig, DaemonType type){
String localHost = "localhost";
try {