You are viewing a plain text version of this content. The canonical link for it is on the original mailing-list archive page (the hyperlink is not preserved in this plain-text version).
Posted to commits@flink.apache.org by ch...@apache.org on 2018/09/07 16:56:26 UTC

[flink] branch master updated: [FLINK-10243][metrics] Make latency metrics granularity configurable

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 91f8fe8  [FLINK-10243][metrics] Make latency metrics granularity configurable
91f8fe8 is described below

commit 91f8fe831c6af21928fceb3a13d87c9ed5019981
Author: Chesnay Schepler <ch...@apache.org>
AuthorDate: Fri Sep 7 18:56:21 2018 +0200

    [FLINK-10243][metrics] Make latency metrics granularity configurable
---
 docs/_includes/generated/metric_configuration.html |   5 +
 .../apache/flink/configuration/MetricOptions.java  |  12 ++
 .../api/operators/AbstractStreamOperator.java      |  27 ++-
 .../apache/flink/streaming/util/LatencyStats.java  |  74 ++++++-
 .../flink/streaming/util/LatencyStatsTest.java     | 217 +++++++++++++++++++++
 5 files changed, 328 insertions(+), 7 deletions(-)

diff --git a/docs/_includes/generated/metric_configuration.html b/docs/_includes/generated/metric_configuration.html
index 02f4ceb..0c0b0dd 100644
--- a/docs/_includes/generated/metric_configuration.html
+++ b/docs/_includes/generated/metric_configuration.html
@@ -8,6 +8,11 @@
     </thead>
     <tbody>
         <tr>
+            <td><h5>metrics.latency.granularity</h5></td>
+            <td style="word-wrap: break-word;">"operator"</td>
+            <td>Defines the granularity of latency metrics. Accepted values are:<ul><li>single - Track latency without differentiating between sources and subtasks.</li><li>operator - Track latency while differentiating between sources, but not subtasks.</li><li>subtask - Track latency while differentiating between sources and subtasks.</li></ul></td>
+        </tr>
+        <tr>
             <td><h5>metrics.latency.history-size</h5></td>
             <td style="word-wrap: break-word;">128</td>
             <td>Defines the number of measured latencies to maintain at each operator.</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
index 4e9ddeb..f54f5f4 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/MetricOptions.java
@@ -21,6 +21,7 @@ package org.apache.flink.configuration;
 import org.apache.flink.annotation.PublicEvolving;
 
 import static org.apache.flink.configuration.ConfigOptions.key;
