You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ag...@apache.org on 2021/07/30 20:22:10 UTC
[storm] branch master updated: STORM-3786 ensure v2 metrics tick reporting occurs at specified interval (#3406)
This is an automated email from the ASF dual-hosted git repository.
agresch pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push:
new b869958 STORM-3786 ensure v2 metrics tick reporting occurs at specified interval (#3406)
b869958 is described below
commit b869958557675c1da29e21db494d9daf62df30e4
Author: agresch <ag...@gmail.com>
AuthorDate: Fri Jul 30 15:21:57 2021 -0500
STORM-3786 ensure v2 metrics tick reporting occurs at specified interval (#3406)
* STORM-3786 ensure v2 metrics tick reporting occurs at specified interval
---
storm-client/src/jvm/org/apache/storm/Config.java | 7 +++
.../jvm/org/apache/storm/executor/Executor.java | 61 +++++++++++++++-------
2 files changed, 49 insertions(+), 19 deletions(-)
diff --git a/storm-client/src/jvm/org/apache/storm/Config.java b/storm-client/src/jvm/org/apache/storm/Config.java
index 8916de4..9de5dd8 100644
--- a/storm-client/src/jvm/org/apache/storm/Config.java
+++ b/storm-client/src/jvm/org/apache/storm/Config.java
@@ -278,6 +278,13 @@ public class Config extends HashMap<String, Object> {
public static final String TOPOLOGY_ENABLE_V2_METRICS_TICK = "topology.enable.v2.metrics.tick";
/**
+ * Topology configuration to specify the V2 metrics tick interval in seconds.
+ */
+ @IsInteger
+ @IsPositiveNumber
+ public static final String TOPOLOGY_V2_METRICS_TICK_INTERVAL_SECONDS = "topology.v2.metrics.tick.interval.seconds";
+
+ /**
* The class name of the {@link org.apache.storm.state.StateProvider} implementation. If not specified defaults to {@link
* org.apache.storm.state.InMemoryKeyValueStateProvider}. This can be overridden at the component level.
*/
diff --git a/storm-client/src/jvm/org/apache/storm/executor/Executor.java b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
index 8b4d8ac..698f7dc 100644
--- a/storm-client/src/jvm/org/apache/storm/executor/Executor.java
+++ b/storm-client/src/jvm/org/apache/storm/executor/Executor.java
@@ -128,6 +128,8 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
private static final double msDurationFactor = 1.0 / TimeUnit.MILLISECONDS.toNanos(1);
private AtomicBoolean needToRefreshCreds = new AtomicBoolean(false);
private final RateCounter reportedErrorCount;
+ private final boolean enableV2MetricsDataPoints;
+ private final int v2MetricsTickInterval; // primitive on purpose: == / != against Integer intervals must compare by value, not by reference
protected Executor(WorkerState workerData, List<Long> executorId, Map<String, String> credentials, String type) {
this.workerData = workerData;
@@ -183,6 +185,9 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
flushTuple = AddressedTuple.createFlushTuple(workerTopologyContext);
this.reportedErrorCount = workerData.getMetricRegistry().rateCounter("__reported-error-count", componentId,
taskIds.get(0));
+
+ enableV2MetricsDataPoints = ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_ENABLE_V2_METRICS_TICK), false);
+ v2MetricsTickInterval = ObjectReader.getInt(topoConf.get(Config.TOPOLOGY_V2_METRICS_TICK_INTERVAL_SECONDS), 60);
}
public static Executor mkExecutor(WorkerState workerState, List<Long> executorId, Map<String, String> credentials) {
@@ -337,7 +342,7 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
}
}
}
- addV2Metrics(taskId, dataPoints);
+ addV2Metrics(taskId, dataPoints, interval);
if (!dataPoints.isEmpty()) {
IMetricsConsumer.TaskInfo taskInfo = new IMetricsConsumer.TaskInfo(
@@ -353,11 +358,16 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
}
// updates v1 metric dataPoints with v2 metric API data
- private void addV2Metrics(int taskId, List<IMetricsConsumer.DataPoint> dataPoints) {
- boolean enableV2MetricsDataPoints = ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_ENABLE_V2_METRICS_TICK), false);
+ private void addV2Metrics(int taskId, List<IMetricsConsumer.DataPoint> dataPoints, int interval) {
if (!enableV2MetricsDataPoints) {
return;
}
+
+ // only report v2 metric on the proper metrics tick interval
+ if (interval != v2MetricsTickInterval) {
+ return;
+ }
+
processGauges(taskId, dataPoints);
processCounters(taskId, dataPoints);
processHistograms(taskId, dataPoints);
@@ -444,25 +454,38 @@ public abstract class Executor implements Callable, JCQueue.Consumer {
}
protected void setupMetrics() {
+ boolean v2TickScheduled = !enableV2MetricsDataPoints;
for (final Integer interval : intervalToTaskToMetricToRegistry.keySet()) {
- StormTimer timerTask = workerData.getUserTimer();
- timerTask.scheduleRecurring(interval, interval,
- () -> {
- TupleImpl tuple =
+ scheduleMetricsTick(interval);
+ if (interval == v2MetricsTickInterval) {
+ v2TickScheduled = true;
+ }
+ }
+
+ if (!v2TickScheduled) {
+ LOG.info("Scheduling v2 metrics tick for interval {}", v2MetricsTickInterval);
+ scheduleMetricsTick(v2MetricsTickInterval);
+ }
+ }
+
+ private void scheduleMetricsTick(int interval) {
+ StormTimer timerTask = workerData.getUserTimer();
+ timerTask.scheduleRecurring(interval, interval,
+ () -> {
+ TupleImpl tuple =
new TupleImpl(workerTopologyContext, new Values(interval), Constants.SYSTEM_COMPONENT_ID,
- (int) Constants.SYSTEM_TASK_ID, Constants.METRICS_TICK_STREAM_ID);
- AddressedTuple metricsTickTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
- try {
- receiveQueue.publish(metricsTickTuple);
- receiveQueue.flush(); // avoid buffering
- } catch (InterruptedException e) {
- LOG.warn("Thread interrupted when publishing metrics. Setting interrupt flag.");
- Thread.currentThread().interrupt();
- return;
- }
+ (int) Constants.SYSTEM_TASK_ID, Constants.METRICS_TICK_STREAM_ID);
+ AddressedTuple metricsTickTuple = new AddressedTuple(AddressedTuple.BROADCAST_DEST, tuple);
+ try {
+ receiveQueue.publish(metricsTickTuple);
+ receiveQueue.flush(); // avoid buffering
+ } catch (InterruptedException e) {
+ LOG.warn("Thread interrupted when publishing metrics. Setting interrupt flag.");
+ Thread.currentThread().interrupt();
+ return;
}
- );
- }
+ }
+ );
}
protected void setupTicks(boolean isSpout) {