You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2018/01/22 17:41:43 UTC
[37/38] storm git commit: STORM-2153: add taskId to disruptor metrics
STORM-2153: add taskId to disruptor metrics
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/427076eb
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/427076eb
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/427076eb
Branch: refs/heads/1.x-branch
Commit: 427076ebb6761e80f5ef71bbe6843f21854577c8
Parents: 1d42d8f
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Fri Jan 12 19:50:18 2018 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Fri Jan 12 19:50:18 2018 -0500
----------------------------------------------------------------------
.../clj/org/apache/storm/daemon/executor.clj | 2 +-
.../src/clj/org/apache/storm/daemon/worker.clj | 4 +--
.../src/clj/org/apache/storm/disruptor.clj | 4 +--
.../storm/metrics2/StormMetricRegistry.java | 26 +++++++++++---------
.../org/apache/storm/utils/DisruptorQueue.java | 6 +++--
.../utils/DisruptorQueueBackpressureTest.java | 2 +-
.../apache/storm/utils/DisruptorQueueTest.java | 4 +--
7 files changed, 26 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/427076eb/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 3af9b2c..ecbfb14 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -234,7 +234,7 @@
(storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE)
(storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
(.getStormId worker-context)
- component-id
+ (first task-ids) component-id
(.getThisWorkerPort worker-context)
:producer-type :multi-threaded
:batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
http://git-wip-us.apache.org/repos/asf/storm/blob/427076eb/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
index b52da52..dd11959 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@ -211,7 +211,7 @@
(map (fn [e] [e (disruptor/disruptor-queue (str "receive-queue" e)
(storm-conf TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE)
(storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
- storm-id worker-id port
+ storm-id (int -1) "__system" port
:batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
:batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))]))
(into {})
@@ -256,7 +256,7 @@
executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions))
transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE)
(storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS)
- storm-id worker-id port
+ storm-id (int -1) "__system" port
:batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE)
:batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))
executor-receive-queue-map (mk-receive-queue-map storm-conf executors storm-id worker-id port)
http://git-wip-us.apache.org/repos/asf/storm/blob/427076eb/storm-core/src/clj/org/apache/storm/disruptor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/disruptor.clj b/storm-core/src/clj/org/apache/storm/disruptor.clj
index c23c505..6bbf0df 100644
--- a/storm-core/src/clj/org/apache/storm/disruptor.clj
+++ b/storm-core/src/clj/org/apache/storm/disruptor.clj
@@ -28,10 +28,10 @@
:single-threaded ProducerType/SINGLE})
(defnk disruptor-queue
- [^String queue-name buffer-size timeout ^String storm-id ^String component-id ^Integer worker-port :producer-type :multi-threaded :batch-size 100 :batch-timeout 1]
+ [^String queue-name buffer-size timeout ^String storm-id ^Integer task-id ^String component-id ^Integer worker-port :producer-type :multi-threaded :batch-size 100 :batch-timeout 1]
(DisruptorQueue. queue-name
(PRODUCER-TYPE producer-type) buffer-size
- timeout batch-size batch-timeout storm-id component-id worker-port))
+ timeout batch-size batch-timeout storm-id component-id task-id worker-port))
(defn clojure-handler
[afn]
http://git-wip-us.apache.org/repos/asf/storm/blob/427076eb/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index cfeb711..1a5bd45 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -46,8 +46,8 @@ public class StormMetricRegistry {
private static String hostName = null;
- public static <T> SimpleGauge<T> gauge(T initialValue, String name, String topologyId, String componentId, Integer port){
- String metricName = metricName(name, topologyId, componentId, port);
+ public static <T> SimpleGauge<T> gauge(T initialValue, String name, String topologyId, String componentId, Integer taskId, Integer port){
+ String metricName = metricName(name, topologyId, componentId, taskId, port);
if(REGISTRY.getGauges().containsKey(metricName)){
return (SimpleGauge)REGISTRY.getGauges().get(metricName);
} else {
@@ -55,16 +55,16 @@ public class StormMetricRegistry {
}
}
- public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer port){
+ public static DisruptorMetrics disruptorMetrics(String name, String topologyId, String componentId, Integer taskId, Integer port){
return new DisruptorMetrics(
- StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, port),
- StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, port),
- StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, port),
- StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, port),
- StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, port),
- StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, port),
- StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, port),
- StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, port)
+ StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, componentId, taskId, port),
+ StormMetricRegistry.gauge(0L, name + "-population", topologyId, componentId, taskId, port),
+ StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, componentId, taskId, port),
+ StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, componentId, taskId, port),
+ StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, componentId, taskId, port),
+ StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, componentId, taskId, port),
+ StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, componentId, taskId, port),
+ StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, componentId, taskId, port)
);
}
@@ -147,7 +147,7 @@ public class StormMetricRegistry {
return sb.toString();
}
- public static String metricName(String name, String stormId, String componentId, Integer workerPort) {
+ public static String metricName(String name, String stormId, String componentId, Integer taskId, Integer workerPort) {
StringBuilder sb = new StringBuilder("storm.worker.");
sb.append(stormId);
sb.append(".");
@@ -155,6 +155,8 @@ public class StormMetricRegistry {
sb.append(".");
sb.append(dotToUnderScore(componentId));
sb.append(".");
+ sb.append(taskId);
+ sb.append(".");
sb.append(workerPort);
sb.append("-");
sb.append(name);
http://git-wip-us.apache.org/repos/asf/storm/blob/427076eb/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
index d7497d6..afa5158 100644
--- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
+++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java
@@ -418,7 +418,9 @@ public class DisruptorQueue implements IStatefulObject {
private final AtomicLong tuplePopulation = new AtomicLong(0);
private volatile boolean _throttleOn = false;
- public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval, String topologyId, String componentId, int port) {
+ // [^String queue-name buffer-size timeout ^String storm-id ^String component-id ^Integer task-id ^Integer worker-port :producer-type :multi-threaded :batch-size 100 :batch-timeout 1]
+
+ public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval, String topologyId, String componentId, Integer taskId, int port) {
this._queueName = PREFIX + queueName;
WaitStrategy wait;
if (readTimeout <= 0) {
@@ -432,7 +434,7 @@ public class DisruptorQueue implements IStatefulObject {
_barrier = _buffer.newBarrier();
_buffer.addGatingSequences(_consumer);
_metrics = new QueueMetrics();
- _disruptorMetrics = StormMetricRegistry.disruptorMetrics(_queueName, topologyId, componentId, port);
+ _disruptorMetrics = StormMetricRegistry.disruptorMetrics(_queueName, topologyId, componentId, taskId, port);
//The batch size can be no larger than half the full queue size.
//This is mostly to avoid contention issues.
_inputBatchSize = Math.max(1, Math.min(inputBatchSize, size/2));
http://git-wip-us.apache.org/repos/asf/storm/blob/427076eb/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
index ba2b507..15eb8c4 100644
--- a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
+++ b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java
@@ -105,6 +105,6 @@ public class DisruptorQueueBackpressureTest extends TestCase {
}
private static DisruptorQueue createQueue(String name, int queueSize) {
- return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L, "test", "test",1000);
+ return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L, "test", "test",1000, 1000);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/427076eb/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java
----------------------------------------------------------------------
diff --git a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java
index 59de55d..65d627c 100644
--- a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java
+++ b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java
@@ -178,10 +178,10 @@ public class DisruptorQueueTest extends TestCase {
}
private static DisruptorQueue createQueue(String name, int queueSize) {
- return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L, "test", "test", 1000);
+ return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L, "test", "test", 1000, 1000);
}
private static DisruptorQueue createQueue(String name, int batchSize, int queueSize) {
- return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, batchSize, 1L, "test", "test", 1000);
+ return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, batchSize, 1L, "test", "test", 1000, 1000);
}
}