+import static org.apache.flink.configuration.description.TextElement.text;
 
 /**
  * Configuration options for metrics and metric reporters.
@@ -111,6 +112,17 @@ public class MetricOptions {
 				" Disables latency tracking if set to 0 or a negative value. Enabling this feature can significantly" +
 				" impact the performance of the cluster.");
 
+	public static final ConfigOption<String> LATENCY_SOURCE_GRANULARITY =
+		key("metrics.latency.granularity")
+			.defaultValue("operator")
+			.withDescription(Description.builder()
+				.text("Defines the granularity of latency metrics. Accepted values are:")
+				.list(
+					text("single - Track latency without differentiating between sources and subtasks."),
+					text("operator - Track latency while differentiating between sources, but not subtasks."),
+					text("subtask - Track latency while differentiating between sources and subtasks."))
+				.build());
+
 	/** The number of measured latencies to maintain at each operator. */
 	public static final ConfigOption<Integer> LATENCY_HISTORY_SIZE =
 		key("metrics.latency.history-size")
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index f52168b..f3c2208 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -72,6 +72,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.Serializable;
+import java.util.Locale;
 
 /**
  * Base class for all stream operators. Operators that contain a user function should extend the class
@@ -193,11 +194,33 @@ public abstract class AbstractStreamOperator<OUT>
 				LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, historySize);
 				historySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue();
 			}
+
+			final String configuredGranularity = taskManagerConfig.getString(MetricOptions.LATENCY_SOURCE_GRANULARITY);
+			LatencyStats.Granularity granularity;
+			try {
+				granularity = LatencyStats.Granularity.valueOf(configuredGranularity.toUpperCase(Locale.ROOT));
+			} catch (IllegalArgumentException iae) {
+				granularity = LatencyStats.Granularity.OPERATOR;
+				LOG.warn(
+					"Configured value {} option for {} is invalid. Defaulting to {}.",
+					configuredGranularity,
+					MetricOptions.LATENCY_SOURCE_GRANULARITY.key(),
+					granularity);
+			}
 			TaskManagerJobMetricGroup jobMetricGroup = this.metrics.parent().parent();
-			this.latencyStats = new LatencyStats(jobMetricGroup.addGroup("latency"), historySize, container.getIndexInSubtaskGroup(), getOperatorID());
+			this.latencyStats = new LatencyStats(jobMetricGroup.addGroup("latency"),
+				historySize,
+				container.getIndexInSubtaskGroup(),
+				getOperatorID(),
+				granularity);
 		} catch (Exception e) {
 			LOG.warn("An error occurred while instantiating latency metrics.", e);
-			this.latencyStats = new LatencyStats(UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup().addGroup("latency"), 1, 0, new OperatorID());
+			this.latencyStats = new LatencyStats(
+				UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup().addGroup("latency"),
+				1,
+				0,
+				new OperatorID(),
+				LatencyStats.Granularity.SINGLE);
 		}
 
 		this.runtimeContext = new StreamingRuntimeContext(this, environment, container.getAccumulatorMap());
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/LatencyStats.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/LatencyStats.java
index 4f3d33e..926753d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/LatencyStats.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/LatencyStats.java
@@ -34,23 +34,29 @@ public class LatencyStats {
 	private final int historySize;
 	private final int subtaskIndex;
 	private final OperatorID operatorId;
+	private final Granularity granularity;
 
-	public LatencyStats(MetricGroup metricGroup, int historySize, int subtaskIndex, OperatorID operatorID) {
+	public LatencyStats(
+			MetricGroup metricGroup,
+			int historySize,
+			int subtaskIndex,
+			OperatorID operatorID,
+			Granularity granularity) {
 		this.metricGroup = metricGroup;
 		this.historySize = historySize;
 		this.subtaskIndex = subtaskIndex;
 		this.operatorId = operatorID;
+		this.granularity = granularity;
 	}
 
 	public void reportLatency(LatencyMarker marker) {
-		String uniqueName =  "" + marker.getOperatorId() + marker.getSubtaskIndex() + operatorId + subtaskIndex;
+		final String uniqueName = granularity.createUniqueHistogramName(marker, operatorId, subtaskIndex);
+
 		DescriptiveStatisticsHistogram latencyHistogram = this.latencyStats.get(uniqueName);
 		if (latencyHistogram == null) {
 			latencyHistogram = new DescriptiveStatisticsHistogram(this.historySize);
 			this.latencyStats.put(uniqueName, latencyHistogram);
-			this.metricGroup
-				.addGroup("source_id", String.valueOf(marker.getOperatorId()))
-				.addGroup("source_subtask_index", String.valueOf(marker.getSubtaskIndex()))
+			granularity.createSourceMetricGroups(metricGroup, marker, operatorId, subtaskIndex)
 				.addGroup("operator_id", String.valueOf(operatorId))
 				.addGroup("operator_subtask_index", String.valueOf(subtaskIndex))
 				.histogram("latency", latencyHistogram);
@@ -59,4 +65,62 @@ public class LatencyStats {
 		long now = System.currentTimeMillis();
 		latencyHistogram.update(now - marker.getMarkedTime());
 	}
+
+	/**
+	 * Granularity for latency metrics.
+	 */
+	public enum Granularity {
+		SINGLE {
+			@Override
+			String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex) {
+				return String.valueOf(operatorId) + operatorSubtaskIndex;
+			}
+
+			@Override
+			MetricGroup createSourceMetricGroups(
+					MetricGroup base,
+					LatencyMarker marker,
+					OperatorID operatorId,
+					int operatorSubtaskIndex) {
+				return base;
+			}
+		},
+		OPERATOR {
+			@Override
+			String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex) {
+				return String.valueOf(marker.getOperatorId()) + operatorId + operatorSubtaskIndex;
+			}
+
+			@Override
+			MetricGroup createSourceMetricGroups(
+					MetricGroup base,
+					LatencyMarker marker,
+					OperatorID operatorId,
+					int operatorSubtaskIndex) {
+				return base
+					.addGroup("source_id", String.valueOf(marker.getOperatorId()));
+			}
+		},
+		SUBTASK {
+			@Override
+			String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex) {
+				return String.valueOf(marker.getOperatorId()) + marker.getSubtaskIndex() + operatorId + operatorSubtaskIndex;
+			}
+
+			@Override
+			MetricGroup createSourceMetricGroups(
+					MetricGroup base,
+					LatencyMarker marker,
+					OperatorID operatorId,
+					int operatorSubtaskIndex) {
+				return base
+					.addGroup("source_id", String.valueOf(marker.getOperatorId()))
+					.addGroup("source_subtask_index", String.valueOf(marker.getSubtaskIndex()));
+			}
+		};
+
+		abstract String createUniqueHistogramName(LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex);
+
+		abstract MetricGroup createSourceMetricGroups(MetricGroup base, LatencyMarker marker, OperatorID operatorId, int operatorSubtaskIndex);
+	}
 }
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/LatencyStatsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/LatencyStatsTest.java
new file mode 100644
index 0000000..ef14dcb
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/LatencyStatsTest.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Histogram;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.groups.GenericMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
+/**
+ * Tests for the {@link LatencyStats}.
+ */
+public class LatencyStatsTest extends TestLogger {
+
+	private static final OperatorID OPERATOR_ID = new OperatorID();
+	private static final OperatorID SOURCE_ID_1 = new OperatorID();
+	private static final OperatorID SOURCE_ID_2 = new OperatorID();
+
+	private static final int OPERATOR_SUBTASK_INDEX = 64;
+
+	private static final String PARENT_GROUP_NAME = "parent";
+
+	@Test
+	public void testLatencyStatsSingle() {
+		testLatencyStats(LatencyStats.Granularity.SINGLE, registrations -> {
+			Assert.assertEquals(1, registrations.size());
+
+			{
+				final Tuple2<String, Histogram> registration = registrations.get(0);
+				assertName(registration.f0);
+				Assert.assertEquals(5, registration.f1.getCount());
+			}
+		});
+	}
+
+	@Test
+	public void testLatencyStatsOperator() {
+		testLatencyStats(LatencyStats.Granularity.OPERATOR, registrations -> {
+			Assert.assertEquals(2, registrations.size());
+
+			{
+				final Tuple2<String, Histogram> registration = registrations.get(0);
+				assertName(registration.f0, SOURCE_ID_1);
+				Assert.assertEquals(3, registration.f1.getCount());
+			}
+
+			{
+				final Tuple2<String, Histogram> registration = registrations.get(1);
+				assertName(registration.f0, SOURCE_ID_2);
+				Assert.assertEquals(2, registration.f1.getCount());
+			}
+		});
+	}
+
+	@Test
+	public void testLatencyStatsSubtask() {
+		testLatencyStats(LatencyStats.Granularity.SUBTASK, registrations -> {
+			Assert.assertEquals(4, registrations.size());
+
+			{
+				final Tuple2<String, Histogram> registration = registrations.get(0);
+				assertName(registration.f0, SOURCE_ID_1, 0);
+				Assert.assertEquals(2, registration.f1.getCount());
+			}
+
+			{
+				final Tuple2<String, Histogram> registration = registrations.get(1);
+				assertName(registration.f0, SOURCE_ID_1, 1);
+				Assert.assertEquals(1, registration.f1.getCount());
+			}
+
+			{
+				final Tuple2<String, Histogram> registration = registrations.get(2);
+				assertName(registration.f0, SOURCE_ID_2, 2);
+				Assert.assertEquals(1, registration.f1.getCount());
+			}
+
+			{
+				final Tuple2<String, Histogram> registration = registrations.get(3);
+				assertName(registration.f0, SOURCE_ID_2, 3);
+				Assert.assertEquals(1, registration.f1.getCount());
+			}
+		});
+	}
+
+	private static void testLatencyStats(
+		final LatencyStats.Granularity granularity,
+		final Consumer<List<Tuple2<String, Histogram>>> verifier) {
+
+		final AbstractMetricGroup<?> dummyGroup = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
+		final TestMetricRegistry registry = new TestMetricRegistry();
+		final MetricGroup parentGroup = new GenericMetricGroup(registry, dummyGroup, PARENT_GROUP_NAME);
+
+		final LatencyStats latencyStats = new LatencyStats(
+			parentGroup,
+			MetricOptions.LATENCY_HISTORY_SIZE.defaultValue(),
+			OPERATOR_SUBTASK_INDEX,
+			OPERATOR_ID,
+			granularity);
+
+		latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 0));
+		latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 0));
+		latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_1, 1));
+		latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_2, 2));
+		latencyStats.reportLatency(new LatencyMarker(0L, SOURCE_ID_2, 3));
+
+		verifier.accept(registry.latencyHistograms);
+	}
+
+	/**
+	 * Removes all parts from the metric identifier preceding the latency-related parts.
+	 */
+	private static String sanitizeName(final String registrationName) {
+		return registrationName.substring(registrationName.lastIndexOf(PARENT_GROUP_NAME) + PARENT_GROUP_NAME.length() + 1);
+	}
+
+	private static void assertName(final String registrationName) {
+		final String sanitizedName = sanitizeName(registrationName);
+		Assert.assertEquals("operator_id." + OPERATOR_ID +
+			".operator_subtask_index." + OPERATOR_SUBTASK_INDEX +
+			".latency", sanitizedName);
+	}
+
+	private static void assertName(final String registrationName, final OperatorID sourceId) {
+		final String sanitizedName = sanitizeName(registrationName);
+		Assert.assertEquals("source_id." + sourceId +
+			".operator_id." + OPERATOR_ID +
+			".operator_subtask_index." + OPERATOR_SUBTASK_INDEX +
+			".latency", sanitizedName);
+	}
+
+	private static void assertName(final String registrationName, final OperatorID sourceId, final int sourceIndex) {
+		final String sanitizedName = sanitizeName(registrationName);
+		Assert.assertEquals("source_id." + sourceId +
+			".source_subtask_index." + sourceIndex +
+			".operator_id." + OPERATOR_ID +
+			".operator_subtask_index." + OPERATOR_SUBTASK_INDEX +
+			".latency", sanitizedName);
+	}
+
+	private static class TestMetricRegistry implements MetricRegistry {
+
+		private final List<Tuple2<String, Histogram>> latencyHistograms = new ArrayList<>(4);
+
+		@Override
+		public void register(Metric metric, String metricName, AbstractMetricGroup group) {
+			if (metric instanceof Histogram) {
+				latencyHistograms.add(Tuple2.of(group.getMetricIdentifier(metricName), (Histogram) metric));
+			}
+		}
+
+		@Override
+		public char getDelimiter() {
+			return '.';
+		}
+
+		@Override
+		public char getDelimiter(int index) {
+			return 0;
+		}
+
+		@Override
+		public int getNumberReporters() {
+			return 0;
+		}
+
+		@Override
+		public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
+
+		}
+
+		@Override
+		public ScopeFormats getScopeFormats() {
+			return null;
+		}
+
+		@Nullable
+		@Override
+		public String getMetricQueryServicePath() {
+			return null;
+		}
+	}
+}