You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/10 15:44:21 UTC

flink git commit: [FLINK-9789][metrics] Ensure uniqueness of watermark metrics

Repository: flink
Updated Branches:
  refs/heads/release-1.5 2c47ef357 -> a0a810720


[FLINK-9789][metrics] Ensure uniqueness of watermark metrics


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a0a81072
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a0a81072
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a0a81072

Branch: refs/heads/release-1.5
Commit: a0a810720d251fb9ab6a4cf9d995828d99a3f2c8
Parents: 2c47ef3
Author: zentol <ch...@apache.org>
Authored: Tue Jul 10 13:27:34 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Jul 10 16:14:38 2018 +0200

----------------------------------------------------------------------
 .../util/InterceptingTaskMetricGroup.java       | 53 ++++++++++++++++++++
 .../runtime/tasks/OneInputStreamTask.java       |  2 +-
 .../streaming/runtime/tasks/OperatorChain.java  |  4 +-
 .../runtime/tasks/TwoInputStreamTask.java       |  2 +-
 .../runtime/tasks/OneInputStreamTaskTest.java   | 18 ++++++-
 .../runtime/tasks/TwoInputStreamTaskTest.java   | 22 +++++++-
 6 files changed, 95 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a0a81072/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingTaskMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingTaskMetricGroup.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingTaskMetricGroup.java
new file mode 100644
index 0000000..29454b4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingTaskMetricGroup.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.util;
+
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A {@link TaskMetricGroup} that exposes all registered metrics.
+ */
+public class InterceptingTaskMetricGroup extends UnregisteredMetricGroups.UnregisteredTaskMetricGroup {
+
+	private Map<String, Metric> intercepted;
+
+	/**
+	 * Returns the registered metric for the given name, or null if it was never registered.
+	 *
+	 * @param name metric name
+	 * @return registered metric for the given name, or null if it was never registered
+	 */
+	public Metric get(String name) {
+		return intercepted.get(name);
+	}
+
+	@Override
+	protected void addMetric(String name, Metric metric) {
+		if (intercepted == null) {
+			intercepted = new HashMap<>();
+		}
+		intercepted.put(name, metric);
+		super.addMetric(name, metric);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a0a81072/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 43eab24..dd08774 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -93,7 +93,7 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 					inputWatermarkGauge);
 		}
 		headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
-		getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
+		getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a0a81072/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 1965bfb..b5303e7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -362,8 +362,8 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 			currentOperatorOutput = new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);
 		}
 
-		chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, currentOperatorOutput.getWatermarkGauge());
-		chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, chainedOperatorOutput.getWatermarkGauge());
+		chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, currentOperatorOutput.getWatermarkGauge()::getValue);
+		chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, chainedOperatorOutput.getWatermarkGauge()::getValue);
 
 		return currentOperatorOutput;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a0a81072/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 93a5675..92ada7e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -105,7 +105,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 		headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, minInputWatermarkGauge);
 		headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, input1WatermarkGauge);
 		headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_2_WATERMARK, input2WatermarkGauge);
-		getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, minInputWatermarkGauge);
+		getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, minInputWatermarkGauge::getValue);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a0a81072/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index af776d5..201e138 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
+import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -64,6 +65,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -678,7 +680,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 
 		InterceptingOperatorMetricGroup headOperatorMetricGroup = new InterceptingOperatorMetricGroup();
 		InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup();
