You are viewing a plain text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@flink.apache.org by ch...@apache.org on 2018/09/07 16:56:26 UTC
[flink] branch master updated: [FLINK-10243][metrics] Make latency metrics granularity configurable
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 91f8fe8 [FLINK-10243][metrics] Make latency metrics granularity configurable
91f8fe8 is described below
commit 91f8fe831c6af21928fceb3a13d87c9ed5019981
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri Sep 7 18:56:21 2018 +0200
[FLINK-10243][metrics] Make latency metrics granularity configurable
---
docs/_includes/generated/metric_configuration.html | 5 +
.../apache/flink/configuration/MetricOptions.java | 12 ++
.../api/operators/AbstractStreamOperator.java | 27 ++-
.../apache/flink/streaming/util/LatencyStats.java | 74 ++++++-
.../flink/streaming/util/LatencyStatsTest.java | 217 +++++++++++++++++++++
5 files changed, 328 insertions(+), 7 deletions(-)
diff --git a/docs/_includes/generated/metric_configuration.html b/docs/_includes/generated/metric_configuration.html
index 02f4ceb..0c0b0dd 100644
--- a/docs/_includes/generated/metric_configuration.html
+++ b/docs/_includes/generated/metric_configuration.html
@@ -8,6 +8,11 @@
</thead>
<tbody>
<tr>
+ <td><h5>metrics.latency.granularity</h5></td>
+ <td style="word-wrap: break-word;">"operator"</td>
+ <td>Defines the granularity of latency metrics. Accepted values are:<ul><li>single - Track latency without differentiating between sources and subtasks.</li><li>operator - Track latency while differentiating between sources, but not subtasks.</li><li>subtask - Track latency while differentiating between sources and subtasks.</li></ul></td>
+ </tr>
+ <tr>
<td><h5>metrics.latency.history-size</h5></td>
<td style="word-wrap: break-word;">128</td>
<td>Defines the number of measured latencies to maintain at each operator.</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
index 4e9ddeb..f54f5f4 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
@@ -21,6 +21,7 @@ package org.apache.flink.configuration;
import org.apache.flink.annotation.PublicEvolving;
import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.configuration.description.TextElement.text;
/**
* Configuration options for metrics and metric reporters.
@@ -111,6 +112,17 @@ public class MetricOptions {
" Disables latency tracking if set to 0 or a negative value. Enabling this feature can significantly" +
" impact the performance of the cluster.");
+ /** The granularity of latency metrics; one of "single", "operator" or "subtask". */
+ public static final ConfigOption<String> LATENCY_SOURCE_GRANULARITY =
+ key("metrics.latency.granularity")
+ .defaultValue("operator")
+ .withDescription(Description.builder()
+ .text("Defines the granularity of latency metrics. Accepted values are:")
+ .list(
+ text("single - Track latency without differentiating between sources and subtasks."),
+ text("operator - Track latency while differentiating between sources, but not subtasks."),
+ text("subtask - Track latency while differentiating between sources and subtasks."))
+ .build());
+
/** The number of measured latencies to maintain at each operator. */
public static final ConfigOption<Integer> LATENCY_HISTORY_SIZE =
key("metrics.latency.history-size")
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index f52168b..f3c2208 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -72,6 +72,7 @@ import org.slf4j.LoggerFactory;
import java.io.Closeable;
import java.io.Serializable;
+import java.util.Locale;
/**
* Base class for all stream operators. Operators that contain a user function should extend the class
@@ -193,11 +194,33 @@ public abstract class AbstractStreamOperator<OUT>
LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, historySize);
historySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();
}
+
+ final String configuredGranularity = taskManagerConfig.getString(MetricOptions.LATENCY_SOURCE_GRANULARITY);
+ LatencyStats.Granularity granularity;
+ try {
+ granularity = LatencyStats.Granularity.valueOf(configuredGranularity.toUpperCase(Locale.ROOT));
+ } catch (IllegalArgumentException iae) {
+ granularity = LatencyStats.Granularity.OPERATOR;
+ LOG.warn(
+ "Configured value {} option for {} is invalid. Defaulting to {}.",
+ configuredGranularity,
+ MetricOptions.LATENCY_SOURCE_GRANULARITY.key(),
+ granularity);
+ }
TaskManagerJobMetricGroup jobMetricGroup = this.metrics.parent().parent();
- this.latencyStats = new LatencyStats(jobMetricGroup.addGroup("latency"), historySize, container.getIndexInSubtaskGroup(), getOperatorID());
+ this.latencyStats = new LatencyStats(jobMetricGroup.addGroup("latency"),
+ historySize,
+ container.getIndexInSubtaskGroup(),
+ getOperatorID(),
+ granularity);
} catch (Exception e) {
LOG.warn("An error occurred while instantiating latency metrics.", e);
- this.latencyStats = new LatencyStats(UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup().addGroup("latency"), 1, 0, new OperatorID());
+ this.latencyStats = new LatencyStats(
+ UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup().addGroup("latency"),
+ 1,
+ 0,
+ new OperatorID(),
+ LatencyStats.Granularity.SINGLE);
}
this.runtimeContext = new StreamingRuntimeContext(this, environment, container.getAccumulatorMap());
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/LatencyStats.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/LatencyStats.java
index 4f3d33e..926753d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/LatencyStats.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/LatencyStats.java
@@ -34,23 +34,29 @@ public class LatencyStats {
private final int historySize;
private final int subtaskIndex;
private final OperatorID operatorId;
+ private final Granularity granularity;
- public LatencyStats(MetricGroup metricGroup, int historySize, int subtaskIndex, OperatorID operatorID) {
+ /**
+ * Creates the latency statistics of one operator subtask.
+ *
+ * @param metricGroup group below which the latency histograms are registered
+ * @param historySize number of measured latencies retained by each histogram
+ * @param subtaskIndex subtask index of the operator receiving the latency markers
+ * @param operatorID ID of the operator receiving the latency markers
+ * @param granularity decides whether histograms are separated by source operator and/or source subtask
+ */
+ public LatencyStats(
+ MetricGroup metricGroup,
+ int historySize,
+ int subtaskIndex,
+ OperatorID operatorID,
+ Granularity granularity) {
this.metricGroup = metricGroup;
this.historySize = historySize;
this.subtaskIndex = subtaskIndex;
this.operatorId = operatorID;
+ this.granularity = granularity;
}
+ /**
+ * Records the latency carried by the given marker (current wall-clock time minus the marker's
+ * marked time) in the histogram selected by the configured {@link Granularity}. The histogram
+ * is created and registered in the metric group on first use.
+ */
public void reportLatency(LatencyMarker marker) {
- String uniqueName = "" + marker.getOperatorId() + marker.getSubtaskIndex() + operatorId + subtaskIndex;
+ // The key only contains the dimensions the granularity distinguishes, so markers that
+ // differ only in other dimensions share one histogram.
+ final String uniqueName = granularity.createUniqueHistogramName(marker, operatorId, subtaskIndex);
+
DescriptiveStatisticsHistogram latencyHistogram = this.latencyStats.get(uniqueName);
if (latencyHistogram == null) {
latencyHistogram = new DescriptiveStatisticsHistogram(this.historySize);
this.latencyStats.put(uniqueName, latencyHistogram);
+ // Source groups (if any) depend on the granularity; operator groups are always added.
- this.metricGroup
- .addGroup("source_id", String.valueOf(marker.getOperatorId()))
- .addGroup("source_subtask_index", String.valueOf(marker.getSubtaskIndex()))
+ granularity.createSourceMetricGroups(metricGroup, marker, operatorId, subtaskIndex)
.addGroup("operator_id", String.valueOf(operatorId))
.addGroup("operator_subtask_index", String.valueOf(subtaskIndex))
.histogram("latency", latencyHistogram);
@@ -59,4 +65,62 @@ public class LatencyStats {
long now = System.currentTimeMillis();
latencyHistogram.update(now - marker.getMarkedTime());
}
+
+ /**
+ * Granularity at which latency histograms are separated.
+ *
+ * <p>Each constant decides which properties of the originating source (operator ID and/or
+ * subtask index) are part of the histogram key and of the registered metric groups. The
+ * receiving operator and its subtask index are always part of the key.
+ */
+ public enum Granularity {
+
+ /** One histogram per receiving operator subtask, independent of the source. */
+ SINGLE {
+ @Override
+ String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex) {
+ final String operatorPart = String.valueOf(operatorId);
+ return operatorPart + operatorSubtaskIndex;
+ }
+
+ @Override
+ MetricGroup createSourceMetricGroups(
+ MetricGroup base,
+ LatencyMarker marker,
+ OperatorID operatorId,
+ int operatorSubtaskIndex) {
+ // no source information is exposed at this granularity
+ return base;
+ }
+ },
+
+ /** One histogram per source operator and receiving operator subtask. */
+ OPERATOR {
+ @Override
+ String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex) {
+ final String sourcePart = String.valueOf(marker.getOperatorId());
+ return sourcePart + SINGLE.createUniqueHistogramName(marker, operatorId, operatorSubtaskIndex);
+ }
+
+ @Override
+ MetricGroup createSourceMetricGroups(
+ MetricGroup base,
+ LatencyMarker marker,
+ OperatorID operatorId,
+ int operatorSubtaskIndex) {
+ final String sourceId = String.valueOf(marker.getOperatorId());
+ return base.addGroup("source_id", sourceId);
+ }
+ },
+
+ /** One histogram per source operator, source subtask and receiving operator subtask. */
+ SUBTASK {
+ @Override
+ String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex) {
+ final String sourcePart = String.valueOf(marker.getOperatorId()) + marker.getSubtaskIndex();
+ return sourcePart + SINGLE.createUniqueHistogramName(marker, operatorId, operatorSubtaskIndex);
+ }
+
+ @Override
+ MetricGroup createSourceMetricGroups(
+ MetricGroup base,
+ LatencyMarker marker,
+ OperatorID operatorId,
+ int operatorSubtaskIndex) {
+ // extend the operator-level source group with the source subtask
+ final MetricGroup sourceGroup = OPERATOR.createSourceMetricGroups(base, marker, operatorId, operatorSubtaskIndex);
+ return sourceGroup.addGroup("source_subtask_index", String.valueOf(marker.getSubtaskIndex()));
+ }
+ };
+
+ /**
+ * Builds the key identifying the histogram that the given marker is recorded in.
+ */
+ abstract String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex);
+
+ /**
+ * Adds the source-related metric groups (if any) below {@code base} and returns the innermost group.
+ */
+ abstract MetricGroup createSourceMetricGroups(MetricGroup base, LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex);
+ }
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/LatencyStatsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/LatencyStatsTest.java
new file mode 100644
index 0000000..ef14dcb
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/LatencyStatsTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.GenericMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+/**
+ * Tests for the {@link LatencyStats}.
+ */
+public class LatencyStatsTest extends TestLogger {
+
+ private static final OperatorID OPERATOR_ID = new OperatorID();
+ private static final OperatorID SOURCE_ID_1 = new OperatorID();
+ private static final OperatorID SOURCE_ID_2 = new OperatorID();
+
+ private static final int OPERATOR_SUBTASK_INDEX = 64;
+
+ private static final String PARENT_GROUP_NAME = "parent";
+
+ /** SINGLE: all markers share one histogram that carries no source groups. */
+ @Test
+ public void testLatencyStatsSingle() {
+ testLatencyStats(LatencyStats.Granularity.SINGLE, registrations -> {
+ Assert.assertEquals(1, registrations.size());
+
+ {
+ final Tuple2<String, Histogram> registration = registrations.get(0);
+ assertName(registration.f0);
+ Assert.assertEquals(5, registration.f1.getCount());
+ }
+ });
+ }
+
+ /** OPERATOR: one histogram per source operator. */
+ @Test
+ public void testLatencyStatsOperator() {
+ testLatencyStats(LatencyStats.Granularity.OPERATOR, registrations -> {
+ Assert.assertEquals(2, registrations.size());
+
+ {
+ final Tuple2<String, Histogram> registration = registrations.get(0);
+ assertName(registration.f0, SOURCE_ID_1);
+ Assert.assertEquals(3, registration.f1.getCount());
+ }
+
+ {
+ final Tuple2<String, Histogram> registration = registrations.get(1);
+ assertName(registration.f0, SOURCE_ID_2);
+ Assert.assertEquals(2, registration.f1.getCount());
+ }
+ });
+ }
+
+ /** SUBTASK: one histogram per (source operator, source subtask) pair. */
+ @Test
+ public void testLatencyStatsSubtask() {
+ testLatencyStats(LatencyStats.Granularity.SUBTASK, registrations -> {
+ Assert.assertEquals(4, registrations.size());
+
+ {
+ final Tuple2<String, Histogram> registration = registrations.get(0);
+ assertName(registration.f0, SOURCE_ID_1, 0);
+ Assert.assertEquals(2, registration.f1.getCount());
+ }
+
+ {
+ final Tuple2<String, Histogram> registration = registrations.get(1);
+ assertName(registration.f0, SOURCE_ID_1, 1);
+ Assert.assertEquals(1, registration.f1.getCount());
+ }
+
+ {
+ final Tuple2<String, Histogram> registration = registrations.get(2);
+ assertName(registration.f0, SOURCE_ID_2, 2);
+ Assert.assertEquals(1, registration.f1.getCount());
+ }
+
+ {
+ final Tuple2<String, Histogram> registration = registrations.get(3);
+ assertName(registration.f0, SOURCE_ID_2, 3);
+ Assert.assertEquals(1, registration.f1.getCount());
+ }
+ });
+ }
+
+ /**
+ * Reports five markers (SOURCE_ID_1 from subtasks 0, 0 and 1; SOURCE_ID_2 from subtasks 2 and 3)
+ * with the given granularity and hands the registered histograms, in registration order, to the verifier.
+ */
+ private static void testLatencyStats(
+ final LatencyStats.Granularity granularity,
+ final Consumer<List<Tuple2<String, Histogram>>> verifier) {
+
+ final AbstractMetricGroup<?> dummyGroup = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
+ final TestMetricRegistry registry = new TestMetricRegistry();
+ final MetricGroup parentGroup = new GenericMetricGroup(registry, dummyGroup, PARENT_GROUP_NAME);
+
+ final LatencyStats latencyStats = new LatencyStats(
+ parentGroup,
+ MetricOptions.LATENCY_HISTORY_SIZE.defaultValue(),
+ OPERATOR_SUBTASK_INDEX,
+ OPERATOR_ID,
+ granularity);
+
+ latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 0));
+ latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 0));
+ latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 1));
+ latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_2, 2));
+ latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_2, 3));
+
+ verifier.accept(registry.latencyHistograms);
+ }
+
+ /**
+ * Removes all parts from the metric identifier preceding the latency-related parts.
+ * Fails the test if the identifier does not contain the parent group at all.
+ */
+ private static String sanitizeName(final String registrationName) {
+ final int parentIndex = registrationName.lastIndexOf(PARENT_GROUP_NAME);
+ Assert.assertTrue("Metric identifier does not contain the parent group: " + registrationName, parentIndex >= 0);
+ return registrationName.substring(parentIndex + PARENT_GROUP_NAME.length() + 1);
+ }
+
+ /** Asserts the identifier of a histogram without source groups (SINGLE). */
+ private static void assertName(final String registrationName) {
+ final String sanitizedName = sanitizeName(registrationName);
+ Assert.assertEquals("operator_id." + OPERATOR_ID +
+ ".operator_subtask_index." + OPERATOR_SUBTASK_INDEX +
+ ".latency", sanitizedName);
+ }
+
+ /** Asserts the identifier of a histogram with a source operator group (OPERATOR). */
+ private static void assertName(final String registrationName, final OperatorID sourceId) {
+ final String sanitizedName = sanitizeName(registrationName);
+ Assert.assertEquals("source_id." + sourceId +
+ ".operator_id." + OPERATOR_ID +
+ ".operator_subtask_index." + OPERATOR_SUBTASK_INDEX +
+ ".latency", sanitizedName);
+ }
+
+ /** Asserts the identifier of a histogram with source operator and source subtask groups (SUBTASK). */
+ private static void assertName(final String registrationName, final OperatorID sourceId, final int sourceIndex) {
+ final String sanitizedName = sanitizeName(registrationName);
+ Assert.assertEquals("source_id." + sourceId +
+ ".source_subtask_index." + sourceIndex +
+ ".operator_id." + OPERATOR_ID +
+ ".operator_subtask_index." + OPERATOR_SUBTASK_INDEX +
+ ".latency", sanitizedName);
+ }
+
+ /**
+ * Registry that records every registered histogram together with its metric identifier,
+ * using '.' as the delimiter.
+ */
+ private static class TestMetricRegistry implements MetricRegistry {
+
+ private final List<Tuple2<String, Histogram>> latencyHistograms = new ArrayList<>(4);
+
+ @Override
+ public void register(Metric metric, String metricName, AbstractMetricGroup group) {
+ if (metric instanceof Histogram) {
+ latencyHistograms.add(Tuple2.of(group.getMetricIdentifier(metricName), (Histogram) metric));
+ }
+ }
+
+ @Override
+ public char getDelimiter() {
+ return '.';
+ }
+
+ @Override
+ public char getDelimiter(int index) {
+ // use the same delimiter for every reporter instead of '\0'
+ return getDelimiter();
+ }
+
+ @Override
+ public int getNumberReporters() {
+ return 0;
+ }
+
+ @Override
+ public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
+ // nothing to clean up; the tests only inspect registrations
+ }
+
+ @Override
+ public ScopeFormats getScopeFormats() {
+ return null;
+ }
+
+ @Nullable
+ @Override
+ public String getMetricQueryServicePath() {
+ return null;
+ }
+ }
+}