You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2018/01/22 17:41:43 UTC

[37/38] storm git commit: STORM-2153: add taskId to disruptor metrics

STORM-2153: add taskId to disruptor metrics


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/427076eb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/427076eb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/427076eb

Branch: refs/heads/1.x-branch
Commit: 427076ebb6761e80f5ef71bbe6843f21854577c8
Parents: 1d42d8f
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri Jan 12 19:50:18 2018 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri Jan 12 19:50:18 2018 -0500

----------------------------------------------------------------------
 .../clj/org/apache/storm/daemon/executor.clj    |  2 +-
 .../src/clj/org/apache/storm/daemon/worker.clj  |  4 +--
 .../src/clj/org/apache/storm/disruptor.clj      |  4 +--
 .../storm/metrics2/StormMetricRegistry.java     | 26 +++++++++++---------
 .../org/apache/storm/utils/DisruptorQueue.java  |  6 +++--
 .../utils/DisruptorQueueBackpressureTest.java   |  2 +-
 .../apache/storm/utils/DisruptorQueueTest.java  |  4 +--
 7 files changed, 26 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/427076eb/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 3af9b2c..ecbfb14 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -234,7 +234,7 @@
                                   (storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE)
                                   (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
                                   (.getStormId worker-context)
-                                  component-id
+                                  (first task-ids) component-id
                                   (.getThisWorkerPort worker-context)
                                   :producer-type :multi-threaded
                                   :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)

http://git-wip-us.apache.org/repos/asf/storm/blob/427076eb/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index b52da52..dd11959 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -211,7 +211,7 @@
        (map (fn [e] [e (disruptor/disruptor-queue (str "receive-queue" e)
                                                   (storm-conf TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE)
                                                   (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
-                                                  storm-id worker-id port
+                                                  storm-id (int -1) "__system" port
                                                   :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
                                                   :batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))]))
        (into {})
@@ -256,7 +256,7 @@
         executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions))
         transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
                                                   (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
-                                                  storm-id worker-id port
+                                                  storm-id (int -1) "__system" port
                                                   :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
                                                   :batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))
         executor-receive-queue-map (mk-receive-queue-map storm-conf executors storm-id worker-id port)

http://git-wip-us.apache.org/repos/asf/storm/blob/427076eb/storm-core/src/clj/org/apache/storm/disruptor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/disruptor.clj b/storm-core/src/clj/org/apache/storm/disruptor.clj
index c23c505..6bbf0df 100644
--- a/storm-core/src/clj/org/apache/storm/disruptor.clj
+++ b/storm-core/src/clj/org/apache/storm/disruptor.clj
@@ -28,10 +28,10 @@
    :single-threaded ProducerType/SINGLE})
 
 (defnk disruptor-queue
-  [^String queue-name buffer-size timeout ^String storm-id ^String component-id ^Integer worker-port :producer-type :multi-threaded :batch-size 100 :batch-timeout 1]
+  [^String queue-name buffer-size timeout ^String storm-id ^Integer task-id  ^String component-id ^Integer worker-port :producer-type :multi-threaded :batch-size 100 :batch-timeout 1]
   (DisruptorQueue. queue-name
                    (PRODUCER-TYPE producer-type) buffer-size
-                   timeout batch-size batch-timeout storm-id component-id worker-port))
+                   timeout batch-size batch-timeout storm-id component-id task-id worker-port))
 
 (defn clojure-handler
   [afn]

http://git-wip-us.apache.org/repos/asf/storm/blob/427076eb/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index cfeb711..1a5bd45 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -46,8 +46,8 @@ public class StormMetricRegistry {
 
     private static String hostName = null;
 
-    public static <T> SimpleGauge<T>  gauge(T initialValue, String name, String topologyId, String componentId, Integer port){
-        String metricName = metricName(name, topologyId, componentId, port);
+    public static <T> SimpleGauge<T>  gauge(T initialValue, String name, String topologyId, String componentId, Integer taskId, Integer port){
+        String metricName = metricName(name, topologyId, componentId, taskId, port);
         if(REGISTRY.getGauges().containsKey(metricName)){
             return (SimpleGauge)REGISTRY.getGauges().get(metricName);
         } else {
@@ -55,16 +55,16 @@ public class StormMetricRegistry {
         }
     }
 
-    public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){
+    public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer taskId, Integer port){
         return new DisruptorMetrics(
-                StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port),
-                StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port),
-                StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port),
-                StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port),
-                StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port),
-                StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port),
-                StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port),
-                StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port)
+                StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, taskId, port),
+                StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, taskId, port),
+                StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, taskId, port),
+                StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, taskId, port),
+                StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, taskId, port),
+                StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, taskId, port),
+                StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, taskId, port),
+                StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, taskId, port)
         );
     }
 
