You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/07/11 10:05:25 UTC
[3/9] flink git commit: [FLINK-9789][metrics] Ensure uniqueness of watermark metrics
[FLINK-9789][metrics] Ensure uniqueness of watermark metrics
This closes #6292.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/60df251a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/60df251a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/60df251a
Branch: refs/heads/master
Commit: 60df251ad34ac033ed6c4423a69765739cb04199
Parents: 2fbe562
Author: zentol <ch...@apache.org>
Authored: Tue Jul 10 13:27:34 2018 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Jul 11 12:05:07 2018 +0200
----------------------------------------------------------------------
.../util/InterceptingTaskMetricGroup.java | 53 ++++++++++++++++++++
.../runtime/tasks/OneInputStreamTask.java | 3 +-
.../streaming/runtime/tasks/OperatorChain.java | 5 +-
.../runtime/tasks/TwoInputStreamTask.java | 3 +-
.../runtime/tasks/OneInputStreamTaskTest.java | 18 ++++++-
.../runtime/tasks/TwoInputStreamTaskTest.java | 22 +++++++-
6 files changed, 98 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/60df251a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingTaskMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingTaskMetricGroup.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingTaskMetricGroup.java
new file mode 100644
index 0000000..29454b4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/util/InterceptingTaskMetricGroup.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.util;
+
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * A {@link TaskMetricGroup} that records every metric passed to {@link #addMetric} so that tests can look it up by name.
+ */
+public class InterceptingTaskMetricGroup extends UnregisteredMetricGroups.UnregisteredTaskMetricGroup {
+	// created lazily in addMetric, no initializer (presumably since the super constructor may register metrics first - TODO confirm)
+	private Map<String, Metric> intercepted;
+
+	/**
+	 * Returns the registered metric for the given name, or null if it was never registered.
+	 * This also holds when no metric at all has been registered yet.
+	 * @param name metric name
+	 * @return registered metric for the given name, or null if it was never registered
+	 */
+	public Metric get(String name) {
+		return intercepted == null ? null : intercepted.get(name);
+	}
+
+	@Override
+	protected void addMetric(String name, Metric metric) {
+		if (intercepted == null) {
+			intercepted = new HashMap<>();
+		}
+		intercepted.put(name, metric);
+		super.addMetric(name, metric);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/60df251a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index 43eab24..7498518 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -93,7 +93,8 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
inputWatermarkGauge);
}
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
- getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
+ // wrap watermark gauge since registered metrics must be unique
+ getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/60df251a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index c105ad7..015b7db 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -381,8 +381,9 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
currentOperatorOutput = new CopyingChainingOutput<>(chainedOperator, inSerializer, outputTag, this);
}
- chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, currentOperatorOutput.getWatermarkGauge());
- chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, chainedOperatorOutput.getWatermarkGauge());
+ // wrap watermark gauges since registered metrics must be unique
+ chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, currentOperatorOutput.getWatermarkGauge()::getValue);
+ chainedOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_OUTPUT_WATERMARK, chainedOperatorOutput.getWatermarkGauge()::getValue);
return currentOperatorOutput;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/60df251a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 93a5675..546ccdb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -105,7 +105,8 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, minInputWatermarkGauge);
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_1_WATERMARK, input1WatermarkGauge);
headOperator.getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_2_WATERMARK, input2WatermarkGauge);
- getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, minInputWatermarkGauge);
+ // wrap watermark gauge since registered metrics must be unique
+ getEnvironment().getMetricGroup().gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, minInputWatermarkGauge::getValue);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/60df251a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index af776d5..201e138 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
+import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -64,6 +65,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -678,7 +680,7 @@ public class OneInputStreamTaskTest extends TestLogger {
InterceptingOperatorMetricGroup headOperatorMetricGroup = new InterceptingOperatorMetricGroup();
InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup();
- TaskMetricGroup taskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() {
+ InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() {
@Override
public OperatorMetricGroup addOperator(OperatorID id, String name) {
if (id.equals(headOperatorId)) {
@@ -702,11 +704,23 @@ public class OneInputStreamTaskTest extends TestLogger {
testHarness.invoke(env);
testHarness.waitForTaskRunning();
+ Gauge<Long> taskInputWatermarkGauge = (Gauge<Long>) taskMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
Gauge<Long> headInputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
Gauge<Long> headOutputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
Gauge<Long> chainedInputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
Gauge<Long> chainedOutputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
+ Assert.assertEquals("A metric was registered multiple times.",
+ 5,
+ new HashSet<>(Arrays.asList(
+ taskInputWatermarkGauge,
+ headInputWatermarkGauge,
+ headOutputWatermarkGauge,
+ chainedInputWatermarkGauge,
+ chainedOutputWatermarkGauge))
+ .size());
+
+ Assert.assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(Long.MIN_VALUE, headOutputWatermarkGauge.getValue().longValue());
Assert.assertEquals(Long.MIN_VALUE, chainedInputWatermarkGauge.getValue().longValue());
@@ -714,6 +728,7 @@ public class OneInputStreamTaskTest extends TestLogger {
testHarness.processElement(new Watermark(1L));
testHarness.waitForInputProcessing();
+ Assert.assertEquals(1L, taskInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(1L, headInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(2L, headOutputWatermarkGauge.getValue().longValue());
Assert.assertEquals(2L, chainedInputWatermarkGauge.getValue().longValue());
@@ -721,6 +736,7 @@ public class OneInputStreamTaskTest extends TestLogger {
testHarness.processElement(new Watermark(2L));
testHarness.waitForInputProcessing();
+ Assert.assertEquals(2L, taskInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(2L, headInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(4L, headOutputWatermarkGauge.getValue().longValue());
Assert.assertEquals(4L, chainedInputWatermarkGauge.getValue().longValue());
http://git-wip-us.apache.org/repos/asf/flink/blob/60df251a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index 38b262c..5d15157 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.metrics.util.InterceptingOperatorMetricGroup;
+import org.apache.flink.runtime.metrics.util.InterceptingTaskMetricGroup;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
@@ -49,6 +50,8 @@ import org.apache.flink.streaming.util.TestHarnessUtil;
import org.junit.Assert;
import org.junit.Test;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
@@ -465,7 +468,7 @@ public class TwoInputStreamTaskTest {
InterceptingOperatorMetricGroup headOperatorMetricGroup = new InterceptingOperatorMetricGroup();
InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup();
- TaskMetricGroup taskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() {
+ InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() {
@Override
public OperatorMetricGroup addOperator(OperatorID id, String name) {
if (id.equals(headOperatorId)) {
@@ -489,6 +492,7 @@ public class TwoInputStreamTaskTest {
testHarness.invoke(env);
testHarness.waitForTaskRunning();
+ Gauge<Long> taskInputWatermarkGauge = (Gauge<Long>) taskMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
Gauge<Long> headInput1WatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_1_WATERMARK);
Gauge<Long> headInput2WatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_2_WATERMARK);
Gauge<Long> headInputWatermarkGauge = (Gauge<Long>) headOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
@@ -496,6 +500,19 @@ public class TwoInputStreamTaskTest {
Gauge<Long> chainedInputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_INPUT_WATERMARK);
Gauge<Long> chainedOutputWatermarkGauge = (Gauge<Long>) chainedOperatorMetricGroup.get(MetricNames.IO_CURRENT_OUTPUT_WATERMARK);
+ Assert.assertEquals("A metric was registered multiple times.",
+ 7,
+ new HashSet<>(Arrays.asList(
+ taskInputWatermarkGauge,
+ headInput1WatermarkGauge,
+ headInput2WatermarkGauge,
+ headInputWatermarkGauge,
+ headOutputWatermarkGauge,
+ chainedInputWatermarkGauge,
+ chainedOutputWatermarkGauge))
+ .size());
+
+ Assert.assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(Long.MIN_VALUE, headInput1WatermarkGauge.getValue().longValue());
Assert.assertEquals(Long.MIN_VALUE, headInput2WatermarkGauge.getValue().longValue());
@@ -505,6 +522,7 @@ public class TwoInputStreamTaskTest {
testHarness.processElement(new Watermark(1L), 0, 0);
testHarness.waitForInputProcessing();
+ Assert.assertEquals(Long.MIN_VALUE, taskInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(Long.MIN_VALUE, headInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(1L, headInput1WatermarkGauge.getValue().longValue());
Assert.assertEquals(Long.MIN_VALUE, headInput2WatermarkGauge.getValue().longValue());
@@ -514,6 +532,7 @@ public class TwoInputStreamTaskTest {
testHarness.processElement(new Watermark(2L), 1, 0);
testHarness.waitForInputProcessing();
+ Assert.assertEquals(1L, taskInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(1L, headInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(1L, headInput1WatermarkGauge.getValue().longValue());
Assert.assertEquals(2L, headInput2WatermarkGauge.getValue().longValue());
@@ -523,6 +542,7 @@ public class TwoInputStreamTaskTest {
testHarness.processElement(new Watermark(3L), 0, 0);
testHarness.waitForInputProcessing();
+ Assert.assertEquals(2L, taskInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(2L, headInputWatermarkGauge.getValue().longValue());
Assert.assertEquals(3L, headInput1WatermarkGauge.getValue().longValue());
Assert.assertEquals(2L, headInput2WatermarkGauge.getValue().longValue());