You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by et...@apache.org on 2020/06/29 14:58:47 UTC
[storm] branch master updated: STORM-3648 add meter to track worker
heartbeat rate
This is an automated email from the ASF dual-hosted git repository.
ethanli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new 2643f69 STORM-3648 add meter to track worker heartbeat rate
new 2924dca Merge pull request #3294 from agresch/agresch_storm_3648
2643f69 is described below
commit 2643f6952c6258dcd45581aa8a9eadc7eb11ad8f
Author: Aaron Gresch <ag...@yahoo-inc.com>
AuthorDate: Fri Jun 5 13:05:06 2020 -0500
STORM-3648 add meter to track worker heartbeat rate
---
docs/Metrics.md | 5 +++++
storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java | 5 +++++
.../src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java | 7 +++++++
3 files changed, 17 insertions(+)
diff --git a/docs/Metrics.md b/docs/Metrics.md
index 8df1a45..7f6ba47 100644
--- a/docs/Metrics.md
+++ b/docs/Metrics.md
@@ -327,3 +327,8 @@ Please refer to the [JVM documentation](https://docs.oracle.com/javase/8/docs/ap
* `uptimeSecs` reports the number of seconds the worker has been up for
* `newWorkerEvent` is 1 when a worker is first started and 0 all other times. This can be used to tell when a worker has crashed and is restarted.
* `startTimeSecs` is when the worker started in seconds since the epoch
+
+##### doHeartbeat-calls
+
+* `doHeartbeat-calls` is a meter that indicates the rate at which the worker is performing heartbeats.
+
diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
index b4e4c80..ee8e005 100644
--- a/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
+++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/Worker.java
@@ -12,6 +12,7 @@
package org.apache.storm.daemon.worker;
+import com.codahale.metrics.Meter;
import java.io.File;
import java.io.IOException;
import java.net.UnknownHostException;
@@ -83,6 +84,7 @@ public class Worker implements Shutdownable, DaemonCommon {
private final String workerId;
private final LogConfigManager logConfigManager;
private final StormMetricRegistry metricRegistry;
+ private Meter heatbeatMeter;
private WorkerState workerState;
private AtomicReference<List<IRunningExecutor>> executorsAtom;
@@ -196,6 +198,8 @@ public class Worker implements Shutdownable, DaemonCommon {
throws Exception {
workerState = new WorkerState(conf, context, topologyId, assignmentId, supervisorIfaceSupplier, port, workerId,
topologyConf, stateStorage, stormClusterState, autoCreds, metricRegistry);
+ this.heatbeatMeter = metricRegistry.meter("doHeartbeat-calls", workerState.getWorkerTopologyContext(),
+ Constants.SYSTEM_COMPONENT_ID, (int) Constants.SYSTEM_TASK_ID);
// Heartbeat here so that worker process dies if this fails
// it's important that worker heartbeat to supervisor ASAP so that supervisor knows
@@ -362,6 +366,7 @@ public class Worker implements Shutdownable, DaemonCommon {
state.cleanup(60); // this is just in case supervisor is down so that disk doesn't fill up.
// it shouldn't take supervisor 120 seconds between listing dir and reading it
heartbeatToMasterIfLocalbeatFail(lsWorkerHeartbeat);
+ this.heatbeatMeter.mark();
}
public void doExecutorHeartbeats() {
diff --git a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
index 4d62cf4..d095cc0 100644
--- a/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
+++ b/storm-client/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java
@@ -81,6 +81,13 @@ public class StormMetricRegistry {
return meter;
}
+ public Meter meter(String name, WorkerTopologyContext context, String componentId, Integer taskId) {
+ MetricNames metricNames = workerMetricName(name, context.getStormId(), componentId, taskId, context.getThisWorkerPort());
+ Meter meter = registry.meter(metricNames.getLongName());
+ saveMetricTaskIdMapping(taskId, metricNames, meter, taskIdMeters);
+ return meter;
+ }
+
public Meter meter(String name, TopologyContext context) {
MetricNames metricNames = topologyMetricName(name, context);
Meter meter = registry.meter(metricNames.getLongName());