You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/06/29 14:58:47 UTC

[storm] branch master updated: STORM-3648 add meter to track worker heartbeat rate

This is an automated email from the ASF dual-hosted git repository.

ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new 2643f69  STORM-3648 add meter to track worker heartbeat rate
     new 2924dca  Merge pull request #3294 from agresch/agresch_storm_3648
2643f69 is described below

commit 2643f6952c6258dcd45581aa8a9eadc7eb11ad8f
Author: Aaron Gresch <ag...@yahoo-inc.com>
AuthorDate: Fri Jun 5 13:05:06 2020 -0500

    STORM-3648 add meter to track worker heartbeat rate
---
 docs/Metrics.md                                                    | 5 +++++
 storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java    | 5 +++++
 .../src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java     | 7 +++++++
 3 files changed, 17 insertions(+)

diff --git a/docs/Metrics.md b/docs/Metrics.md
index 8df1a45..7f6ba47 100644
--- a/docs/Metrics.md
+++ b/docs/Metrics.md
@@ -327,3 +327,8 @@ Please refer to the [JVM documentation](https://docs.oracle.com/javase/8/docs/ap
 * `uptimeSecs` reports the number of seconds the worker has been up for
 * `newWorkerEvent` is 1 when a worker is first started and 0 all other times.  This can be used to tell when a worker has crashed and is restarted.
 * `startTimeSecs` is when the worker started in seconds since the epoch
+
+##### doHeartbeat-calls
+
+* `doHeartbeat-calls` is a meter that tracks the rate at which the worker performs heartbeats.
+
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index b4e4c80..ee8e005 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -12,6 +12,7 @@
 
 package org.apache.storm.daemon.worker;
 
+import com.codahale.metrics.Meter;
 import java.io.File;
 import java.io.IOException;
 import java.net.UnknownHostException;
@@ -83,6 +84,7 @@ public class Worker implements Shutdownable, DaemonCommon {
     private final String workerId;
     private final LogConfigManager logConfigManager;
     private final StormMetricRegistry metricRegistry;
+    private Meter heatbeatMeter;
 
     private WorkerState workerState;
     private AtomicReference<List<IRunningExecutor>> executorsAtom;
@@ -196,6 +198,8 @@ public class Worker implements Shutdownable, DaemonCommon {
         throws Exception {
         workerState = new WorkerState(conf, context, topologyId, assignmentId, supervisorIfaceSupplier, port, workerId,
                                       topologyConf, stateStorage, stormClusterState, autoCreds, metricRegistry);
+        this.heatbeatMeter = metricRegistry.meter("doHeartbeat-calls", workerState.getWorkerTopologyContext(),
+                Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID);
 
         // Heartbeat here so that worker process dies if this fails
         // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
@@ -362,6 +366,7 @@ public class Worker implements Shutdownable, DaemonCommon {
         state.cleanup(60); // this is just in case supervisor is down so that disk doesn't fill up.
         // it shouldn't take supervisor 120 seconds between listing dir and reading it
         heartbeatToMasterIfLocalbeatFail(lsWorkerHeartbeat);
+        this.heatbeatMeter.mark();
     }
 
     public void doExecutorHeartbeats() {
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index 4d62cf4..d095cc0 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -81,6 +81,13 @@ public class StormMetricRegistry {
         return meter;
     }
 
+    public Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId) {
+        MetricNames names = workerMetricName(name, context.getStormId(), componentId, taskId, context.getThisWorkerPort());
+        Meter workerMeter = registry.meter(names.getLongName());
+        saveMetricTaskIdMapping(taskId, names, workerMeter, taskIdMeters);
+        return workerMeter;
+    }
+
     public Meter meter(String name, TopologyContext context) {
         MetricNames metricNames = topologyMetricName(name, context);
         Meter meter = registry.meter(metricNames.getLongName());