You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/01/11 21:52:49 UTC

storm git commit: STORM-2153: move task-metrics from executor to task to avoid map lookup

Repository: storm
Updated Branches:
  refs/heads/metrics_v2 8e2f7e7ef -> b257ba47a


STORM-2153: move task-metrics from executor to task to avoid map lookup


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b257ba47
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b257ba47
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b257ba47

Branch: refs/heads/metrics_v2
Commit: b257ba47aae42d5486901a1252cd9b5c0d9ad70e
Parents: 8e2f7e7
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Fri Jan 12 06:52:22 2018 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Fri Jan 12 06:52:22 2018 +0900

----------------------------------------------------------------------
 storm-core/src/clj/org/apache/storm/daemon/executor.clj     | 9 ++++-----
 storm-core/src/clj/org/apache/storm/daemon/task.clj         | 9 +++++----
 .../src/jvm/org/apache/storm/metrics2/TaskMetrics.java      | 8 --------
 3 files changed, 9 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/b257ba47/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 3dd7289..3af9b2c 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -265,7 +265,6 @@
      :stats (mk-executor-stats <> (sampling-rate storm-conf))
      :interval->task->metric-registry (HashMap.)
      :task->component (:task->component worker)
-     :task-metrics (TaskMetrics/taskMetricsMap (first task-ids) (last task-ids) worker-context component-id)
      :stream->component->grouper (outbound-components worker-context component-id storm-conf)
      :report-error (throttled-report-error-fn <>)
      :report-error-and-die (fn [error]
@@ -443,7 +442,7 @@
     (.fail spout msg-id)
     (task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta))
     (when time-delta
-      (stats/spout-failed-tuple! (:stats executor-data)  (.getFailed ^TaskMetrics (.get ^Map (:task-metrics executor-data) task-id) (:stream tuple-info)) (:stream tuple-info) time-delta))))
+      (stats/spout-failed-tuple! (:stats executor-data)  (.getFailed ^TaskMetrics (:task-metrics task-data) (:stream tuple-info)) (:stream tuple-info) time-delta))))
 
 (defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id debug?]
   (let [^ISpout spout (:object task-data)
@@ -452,7 +451,7 @@
     (.ack spout msg-id)
     (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta))
     (when time-delta
-      (stats/spout-acked-tuple! (:stats executor-data) (.getAcked ^TaskMetrics (.get ^Map (:task-metrics executor-data) task-id) (:stream tuple-info)) (:stream tuple-info) time-delta))))
+      (stats/spout-acked-tuple! (:stats executor-data) (.getAcked ^TaskMetrics (:task-metrics task-data) (:stream tuple-info)) (:stream tuple-info) time-delta))))
 
 (defn mk-task-receiver [executor-data tuple-action-fn]
   (let [task-ids (:task-ids executor-data)
@@ -821,7 +820,7 @@
                            (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
                            (when (<= 0 delta)
                              (stats/bolt-acked-tuple! executor-stats
-                                                      (.getAcked ^TaskMetrics (.get ^Map (:task-metrics executor-data) task-id) (.getSourceStreamId tuple))
+                                                      (.getAcked ^TaskMetrics (:task-metrics task-data) (.getSourceStreamId tuple))
                                                       (.getSourceComponent tuple)
                                                       (.getSourceStreamId tuple)
                                                       delta))))
@@ -837,7 +836,7 @@
                            (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta))
                            (when (<= 0 delta)
                              (stats/bolt-failed-tuple! executor-stats
-                                                       (.getFailed ^TaskMetrics (.get ^Map (:task-metrics executor-data) task-id) (.getSourceStreamId tuple))
+                                                       (.getFailed ^TaskMetrics (:task-metrics task-data) (.getSourceStreamId tuple))
                                                        (.getSourceComponent tuple)
                                                        (.getSourceStreamId tuple)
                                                        delta))))

http://git-wip-us.apache.org/repos/asf/storm/blob/b257ba47/storm-core/src/clj/org/apache/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj
index 68af75b..82bd2c5 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/task.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj
@@ -143,9 +143,9 @@
               (throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping")))                          
             (apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id]))
             (when (emit-sampler)
-              (stats/emitted-tuple! executor-stats (.getEmitted ^TaskMetrics (.get ^Map (:task-metrics executor-data) task-id) stream) stream)
+              (stats/emitted-tuple! executor-stats (.getEmitted ^TaskMetrics (:task-metrics task-data) stream) stream)
               (if out-task-id
-                (stats/transferred-tuples! executor-stats (.getTransferred ^TaskMetrics (.get ^Map (:task-metrics executor-data) task-id) stream) stream 1)))
+                (stats/transferred-tuples! executor-stats (.getTransferred ^TaskMetrics (:task-metrics task-data) stream) stream 1)))
             (if out-task-id [out-task-id])
             ))
         ([^String stream ^List values]
@@ -163,8 +163,8 @@
                    )))
              (apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks))
              (when (emit-sampler)
-               (stats/emitted-tuple! executor-stats (.getEmitted ^TaskMetrics (.get ^Map (:task-metrics executor-data) task-id) stream) stream)
-               (stats/transferred-tuples! executor-stats (.getTransferred ^TaskMetrics (.get ^Map (:task-metrics executor-data) task-id) stream) stream (count out-tasks)))
+               (stats/emitted-tuple! executor-stats (.getEmitted ^TaskMetrics (:task-metrics task-data) stream) stream)
+               (stats/transferred-tuples! executor-stats (.getTransferred ^TaskMetrics (:task-metrics task-data) stream) stream (count out-tasks)))
              out-tasks)))
     ))
 
@@ -175,6 +175,7 @@
     :system-context (system-topology-context (:worker executor-data) executor-data task-id)
     :user-context (user-topology-context (:worker executor-data) executor-data task-id)
     :builtin-metrics (builtin-metrics/make-data (:type executor-data) (:stats executor-data))
+    :task-metrics (TaskMetrics. (:worker-context executor-data) (:component-id executor-data) task-id)
     :tasks-fn (mk-tasks-fn <>)
     :object (get-task-object (.getRawTopology ^TopologyContext (:system-context <>)) (:component-id executor-data))))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/b257ba47/storm-core/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/TaskMetrics.java b/storm-core/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
index 550b176..05c62da 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/TaskMetrics.java
@@ -78,12 +78,4 @@ public class TaskMetrics {
         }
         return c;
     }
-
-    public static Map<Integer, TaskMetrics> taskMetricsMap(Integer startTaskId, Integer endTaskId, WorkerTopologyContext context, String componentId){
-        Map<Integer, TaskMetrics> retval = new HashMap<>();
-        for (int i = startTaskId; i < endTaskId + 1; i++) {
-            retval.put(i, new TaskMetrics(context, componentId, i));
-        }
-        return retval;
-    }
 }