You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2018/09/03 13:03:32 UTC

[GitHub] zentol closed pull request #6599: [FLINK-10150][metrics] Fix OperatorMetricGroup creation for Batch

zentol closed pull request #6599: [FLINK-10150][metrics] Fix OperatorMetricGroup creation for Batch
URL: https://github.com/apache/flink/pull/6599
 
 
   

This is a PR merged from a forked repository.
Because GitHub hides the original diff of a foreign (fork) pull request
once it has been merged, the diff is reproduced below for the sake of
provenance:

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
index 441dbf8ea58..39d98d82fc8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
@@ -42,7 +42,7 @@
 @Internal
 public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGroup> {
 
-	private final Map<OperatorID, OperatorMetricGroup> operators = new HashMap<>();
+	private final Map<String, OperatorMetricGroup> operators = new HashMap<>();
 
 	static final int METRICS_OPERATOR_NAME_MAX_LENGTH = 80;
 
@@ -134,25 +134,27 @@ public TaskIOMetricGroup getIOMetricGroup() {
 	//  operators and cleanup
 	// ------------------------------------------------------------------------
 
-	public OperatorMetricGroup addOperator(String name) {
-		return addOperator(OperatorID.fromJobVertexID(vertexId), name);
+	public OperatorMetricGroup getOrAddOperator(String name) {
+		return getOrAddOperator(OperatorID.fromJobVertexID(vertexId), name);
 	}
 
-	public OperatorMetricGroup addOperator(OperatorID operatorID, String name) {
+	public OperatorMetricGroup getOrAddOperator(OperatorID operatorID, String name) {
 		if (name != null && name.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) {
 			LOG.warn("The operator name {} exceeded the {} characters length limit and was truncated.", name, METRICS_OPERATOR_NAME_MAX_LENGTH);
 			name = name.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
 		}
 		OperatorMetricGroup operator = new OperatorMetricGroup(this.registry, this, operatorID, name);
+		// unique OperatorIDs only exist in streaming, so we have to rely on the name for batch operators
+		final String key = operatorID + name;
 
 		synchronized (this) {
-			OperatorMetricGroup previous = operators.put(operatorID, operator);
+			OperatorMetricGroup previous = operators.put(key, operator);
 			if (previous == null) {
 				// no operator group so far
 				return operator;
 			} else {
 				// already had an operator group. restore that one.
-				operators.put(operatorID, previous);
+				operators.put(key, previous);
 				return previous;
 			}
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
index 3869aa642f9..8c635eb994d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
@@ -145,7 +145,7 @@ protected UnregisteredTaskMetricGroup() {
 		}
 
 		@Override
-		public OperatorMetricGroup addOperator(OperatorID operatorID, String name) {
+		public OperatorMetricGroup getOrAddOperator(OperatorID operatorID, String name) {
 			return createUnregisteredOperatorMetricGroup();
 		}
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
index e6978697b47..d5f2fd01678 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
@@ -254,7 +254,7 @@ public void invoke() throws Exception {
 
 		String headName =  getEnvironment().getTaskInfo().getTaskName().split("->")[0].trim();
 		this.metrics = getEnvironment().getMetricGroup()
-			.addOperator(headName.startsWith("CHAIN") ? headName.substring(6) : headName);
+			.getOrAddOperator(headName.startsWith("CHAIN") ? headName.substring(6) : headName);
 		this.metrics.getIOMetricGroup().reuseInputMetricsForTask();
 		if (config.getNumberOfChainedStubs() == 0) {
 			this.metrics.getIOMetricGroup().reuseOutputMetricsForTask();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index 0ea376e8c4d..2c263012d47 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -409,6 +409,6 @@ public DistributedRuntimeUDFContext createRuntimeContext() {
 
 		return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
 				getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(), 
-				getEnvironment().getMetricGroup().addOperator(getEnvironment().getTaskInfo().getTaskName()));
+				getEnvironment().getMetricGroup().getOrAddOperator(getEnvironment().getTaskInfo().getTaskName()));
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index cdfe1fa12f9..a5ccfab8696 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -402,6 +402,6 @@ public DistributedRuntimeUDFContext createRuntimeContext() {
 		sourceName = sourceName.startsWith("CHAIN") ? sourceName.substring(6) : sourceName;
 		return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
 				getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(), 
-				getEnvironment().getMetricGroup().addOperator(sourceName));
+				getEnvironment().getMetricGroup().getOrAddOperator(sourceName));
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
index 442a53c504a..7dd3a14e5ba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
@@ -69,7 +69,7 @@ public void setup(TaskConfig config, String taskName, Collector<OT> outputCollec
 		this.config = config;
 		this.taskName = taskName;
 		this.userCodeClassLoader = userCodeClassLoader;
-		this.metrics = parent.getEnvironment().getMetricGroup().addOperator(taskName);
+		this.metrics = parent.getEnvironment().getMetricGroup().getOrAddOperator(taskName);
 		this.numRecordsIn = this.metrics.getIOMetricGroup().getNumRecordsInCounter();
 		this.numRecordsOut = this.metrics.getIOMetricGroup().getNumRecordsOutCounter();
 		this.outputCollector = new CountingCollector<>(outputCollector, numRecordsOut);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
index 58198e4f923..2f62f5593a7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
@@ -93,7 +93,7 @@ public void testGenerateScopeCustom() throws Exception {
 			OperatorMetricGroup operatorGroup =
 				new TaskManagerMetricGroup(registry, "theHostName", tmID)
 					.addTaskForJob(jid, "myJobName", vertexId, new ExecutionAttemptID(), "aTaskname", 13, 2)
-					.addOperator(operatorID, operatorName);
+					.getOrAddOperator(operatorID, operatorName);
 
 			assertArrayEquals(
 				new String[]{tmID, jid.toString(), vertexId.toString(), operatorName, operatorID.toString()},
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
index d9e6158ccc8..079c7c63e38 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
@@ -176,7 +176,7 @@ public void testOperatorNameTruncation() throws Exception {
 		TaskMetricGroup taskMetricGroup = new TaskMetricGroup(registry, job, new JobVertexID(), new AbstractID(), "task", 0, 0);
 
 		String originalName = new String(new char[100]).replace("\0", "-");
-		OperatorMetricGroup operatorMetricGroup = taskMetricGroup.addOperator(originalName);
+		OperatorMetricGroup operatorMetricGroup = taskMetricGroup.getOrAddOperator(originalName);
 
 		String storedName = operatorMetricGroup.getScopeComponents()[0];
 		Assert.assertEquals(TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH, storedName.length());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java
new file mode 100644
index 00000000000..c393af50b98
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.chaining;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.FlatMapDriver;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.TaskTestBase;
+import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Metrics related tests for batch task chains.
+ */
+public class ChainedOperatorsMetricTest extends TaskTestBase {
+
+	private static final int MEMORY_MANAGER_SIZE = 1024 * 1024 * 3;
+
+	private static final int NETWORK_BUFFER_SIZE = 1024;
+
+	private static final TypeSerializerFactory<Record> serFact = RecordSerializerFactory.get();
+
+	private final List<Record> outList = new ArrayList<>();
+
+	private static final String HEAD_OPERATOR_NAME = "headoperator";
+	private static final String CHAINED_OPERATOR_NAME = "chainedoperator";
+
+	@Test
+	public void testOperatorIOMetricReuse() throws Exception {
+		// environment
+		initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
+		this.mockEnv = new MockEnvironmentBuilder()
+			.setTaskName(HEAD_OPERATOR_NAME)
+			.setMemorySize(MEMORY_MANAGER_SIZE)
+			.setInputSplitProvider(this.inputSplitProvider)
+			.setBufferSize(NETWORK_BUFFER_SIZE)
+			.setMetricGroup(new TaskMetricGroup(
+				NoOpMetricRegistry.INSTANCE,
+				UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(),
+				new JobVertexID(),
+				new AbstractID(),
+				"task",
+				0,
+				0))
+			.build();
+
+		final int keyCnt = 100;
+		final int valCnt = 20;
+		final int numRecords = keyCnt * valCnt;
+		addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
+		addOutput(this.outList);
+
+		// the chained operator
+		addChainedOperator();
+
+		// creates the head operator and assembles the chain
+		registerTask(FlatMapDriver.class, DuplicatingFlatMapFunction.class);
+		final BatchTask<FlatMapFunction<Record, Record>, Record> testTask = new BatchTask<>(this.mockEnv);
+
+		testTask.invoke();
+
+		Assert.assertEquals(numRecords * 2 * 2, this.outList.size());
+
+		final TaskMetricGroup taskMetricGroup = mockEnv.getMetricGroup();
+
+		// verify task-level metrics
+		{
+			final TaskIOMetricGroup ioMetricGroup = taskMetricGroup.getIOMetricGroup();
+			final Counter numRecordsInCounter = ioMetricGroup.getNumRecordsInCounter();
+			final Counter numRecordsOutCounter = ioMetricGroup.getNumRecordsOutCounter();
+
+			Assert.assertEquals(numRecords, numRecordsInCounter.getCount());
+			Assert.assertEquals(numRecords * 2 * 2, numRecordsOutCounter.getCount());
+		}
+
+		// verify head operator metrics
+		{
+			// this only returns the existing group and doesn't create a new one
+			final OperatorMetricGroup operatorMetricGroup1 = taskMetricGroup.getOrAddOperator(HEAD_OPERATOR_NAME);
+			final OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup1.getIOMetricGroup();
+			final Counter numRecordsInCounter = ioMetricGroup.getNumRecordsInCounter();
+			final Counter numRecordsOutCounter = ioMetricGroup.getNumRecordsOutCounter();
+
+			Assert.assertEquals(numRecords, numRecordsInCounter.getCount());
+			Assert.assertEquals(numRecords * 2, numRecordsOutCounter.getCount());
+		}
+
+		// verify chained operator metrics
+		{
+			// this only returns the existing group and doesn't create a new one
+			final OperatorMetricGroup operatorMetricGroup1 = taskMetricGroup.getOrAddOperator(CHAINED_OPERATOR_NAME);
+			final OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup1.getIOMetricGroup();
+			final Counter numRecordsInCounter = ioMetricGroup.getNumRecordsInCounter();
+			final Counter numRecordsOutCounter = ioMetricGroup.getNumRecordsOutCounter();
+
+			Assert.assertEquals(numRecords * 2, numRecordsInCounter.getCount());
+			Assert.assertEquals(numRecords * 2 * 2, numRecordsOutCounter.getCount());
+		}
+	}
+
+	private void addChainedOperator() {
+		final TaskConfig chainedConfig = new TaskConfig(new Configuration());
+
+		// input
+		chainedConfig.addInputToGroup(0);
+		chainedConfig.setInputSerializer(serFact, 0);
+
+		// output
+		chainedConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
+		chainedConfig.setOutputSerializer(serFact);
+
+		// driver
+		chainedConfig.setDriverStrategy(DriverStrategy.FLAT_MAP);
+
+		// udf
+		chainedConfig.setStubWrapper(new UserCodeClassWrapper<>(DuplicatingFlatMapFunction.class));
+
+		getTaskConfig().addChainedTask(ChainedFlatMapDriver.class, chainedConfig, CHAINED_OPERATOR_NAME);
+	}
+
+	/**
+	 * Simple {@link FlatMapFunction} that duplicates the input.
+	 */
+	public static class DuplicatingFlatMapFunction extends RichFlatMapFunction<Record, Record> {
+
+		private static final long serialVersionUID = -1152068682935346164L;
+
+		@Override
+		public void flatMap(final Record value, final Collector<Record> out) throws Exception {
+			out.collect(value);
+			out.collect(value);
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 4bf94e93463..68858bc0cea 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -41,7 +41,6 @@
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.TaskStateManager;
@@ -108,6 +107,8 @@
 
 	private Optional<Throwable> actualExternalFailureCause = Optional.empty();
 
+	private final TaskMetricGroup taskMetricGroup;
+
 	public static MockEnvironmentBuilder builder() {
 		return new MockEnvironmentBuilder();
 	}
@@ -125,7 +126,8 @@ protected MockEnvironment(
 		int maxParallelism,
 		int parallelism,
 		int subtaskIndex,
-		ClassLoader userCodeClassLoader) {
+		ClassLoader userCodeClassLoader,
+		TaskMetricGroup taskMetricGroup) {
 
 		this.jobID = jobID;
 		this.jobVertexID = jobVertexID;
@@ -150,6 +152,8 @@ protected MockEnvironment(
 
 		this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
 		this.taskStateManager = Preconditions.checkNotNull(taskStateManager);
+
+		this.taskMetricGroup = taskMetricGroup;
 	}
 
 
@@ -213,7 +217,7 @@ public TaskManagerRuntimeInfo getTaskManagerInfo() {
 
 	@Override
 	public TaskMetricGroup getMetricGroup() {
-		return UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+		return taskMetricGroup;
 	}
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
index dfb10d4293a..dfcc5f312e0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
@@ -23,6 +23,8 @@
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 
@@ -40,6 +42,7 @@
 	private ClassLoader userCodeClassLoader = Thread.currentThread().getContextClassLoader();
 	private JobID jobID = new JobID();
 	private JobVertexID jobVertexID = new JobVertexID();
+	private TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
 
 	public MockEnvironmentBuilder setTaskName(String taskName) {
 		this.taskName = taskName;
@@ -106,6 +109,11 @@ public MockEnvironmentBuilder setJobVertexID(JobVertexID jobVertexID) {
 		return this;
 	}
 
+	public MockEnvironmentBuilder setMetricGroup(TaskMetricGroup taskMetricGroup) {
+		this.taskMetricGroup = taskMetricGroup;
+		return this;
+	}
+
 	public MockEnvironment build() {
 		return new MockEnvironment(
 			jobID,
@@ -120,6 +128,7 @@ public MockEnvironment build() {
 			maxParallelism,
 			parallelism,
 			subtaskIndex,
-			userCodeClassLoader);
+			userCodeClassLoader,
+			taskMetricGroup);
 	}
 }
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 52ba3e4f1f5..f52168bd9b9 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -171,7 +171,7 @@ public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<S
 		this.container = containingTask;
 		this.config = config;
 		try {
-			OperatorMetricGroup operatorMetricGroup = environment.getMetricGroup().addOperator(config.getOperatorID(), config.getOperatorName());
+			OperatorMetricGroup operatorMetricGroup = environment.getMetricGroup().getOrAddOperator(config.getOperatorID(), config.getOperatorName());
 			this.output = new CountingOutput(output, operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
 			if (config.isChainStart()) {
 				operatorMetricGroup.getIOMetricGroup().reuseInputMetricsForTask();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 96eaa78ed1a..f6d5021b884 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -626,7 +626,7 @@ public void testOperatorMetricReuse() throws Exception {
 
 		final TaskMetricGroup taskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() {
 			@Override
-			public OperatorMetricGroup addOperator(OperatorID operatorID, String name) {
+			public OperatorMetricGroup getOrAddOperator(OperatorID operatorID, String name) {
 				return new OperatorMetricGroup(NoOpMetricRegistry.INSTANCE, this, operatorID, name);
 			}
 		};
@@ -682,13 +682,13 @@ public void testWatermarkMetrics() throws Exception {
 		InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup();
 		InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() {
 			@Override
-			public OperatorMetricGroup addOperator(OperatorID id, String name) {
+			public OperatorMetricGroup getOrAddOperator(OperatorID id, String name) {
 				if (id.equals(headOperatorId)) {
 					return headOperatorMetricGroup;
 				} else if (id.equals(chainedOperatorId)) {
 					return chainedOperatorMetricGroup;
 				} else {
-					return super.addOperator(id, name);
+					return super.getOrAddOperator(id, name);
 				}
 			}
 		};
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index 5d151573582..c48b6474988 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -402,7 +402,7 @@ public void testOperatorMetricReuse() throws Exception {
 
 		final TaskMetricGroup taskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() {
 			@Override
-			public OperatorMetricGroup addOperator(OperatorID operatorID, String name) {
+			public OperatorMetricGroup getOrAddOperator(OperatorID operatorID, String name) {
 				return new OperatorMetricGroup(NoOpMetricRegistry.INSTANCE, this, operatorID, name);
 			}
 		};
@@ -470,13 +470,13 @@ public void testWatermarkMetrics() throws Exception {
 		InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup();
 		InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() {
 			@Override
-			public OperatorMetricGroup addOperator(OperatorID id, String name) {
+			public OperatorMetricGroup getOrAddOperator(OperatorID id, String name) {
 				if (id.equals(headOperatorId)) {
 					return headOperatorMetricGroup;
 				} else if (id.equals(chainedOperatorId)) {
 					return chainedOperatorMetricGroup;
 				} else {
-					return super.addOperator(id, name);
+					return super.getOrAddOperator(id, name);
 				}
 			}
 		};


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services