-		TaskMetricGroup taskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() {
+		InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() {
 			@Override
 			public OperatorMetricGroup addOperator(OperatorID id, String name) {
 				if (id.equals(headOperatorId)) {
@@ -702,11 +704,23 @@ public class OneInputStreamTaskTest extends TestLogger {
 		testHarness.invoke(env);
 		testHarness.waitForTaskRunning();
 
+		Gauge<Long> taskInputWatermarkGauge = (Gauge<Long>) taskMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
 		Gauge<Long> headInputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
 		Gauge<Long> headOutputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
 		Gauge<Long> chainedInputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
 		Gauge<Long> chainedOutputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
 
+		Assert.assertEquals("A metric was registered multiple times.",
+			5,
+			new HashSet<>(Arrays.asList(
+				taskInputWatermarkGauge,
+				headInputWatermarkGauge,
+				headOutputWatermarkGauge,
+				chainedInputWatermarkGauge,
+				chainedOutputWatermarkGauge))
+				.size());
+
+		Assert.assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(Long.MIN_VALUE, headOutputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(Long.MIN_VALUE, chainedInputWatermarkGauge.getValue().longValue());
@@ -714,6 +728,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 
 		testHarness.processElement(new Watermark(1L));
 		testHarness.waitForInputProcessing();
+		Assert.assertEquals(1L, taskInputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(1L, headInputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(2L, headOutputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(2L, chainedInputWatermarkGauge.getValue().longValue());
@@ -721,6 +736,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 
 		testHarness.processElement(new Watermark(2L));
 		testHarness.waitForInputProcessing();
+		Assert.assertEquals(2L, taskInputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(2L, headInputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(4L, headOutputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(4L, chainedInputWatermarkGauge.getValue().longValue());

http://git-wip-us.apache.org/repos/asf/flink/blob/a0a81072/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index 38b262c..5d15157 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
+import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
@@ -49,6 +50,8 @@ import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
@@ -465,7 +468,7 @@ public class TwoInputStreamTaskTest {
 
 		InterceptingOperatorMetricGroup headOperatorMetricGroup = new InterceptingOperatorMetricGroup();
 		InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup();
-		TaskMetricGroup taskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() {
+		InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() {
 			@Override
 			public OperatorMetricGroup addOperator(OperatorID id, String name) {
 				if (id.equals(headOperatorId)) {
@@ -489,6 +492,7 @@ public class TwoInputStreamTaskTest {
 		testHarness.invoke(env);
 		testHarness.waitForTaskRunning();
 
+		Gauge<Long> taskInputWatermarkGauge = (Gauge<Long>) taskMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
 		Gauge<Long> headInput1WatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_1_WATERMARK);
 		Gauge<Long> headInput2WatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_2_WATERMARK);
 		Gauge<Long> headInputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
@@ -496,6 +500,19 @@ public class TwoInputStreamTaskTest {
 		Gauge<Long> chainedInputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
 		Gauge<Long> chainedOutputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
 
+		Assert.assertEquals("A metric was registered multiple times.",
+			7,
+			new HashSet<>(Arrays.asList(
+				taskInputWatermarkGauge,
+				headInput1WatermarkGauge,
+				headInput2WatermarkGauge,
+				headInputWatermarkGauge,
+				headOutputWatermarkGauge,
+				chainedInputWatermarkGauge,
+				chainedOutputWatermarkGauge))
+				.size());
+
+		Assert.assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(Long.MIN_VALUE, headInput1WatermarkGauge.getValue().longValue());
 		Assert.assertEquals(Long.MIN_VALUE, headInput2WatermarkGauge.getValue().longValue());
@@ -505,6 +522,7 @@ public class TwoInputStreamTaskTest {
 
 		testHarness.processElement(new Watermark(1L), 0, 0);
 		testHarness.waitForInputProcessing();
+		Assert.assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(1L, headInput1WatermarkGauge.getValue().longValue());
 		Assert.assertEquals(Long.MIN_VALUE, headInput2WatermarkGauge.getValue().longValue());
@@ -514,6 +532,7 @@ public class TwoInputStreamTaskTest {
 
 		testHarness.processElement(new Watermark(2L), 1, 0);
 		testHarness.waitForInputProcessing();
+		Assert.assertEquals(1L, taskInputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(1L, headInputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(1L, headInput1WatermarkGauge.getValue().longValue());
 		Assert.assertEquals(2L, headInput2WatermarkGauge.getValue().longValue());
@@ -523,6 +542,7 @@ public class TwoInputStreamTaskTest {
 
 		testHarness.processElement(new Watermark(3L), 0, 0);
 		testHarness.waitForInputProcessing();
+		Assert.assertEquals(2L, taskInputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(2L, headInputWatermarkGauge.getValue().longValue());
 		Assert.assertEquals(3L, headInput1WatermarkGauge.getValue().longValue());
 		Assert.assertEquals(2L, headInput2WatermarkGauge.getValue().longValue());