@@ -147,7 +147,7 @@ public class StormMetricRegistry {
         return sb.toString();
     }
 
-    public static String metricName(String name, String stormId, String componentId, Integer workerPort) {
+    public static String metricName(String name, String stormId, String componentId, Integer taskId, Integer workerPort) {
         StringBuilder sb = new StringBuilder("storm.worker.");
         sb.append(stormId);
         sb.append(".");
@@ -155,6 +155,8 @@ public class StormMetricRegistry {
         sb.append(".");
         sb.append(dotToUnderScore(componentId));
         sb.append(".");
+        sb.append(taskId);
+        sb.append(".");
         sb.append(workerPort);
         sb.append("-");
         sb.append(name);

http://git-wip-us.apache.org/repos/asf/storm/blob/427076eb/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
index d7497d6..afa5158 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
@@ -418,7 +418,9 @@ public class DisruptorQueue implements IStatefulObject {
     private final AtomicLong tuplePopulation = new AtomicLong(0);
     private volatile boolean _throttleOn = false;
 
-    public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval, String topologyId, String componentId, int port) {
+    // [^String queue-name buffer-size timeout ^String storm-id ^String component-id ^Integer task-id ^Integer worker-port :producer-type :multi-threaded :batch-size 100 :batch-timeout 1]
+
+    public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval, String topologyId, String componentId, Integer taskId, int port) {
         this._queueName = PREFIX + queueName;
         WaitStrategy wait;
         if (readTimeout <= 0) {
@@ -432,7 +434,7 @@ public class DisruptorQueue implements IStatefulObject {
         _barrier = _buffer.newBarrier();
         _buffer.addGatingSequences(_consumer);
         _metrics = new QueueMetrics();
-        _disruptorMetrics = StormMetricRegistry.disruptorMetrics(_queueName, topologyId, componentId, port);
+        _disruptorMetrics = StormMetricRegistry.disruptorMetrics(_queueName, topologyId, componentId, taskId, port);
         //The batch size can be no larger than half the full queue size.
         //This is mostly to avoid contention issues.
         _inputBatchSize = Math.max(1, Math.min(inputBatchSize, size/2));

http://git-wip-us.apache.org/repos/asf/storm/blob/427076eb/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
index ba2b507..15eb8c4 100644
--- a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
+++ b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
@@ -105,6 +105,6 @@ public class DisruptorQueueBackpressureTest extends TestCase {
     }
 
     private static DisruptorQueue createQueue(String name, int queueSize) {
-        return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L, "test", "test",1000);
+        return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L, "test", "test",1000, 1000);
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/427076eb/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java
index 59de55d..65d627c 100644
--- a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java
+++ b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java
@@ -178,10 +178,10 @@ public class DisruptorQueueTest extends TestCase {
     }
 
     private static DisruptorQueue createQueue(String name, int queueSize) {
-        return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L, "test", "test", 1000);
+        return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L, "test", "test", 1000, 1000);
     }
 
     private static DisruptorQueue createQueue(String name, int batchSize, int queueSize) {
-        return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, batchSize, 1L, "test", "test", 1000);
+        return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, batchSize, 1L, "test", "test", 1000, 1000);
     }
 }