You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ag...@apache.org on 2021/07/30 20:22:10 UTC

[storm] branch master updated: STORM-3786 ensure v2 metrics tick reporting occurs at specified interval (#3406)

This is an automated email from the ASF dual-hosted git repository.

agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git


The following commit(s) were added to refs/heads/master by this push:
     new b869958  STORM-3786 ensure v2 metrics tick reporting occurs at specified interval (#3406)
b869958 is described below

commit b869958557675c1da29e21db494d9daf62df30e4
Author: agresch <ag...@gmail.com>
AuthorDate: Fri Jul 30 15:21:57 2021 -0500

    STORM-3786 ensure v2 metrics tick reporting occurs at specified interval (#3406)
    
    * STORM-3786 ensure v2 metrics tick reporting occurs at specified interval
---
 storm-client/src/jvm/org/apache/storm/Config.java  |  7 +++
 .../jvm/org/apache/storm/executor/Executor.java    | 61 +++++++++++++++-------
 2 files changed, 49 insertions(+), 19 deletions(-)

diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index 8916de4..9de5dd8 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -278,6 +278,13 @@ public class Config extends HashMap<String, Object> {
     public static final String TOPOLOGY_ENABLE_V2_METRICS_TICK = "topology.enable.v2.metrics.tick";
 
     /**
+     * Topology configuration to specify the V2 metrics tick interval in seconds.
+     */
+    @IsInteger
+    @IsPositiveNumber
+    public static final String TOPOLOGY_V2_METRICS_TICK_INTERVAL_SECONDS = "topology.v2.metrics.tick.interval.seconds";
+
+    /**
      * The class name of the {@link org.apache.storm.state.StateProvider} implementation. If not specified defaults to {@link
      * org.apache.storm.state.InMemoryKeyValueStateProvider}. This can be overridden at the component level.
      */
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index 8b4d8ac..698f7dc 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -128,6 +128,8 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
     private static final double msDurationFactor = 1.0 / TimeUnit.MILLISECONDS.toNanos(1);
     private AtomicBoolean needToRefreshCreds = new AtomicBoolean(false);
     private final RateCounter reportedErrorCount;
+    private final boolean enableV2MetricsDataPoints;
+    private final Integer v2MetricsTickInterval;
 
     protected Executor(WorkerState workerData, List<Long> executorId, Map<String, String> credentials, String type) {
         this.workerData = workerData;
@@ -183,6 +185,9 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
         flushTuple = AddressedTuple.createFlushTuple(workerTopologyContext);
         this.reportedErrorCount = workerData.getMetricRegistry().rateCounter("__reported-error-count", componentId,
                 taskIds.get(0));
+
+        enableV2MetricsDataPoints = ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_ENABLE_V2_METRICS_TICK), false);
+        v2MetricsTickInterval = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_V2_METRICS_TICK_INTERVAL_SECONDS), 60);
     }
 
     public static Executor mkExecutor(WorkerState workerState, List<Long> executorId, Map<String, String> credentials) {
@@ -337,7 +342,7 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
                     }
                 }
             }
-            addV2Metrics(taskId, dataPoints);
+            addV2Metrics(taskId, dataPoints, interval);
 
             if (!dataPoints.isEmpty()) {
                 IMetricsConsumer.TaskInfo taskInfo = new IMetricsConsumer.TaskInfo(
@@ -353,11 +358,16 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
     }
 
     // updates v1 metric dataPoints with v2 metric API data
-    private void addV2Metrics(int taskId, List<IMetricsConsumer.DataPoint> dataPoints) {
-        boolean enableV2MetricsDataPoints = ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_ENABLE_V2_METRICS_TICK), false);
+    private void addV2Metrics(int taskId, List<IMetricsConsumer.DataPoint> dataPoints, int interval) {
         if (!enableV2MetricsDataPoints) {
             return;
         }
+
+        // only report v2 metric on the proper metrics tick interval
+        if (interval != v2MetricsTickInterval) {
+            return;
+        }
+
         processGauges(taskId, dataPoints);
         processCounters(taskId, dataPoints);
         processHistograms(taskId, dataPoints);
@@ -444,25 +454,38 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
     }
 
     protected void setupMetrics() {
+        boolean v2TickScheduled = !enableV2MetricsDataPoints;
         for (final Integer interval : intervalToTaskToMetricToRegistry.keySet()) {
-            StormTimer timerTask = workerData.getUserTimer();
-            timerTask.scheduleRecurring(interval, interval,
-                () -> {
-                    TupleImpl tuple =
+            scheduleMetricsTick(interval);
+            if (interval == v2MetricsTickInterval) {
+                v2TickScheduled = true;
+            }
+        }
+
+        if (!v2TickScheduled) {
+            LOG.info("Scheduling v2 metrics tick for interval {}", v2MetricsTickInterval);
+            scheduleMetricsTick(v2MetricsTickInterval);
+        }
+    }
+
+    private void scheduleMetricsTick(int interval) {
+        StormTimer timerTask = workerData.getUserTimer();
+        timerTask.scheduleRecurring(interval, interval,
+            () -> {
+                TupleImpl tuple =
                         new TupleImpl(workerTopologyContext, new Values(interval), Constants.SYSTEM_COMPONENT_ID,
-                                      (int) Constants.SYSTEM_TASK_ID, Constants.METRICS_TICK_STREAM_ID);
-                    AddressedTuple metricsTickTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
-                    try {
-                        receiveQueue.publish(metricsTickTuple);
-                        receiveQueue.flush();  // avoid buffering
-                    } catch (InterruptedException e) {
-                        LOG.warn("Thread interrupted when publishing metrics. Setting interrupt flag.");
-                        Thread.currentThread().interrupt();
-                        return;
-                    }
+                                (int) Constants.SYSTEM_TASK_ID, Constants.METRICS_TICK_STREAM_ID);
+                AddressedTuple metricsTickTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
+                try {
+                    receiveQueue.publish(metricsTickTuple);
+                    receiveQueue.flush();  // avoid buffering
+                } catch (InterruptedException e) {
+                    LOG.warn("Thread interrupted when publishing metrics. Setting interrupt flag.");
+                    Thread.currentThread().interrupt();
+                    return;
                 }
-            );
-        }
+            }
+        );
     }
 
     protected void setupTicks(boolean isSpout) {