You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2018/01/22 17:41:24 UTC
[18/38] storm git commit: STORM-2153: add streamId and executorId to
metrics names; replace '.' with '_' in metrics names
STORM-2153: add streamId and executorId to metrics names; replace '.' with '_' in metrics names
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c91da676
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c91da676
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c91da676
Branch: refs/heads/1.x-branch
Commit: c91da676e7fc550ab44fd2d4d91dc95243059eb7
Parents: dd977e8
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Dec 13 15:25:56 2017 -0500
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Dec 13 15:25:56 2017 -0500
----------------------------------------------------------------------
.../clj/org/apache/storm/daemon/executor.clj | 6 ++--
.../src/clj/org/apache/storm/daemon/task.clj | 7 ++---
.../storm/metrics2/StormMetricRegistry.java | 32 +++++++++++++++-----
3 files changed, 30 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/c91da676/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 0aca4bd..fa7d44c 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -280,8 +280,6 @@
(log-message "Got interrupted excpetion shutting thread down...")
((:suicide-fn <>))))
:sampler (mk-stats-sampler storm-conf)
- :failed-counter (StormMetricRegistry/counter "failed" worker-context component-id)
- :acked-counter (StormMetricRegistry/counter "acked" worker-context component-id)
:spout-throttling-metrics (if (= executor-type :spout)
(builtin-metrics/make-spout-throttling-data)
nil)
@@ -444,7 +442,7 @@
(.fail spout msg-id)
(task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta))
(when time-delta
- (stats/spout-failed-tuple! (:stats executor-data) (:failed-counter executor-data) (:stream tuple-info) time-delta))))
+ (stats/spout-failed-tuple! (:stats executor-data) (StormMetricRegistry/counter "failed" worker-context (:component-id executor-data) (:executor-id executor-data) (:stream tuple-info)) (:stream tuple-info) time-delta))))
(defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id debug?]
(let [^ISpout spout (:object task-data)
@@ -453,7 +451,7 @@
(.ack spout msg-id)
(task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta))
(when time-delta
- (stats/spout-acked-tuple! (:stats executor-data) (:acked-counter executor-data) (:stream tuple-info) time-delta))))
+ (stats/spout-acked-tuple! (:stats executor-data) (StormMetricRegistry/counter "acked" worker-context (:component-id executor-data) (:executor-id executor-data) (:stream tuple-info)) (:stream tuple-info) time-delta))))
(defn mk-task-receiver [executor-data tuple-action-fn]
(let [task-ids (:task-ids executor-data)
http://git-wip-us.apache.org/repos/asf/storm/blob/c91da676/storm-core/src/clj/org/apache/storm/daemon/task.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj
index a2f6c54..edc144c 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/task.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj
@@ -130,8 +130,7 @@
stream->component->grouper (:stream->component->grouper executor-data)
user-context (:user-context task-data)
executor-stats (:stats executor-data)
- debug? (= true (storm-conf TOPOLOGY-DEBUG))
- ^Counter emitted-counter (StormMetricRegistry/counter "emitted" worker-context component-id)]
+ debug? (= true (storm-conf TOPOLOGY-DEBUG))]
(fn ([^Integer out-task-id ^String stream ^List values]
(when debug?
@@ -144,7 +143,7 @@
(throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping")))
(apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id]))
(when (emit-sampler)
- (stats/emitted-tuple! executor-stats emitted-counter stream)
+ (stats/emitted-tuple! executor-stats (StormMetricRegistry/counter "emitted" worker-context component-id (:executor-id executor-data) stream) stream)
(if out-task-id
(stats/transferred-tuples! executor-stats stream 1)))
(if out-task-id [out-task-id])
@@ -164,7 +163,7 @@
)))
(apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks))
(when (emit-sampler)
- (stats/emitted-tuple! executor-stats emitted-counter stream)
+ (stats/emitted-tuple! executor-stats (StormMetricRegistry/counter "emitted" worker-context component-id (:executor-id executor-data) stream) stream)
(stats/transferred-tuples! executor-stats stream (count out-tasks)))
out-tasks)))
))
http://git-wip-us.apache.org/repos/asf/storm/blob/c91da676/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index 200ddcf..2bab4e9 100644
--- a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -68,20 +68,20 @@ public class StormMetricRegistry {
);
}
- public static Meter meter(String name, WorkerTopologyContext context, String componentId){
- String metricName = metricName(name, context.getStormId(), componentId, context.getThisWorkerPort());
+ public static Meter meter(String name, WorkerTopologyContext context, String componentId, String executorId, String streamId){
+ String metricName = metricName(name, context.getStormId(), componentId, streamId,executorId, context.getThisWorkerPort());
return REGISTRY.meter(metricName);
}
- public static Counter counter(String name, WorkerTopologyContext context, String componentId){
- String metricName = metricName(name, context.getStormId(), componentId, context.getThisWorkerPort());
+ public static Counter counter(String name, WorkerTopologyContext context, String componentId, String executorId, String streamId){
+ String metricName = metricName(name, context.getStormId(), componentId, streamId,executorId, context.getThisWorkerPort());
return REGISTRY.counter(metricName);
}
public static void start(Map<String, Object> stormConfig, DaemonType type){
String localHost = "localhost";
try {
- hostName = Utils.localHostname();
+ hostName = dotToUnderScore(Utils.localHostname());
} catch (UnknownHostException e) {
LOG.warn("Unable to determine hostname while starting the metrics system. Hostname will be reported" +
" as 'localhost'.");
@@ -130,9 +130,27 @@ public class StormMetricRegistry {
}
}
- public static String metricName(String name, String stormId, String componentId, Integer workerPort){
- return String.format("storm.worker.%s.%s.%s.%s-%s", stormId, hostName, componentId, workerPort, name);
+ public static String metricName(String name, String stormId, String componentId, String streamId, String executorId, Integer workerPort){
+ return String.format("storm.worker.%s.%s.%s.%s.%s.%s-%s",
+ stormId,
+ hostName,
+ dotToUnderScore(componentId),
+ dotToUnderScore(streamId),
+ dotToUnderScore(executorId),
+ workerPort,
+ name);
}
+ public static String metricName(String name, String stormId, String componentId, Integer workerPort){
+ return String.format("storm.worker.%s.%s.%s.%s-%s",
+ stormId,
+ hostName,
+ dotToUnderScore(componentId),
+ workerPort,
+ name);
+ }
+ private static String dotToUnderScore(String str){
+ return str.replace('.', '_');
+ }
}