You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2018/01/22 17:41:19 UTC

[13/38] storm git commit: WIP: replace Meter with Counter

WIP: replace Meter with Counter


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e9a9f507
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e9a9f507
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e9a9f507

Branch: refs/heads/1.x-branch
Commit: e9a9f507eaeb1022615066a98b7822e829f58e0a
Parents: 99bcf68
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Mon Nov 27 17:50:04 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Mon Nov 27 17:50:04 2017 +0900

----------------------------------------------------------------------
 .../src/clj/org/apache/storm/daemon/executor.clj      | 14 +++++++-------
 storm-core/src/clj/org/apache/storm/daemon/task.clj   |  8 ++++----
 .../apache/storm/metrics2/StormMetricRegistry.java    |  6 ++++++
 3 files changed, 17 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e9a9f507/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index c6f206e..94bd7af 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -36,7 +36,7 @@
   (:import [org.apache.storm Config Constants])
   (:import [org.apache.storm.cluster ClusterStateContext DaemonType])
   (:import [org.apache.storm.metrics2 StormMetricRegistry])
-  (:import [com.codahale.metrics Meter])
+  (:import [com.codahale.metrics Meter Counter])
   (:import [org.apache.storm.grouping LoadAwareCustomStreamGrouping LoadAwareShuffleGrouping LoadMapping ShuffleGrouping])
   (:import [java.util.concurrent ConcurrentLinkedQueue])
   (:require [org.apache.storm [thrift :as thrift]
@@ -280,8 +280,8 @@
                                (log-message "Got interrupted excpetion shutting thread down...")
                                ((:suicide-fn <>))))
      :sampler (mk-stats-sampler storm-conf)
-     :failed-meter (StormMetricRegistry/meter "failed" worker-context component-id)
-     :acked-meter (StormMetricRegistry/meter "acked" worker-context component-id)
+     :failed-meter (StormMetricRegistry/counter "failed" worker-context component-id)
+     :acked-meter (StormMetricRegistry/counter "acked" worker-context component-id)
      :spout-throttling-metrics (if (= executor-type :spout)
                                 (builtin-metrics/make-spout-throttling-data)
                                 nil)
@@ -442,7 +442,7 @@
     ;;TODO: need to throttle these when there's lots of failures
     (when debug?
       (log-message "SPOUT Failing " id ": " tuple-info " REASON: " reason " MSG-ID: " msg-id))
-    (.mark failed-meter)
+    (.inc ^Counter failed-meter)
     (.fail spout msg-id)
     (task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta))
     (when time-delta
@@ -453,7 +453,7 @@
         task-id (:task-id task-data)
         acked-meter (:acked-meter executor-data)]
     (when debug? (log-message "SPOUT Acking message " id " " msg-id))
-    (.mark acked-meter)
+    (.inc ^Counter acked-meter)
     (.ack spout msg-id)
     (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta))
     (when time-delta
@@ -823,7 +823,7 @@
                          (let [delta (tuple-time-delta! tuple)]
                            (when debug? 
                              (log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple))
-                           (.mark  ^Meter (:acked-meter (:executor-data task-data)))
+                           (.inc  ^Counter (:acked-meter (:executor-data task-data)))
                            (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
                            (when (<= 0 delta)
                              (stats/bolt-acked-tuple! executor-stats
@@ -839,7 +839,7 @@
                                debug? (= true (storm-conf TOPOLOGY-DEBUG))]
                            (when debug? 
                              (log-message "BOLT fail TASK: " task-id " TIME: " delta " TUPLE: " tuple))
-                           (.mark  ^Meter (:failed-meter (:executor-data task-data)))
+                           (.inc  ^Counter (:failed-meter (:executor-data task-data)))
                            (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta))
                            (when (<= 0 delta)
                              (stats/bolt-failed-tuple! executor-stats

http://git-wip-us.apache.org/repos/asf/storm/blob/e9a9f507/storm-core/src/clj/org/apache/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj
index 2e4df75..7162f7f 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/task.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj
@@ -28,7 +28,7 @@
   (:import [org.apache.storm.generated ShellComponent JavaObject])
   (:import [org.apache.storm.spout ShellSpout])
   (:import [java.util Collection List ArrayList])
-  (:import [com.codahale.metrics Meter])
+  (:import [com.codahale.metrics Meter Counter])
   (:require [org.apache.storm
              [thrift :as thrift]
              [stats :as stats]])
@@ -131,10 +131,10 @@
         user-context (:user-context task-data)
         executor-stats (:stats executor-data)
         debug? (= true (storm-conf TOPOLOGY-DEBUG))
-        ^Meter emitted-meter (StormMetricRegistry/meter "emitted" worker-context component-id)]
+        ^Counter emitted-meter (StormMetricRegistry/counter "emitted" worker-context component-id)]
         
     (fn ([^Integer out-task-id ^String stream ^List values]
-          (.mark emitted-meter)
+          (.inc ^Counter emitted-meter)
           (when debug?
             (log-message "Emitting direct: " out-task-id "; " component-id " " stream " " values))
           (let [target-component (.getComponentId worker-context out-task-id)
@@ -151,7 +151,7 @@
             (if out-task-id [out-task-id])
             ))
         ([^String stream ^List values]
-           (.mark emitted-meter)
+           (.inc ^Counter emitted-meter)
            (when debug?
              (log-message "Emitting: " component-id " " stream " " values))
            (let [out-tasks (ArrayList.)]

http://git-wip-us.apache.org/repos/asf/storm/blob/e9a9f507/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index 60d4191..912d888 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.metrics2;
 
+import com.codahale.metrics.Counter;
 import com.codahale.metrics.Meter;
 import com.codahale.metrics.MetricRegistry;
 import org.apache.storm.Config;
@@ -72,6 +73,11 @@ public class StormMetricRegistry {
         return REGISTRY.meter(metricName);
     }
 
+    public static Counter counter(String name, WorkerTopologyContext context, String componentId){
+        String metricName = metricName(name, context.getStormId(), componentId, context.getThisWorkerPort());
+        return REGISTRY.counter(metricName);
+    }
+
     public static void start(Map<String, Object> stormConfig, DaemonType type){
         String localHost = "localhost";
         try {