You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/09/03 13:03:35 UTC

[flink] branch master updated (d216dc9 -> 4573d48)

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


    from d216dc9  [FLINK-10186][streaming] Use ThreadLocalRandom in BufferSpiller constructor
     new 93ac958  [FLINK-10150][metrics] Fix OperatorMetricGroup creation for Batch
     new 4573d48  [hotfix][metrics] Rename TaskMetricGroup#addOperator

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/metrics/groups/TaskMetricGroup.java    |  14 +-
 .../metrics/groups/UnregisteredMetricGroups.java   |   2 +-
 .../apache/flink/runtime/operators/BatchTask.java  |   2 +-
 .../flink/runtime/operators/DataSinkTask.java      |   2 +-
 .../flink/runtime/operators/DataSourceTask.java    |   2 +-
 .../runtime/operators/chaining/ChainedDriver.java  |   2 +-
 .../runtime/metrics/groups/OperatorGroupTest.java  |   2 +-
 .../metrics/groups/TaskMetricGroupTest.java        |   2 +-
 .../chaining/ChainedOperatorsMetricTest.java       | 175 +++++++++++++++++++++
 .../operators/testutils/MockEnvironment.java       |  10 +-
 .../testutils/MockEnvironmentBuilder.java          |  11 +-
 .../api/operators/AbstractStreamOperator.java      |   2 +-
 .../runtime/tasks/OneInputStreamTaskTest.java      |   6 +-
 .../runtime/tasks/TwoInputStreamTaskTest.java      |   6 +-
 14 files changed, 214 insertions(+), 24 deletions(-)
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java


[flink] 02/02: [hotfix][metrics] Rename TaskMetricGroup#addOperator

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4573d48008d3b19fa7e90ada7d2783ed02ba71d0
Author: zentol <ch...@apache.org>
AuthorDate: Mon Sep 3 10:44:07 2018 +0200

    [hotfix][metrics] Rename TaskMetricGroup#addOperator
---
 .../org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java    | 6 +++---
 .../flink/runtime/metrics/groups/UnregisteredMetricGroups.java      | 2 +-
 .../src/main/java/org/apache/flink/runtime/operators/BatchTask.java | 2 +-
 .../main/java/org/apache/flink/runtime/operators/DataSinkTask.java  | 2 +-
 .../java/org/apache/flink/runtime/operators/DataSourceTask.java     | 2 +-
 .../org/apache/flink/runtime/operators/chaining/ChainedDriver.java  | 2 +-
 .../org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java  | 2 +-
 .../apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java    | 2 +-
 .../runtime/operators/chaining/ChainedOperatorsMetricTest.java      | 4 ++--
 .../flink/streaming/api/operators/AbstractStreamOperator.java       | 2 +-
 .../flink/streaming/runtime/tasks/OneInputStreamTaskTest.java       | 6 +++---
 .../flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java       | 6 +++---
 12 files changed, 19 insertions(+), 19 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
index 124fbf2..39d98d8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
@@ -134,11 +134,11 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
 	//  operators and cleanup
 	// ------------------------------------------------------------------------
 
-	public OperatorMetricGroup addOperator(String name) {
-		return addOperator(OperatorID.fromJobVertexID(vertexId), name);
+	public OperatorMetricGroup getOrAddOperator(String name) {
+		return getOrAddOperator(OperatorID.fromJobVertexID(vertexId), name);
 	}
 
-	public OperatorMetricGroup addOperator(OperatorID operatorID, String name) {
+	public OperatorMetricGroup getOrAddOperator(OperatorID operatorID, String name) {
 		if (name != null && name.length() > METRICS_OPERATOR_NAME_MAX_LENGTH) {
 			LOG.warn("The operator name {} exceeded the {} characters length limit and was truncated.", name, METRICS_OPERATOR_NAME_MAX_LENGTH);
 			name = name.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
index 3869aa6..8c635eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
@@ -145,7 +145,7 @@ public class UnregisteredMetricGroups {
 		}
 
 		@Override
-		public OperatorMetricGroup addOperator(OperatorID operatorID, String name) {
+		public OperatorMetricGroup getOrAddOperator(OperatorID operatorID, String name) {
 			return createUnregisteredOperatorMetricGroup();
 		}
 	}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
index e697869..d5f2fd0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/BatchTask.java
@@ -254,7 +254,7 @@ public class BatchTask<S extends Function, OT> extends AbstractInvokable impleme
 
 		String headName =  getEnvironment().getTaskInfo().getTaskName().split("->")[0].trim();
 		this.metrics = getEnvironment().getMetricGroup()
-			.addOperator(headName.startsWith("CHAIN") ? headName.substring(6) : headName);
+			.getOrAddOperator(headName.startsWith("CHAIN") ? headName.substring(6) : headName);
 		this.metrics.getIOMetricGroup().reuseInputMetricsForTask();
 		if (config.getNumberOfChainedStubs() == 0) {
 			this.metrics.getIOMetricGroup().reuseOutputMetricsForTask();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index 0ea376e..2c26301 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -409,6 +409,6 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 
 		return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
 				getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(), 
-				getEnvironment().getMetricGroup().addOperator(getEnvironment().getTaskInfo().getTaskName()));
+				getEnvironment().getMetricGroup().getOrAddOperator(getEnvironment().getTaskInfo().getTaskName()));
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index cdfe1fa..a5ccfab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -402,6 +402,6 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 		sourceName = sourceName.startsWith("CHAIN") ? sourceName.substring(6) : sourceName;
 		return new DistributedRuntimeUDFContext(env.getTaskInfo(), getUserCodeClassLoader(),
 				getExecutionConfig(), env.getDistributedCacheEntries(), env.getAccumulatorRegistry().getUserMap(), 
-				getEnvironment().getMetricGroup().addOperator(sourceName));
+				getEnvironment().getMetricGroup().getOrAddOperator(sourceName));
 	}
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
index 442a53c..7dd3a14 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/chaining/ChainedDriver.java
@@ -69,7 +69,7 @@ public abstract class ChainedDriver<IT, OT> implements Collector<IT> {
 		this.config = config;
 		this.taskName = taskName;
 		this.userCodeClassLoader = userCodeClassLoader;
-		this.metrics = parent.getEnvironment().getMetricGroup().addOperator(taskName);
+		this.metrics = parent.getEnvironment().getMetricGroup().getOrAddOperator(taskName);
 		this.numRecordsIn = this.metrics.getIOMetricGroup().getNumRecordsInCounter();
 		this.numRecordsOut = this.metrics.getIOMetricGroup().getNumRecordsOutCounter();
 		this.outputCollector = new CountingCollector<>(outputCollector, numRecordsOut);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
index 58198e4..2f62f55 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
@@ -93,7 +93,7 @@ public class OperatorGroupTest extends TestLogger {
 			OperatorMetricGroup operatorGroup =
 				new TaskManagerMetricGroup(registry, "theHostName", tmID)
 					.addTaskForJob(jid, "myJobName", vertexId, new ExecutionAttemptID(), "aTaskname", 13, 2)
-					.addOperator(operatorID, operatorName);
+					.getOrAddOperator(operatorID, operatorName);
 
 			assertArrayEquals(
 				new String[]{tmID, jid.toString(), vertexId.toString(), operatorName, operatorID.toString()},
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
index d9e6158..079c7c6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroupTest.java
@@ -176,7 +176,7 @@ public class TaskMetricGroupTest extends TestLogger {
 		TaskMetricGroup taskMetricGroup = new TaskMetricGroup(registry, job, new JobVertexID(), new AbstractID(), "task", 0, 0);
 
 		String originalName = new String(new char[100]).replace("\0", "-");
-		OperatorMetricGroup operatorMetricGroup = taskMetricGroup.addOperator(originalName);
+		OperatorMetricGroup operatorMetricGroup = taskMetricGroup.getOrAddOperator(originalName);
 
 		String storedName = operatorMetricGroup.getScopeComponents()[0];
 		Assert.assertEquals(TaskMetricGroup.METRICS_OPERATOR_NAME_MAX_LENGTH, storedName.length());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java
index 29ff6e8..c393af5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java
@@ -117,7 +117,7 @@ public class ChainedOperatorsMetricTest extends TaskTestBase {
 		// verify head operator metrics
 		{
 			// this only returns the existing group and doesn't create a new one
-			final OperatorMetricGroup operatorMetricGroup1 = taskMetricGroup.addOperator(HEAD_OPERATOR_NAME);
+			final OperatorMetricGroup operatorMetricGroup1 = taskMetricGroup.getOrAddOperator(HEAD_OPERATOR_NAME);
 			final OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup1.getIOMetricGroup();
 			final Counter numRecordsInCounter = ioMetricGroup.getNumRecordsInCounter();
 			final Counter numRecordsOutCounter = ioMetricGroup.getNumRecordsOutCounter();
@@ -129,7 +129,7 @@ public class ChainedOperatorsMetricTest extends TaskTestBase {
 		// verify chained operator metrics
 		{
 			// this only returns the existing group and doesn't create a new one
-			final OperatorMetricGroup operatorMetricGroup1 = taskMetricGroup.addOperator(CHAINED_OPERATOR_NAME);
+			final OperatorMetricGroup operatorMetricGroup1 = taskMetricGroup.getOrAddOperator(CHAINED_OPERATOR_NAME);
 			final OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup1.getIOMetricGroup();
 			final Counter numRecordsInCounter = ioMetricGroup.getNumRecordsInCounter();
 			final Counter numRecordsOutCounter = ioMetricGroup.getNumRecordsOutCounter();
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 52ba3e4..f52168b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -171,7 +171,7 @@ public abstract class AbstractStreamOperator<OUT>
 		this.container = containingTask;
 		this.config = config;
 		try {
-			OperatorMetricGroup operatorMetricGroup = environment.getMetricGroup().addOperator(config.getOperatorID(), config.getOperatorName());
+			OperatorMetricGroup operatorMetricGroup = environment.getMetricGroup().getOrAddOperator(config.getOperatorID(), config.getOperatorName());
 			this.output = new CountingOutput(output, operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter());
 			if (config.isChainStart()) {
 				operatorMetricGroup.getIOMetricGroup().reuseInputMetricsForTask();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
index 96eaa78..f6d5021 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -626,7 +626,7 @@ public class OneInputStreamTaskTest extends TestLogger {
 
 		final TaskMetricGroup taskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() {
 			@Override
-			public OperatorMetricGroup addOperator(OperatorID operatorID, String name) {
+			public OperatorMetricGroup getOrAddOperator(OperatorID operatorID, String name) {
 				return new OperatorMetricGroup(NoOpMetricRegistry.INSTANCE, this, operatorID, name);
 			}
 		};
@@ -682,13 +682,13 @@ public class OneInputStreamTaskTest extends TestLogger {
 		InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup();
 		InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() {
 			@Override
-			public OperatorMetricGroup addOperator(OperatorID id, String name) {
+			public OperatorMetricGroup getOrAddOperator(OperatorID id, String name) {
 				if (id.equals(headOperatorId)) {
 					return headOperatorMetricGroup;
 				} else if (id.equals(chainedOperatorId)) {
 					return chainedOperatorMetricGroup;
 				} else {
-					return super.addOperator(id, name);
+					return super.getOrAddOperator(id, name);
 				}
 			}
 		};
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
index 5d15157..c48b647 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -402,7 +402,7 @@ public class TwoInputStreamTaskTest {
 
 		final TaskMetricGroup taskMetricGroup = new UnregisteredMetricGroups.UnregisteredTaskMetricGroup() {
 			@Override
-			public OperatorMetricGroup addOperator(OperatorID operatorID, String name) {
+			public OperatorMetricGroup getOrAddOperator(OperatorID operatorID, String name) {
 				return new OperatorMetricGroup(NoOpMetricRegistry.INSTANCE, this, operatorID, name);
 			}
 		};
@@ -470,13 +470,13 @@ public class TwoInputStreamTaskTest {
 		InterceptingOperatorMetricGroup chainedOperatorMetricGroup = new InterceptingOperatorMetricGroup();
 		InterceptingTaskMetricGroup taskMetricGroup = new InterceptingTaskMetricGroup() {
 			@Override
-			public OperatorMetricGroup addOperator(OperatorID id, String name) {
+			public OperatorMetricGroup getOrAddOperator(OperatorID id, String name) {
 				if (id.equals(headOperatorId)) {
 					return headOperatorMetricGroup;
 				} else if (id.equals(chainedOperatorId)) {
 					return chainedOperatorMetricGroup;
 				} else {
-					return super.addOperator(id, name);
+					return super.getOrAddOperator(id, name);
 				}
 			}
 		};


[flink] 01/02: [FLINK-10150][metrics] Fix OperatorMetricGroup creation for Batch

Posted by ch...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 93ac95866e4473982a89e563d58ef0e374b3c0ba
Author: zentol <ch...@apache.org>
AuthorDate: Tue Aug 21 18:37:07 2018 +0200

    [FLINK-10150][metrics] Fix OperatorMetricGroup creation for Batch
---
 .../runtime/metrics/groups/TaskMetricGroup.java    |   8 +-
 .../chaining/ChainedOperatorsMetricTest.java       | 175 +++++++++++++++++++++
 .../operators/testutils/MockEnvironment.java       |  10 +-
 .../testutils/MockEnvironmentBuilder.java          |  11 +-
 4 files changed, 197 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
index 441dbf8..124fbf2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
@@ -42,7 +42,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGroup> {
 
-	private final Map<OperatorID, OperatorMetricGroup> operators = new HashMap<>();
+	private final Map<String, OperatorMetricGroup> operators = new HashMap<>();
 
 	static final int METRICS_OPERATOR_NAME_MAX_LENGTH = 80;
 
@@ -144,15 +144,17 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
 			name = name.substring(0, METRICS_OPERATOR_NAME_MAX_LENGTH);
 		}
 		OperatorMetricGroup operator = new OperatorMetricGroup(this.registry, this, operatorID, name);
+		// unique OperatorIDs only exist in streaming, so we have to rely on the name for batch operators
+		final String key = operatorID + name;
 
 		synchronized (this) {
-			OperatorMetricGroup previous = operators.put(operatorID, operator);
+			OperatorMetricGroup previous = operators.put(key, operator);
 			if (previous == null) {
 				// no operator group so far
 				return operator;
 			} else {
 				// already had an operator group. restore that one.
-				operators.put(operatorID, previous);
+				operators.put(key, previous);
 				return previous;
 			}
 		}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java
new file mode 100644
index 0000000..29ff6e8
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/chaining/ChainedOperatorsMetricTest.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.chaining;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.FlatMapDriver;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.TaskTestBase;
+import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
+import org.apache.flink.types.Record;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.Collector;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Metrics-related tests for batch task chains.
+ */
+public class ChainedOperatorsMetricTest extends TaskTestBase {
+
+	private static final int MEMORY_MANAGER_SIZE = 1024 * 1024 * 3;
+
+	private static final int NETWORK_BUFFER_SIZE = 1024;
+
+	private static final TypeSerializerFactory<Record> serFact = RecordSerializerFactory.get();
+
+	private final List<Record> outList = new ArrayList<>();
+
+	private static final String HEAD_OPERATOR_NAME = "headoperator";
+	private static final String CHAINED_OPERATOR_NAME = "chainedoperator";
+
+	@Test
+	public void testOperatorIOMetricReuse() throws Exception {
+		// environment
+		initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
+		this.mockEnv = new MockEnvironmentBuilder()
+			.setTaskName(HEAD_OPERATOR_NAME)
+			.setMemorySize(MEMORY_MANAGER_SIZE)
+			.setInputSplitProvider(this.inputSplitProvider)
+			.setBufferSize(NETWORK_BUFFER_SIZE)
+			.setMetricGroup(new TaskMetricGroup(
+				NoOpMetricRegistry.INSTANCE,
+				UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup(),
+				new JobVertexID(),
+				new AbstractID(),
+				"task",
+				0,
+				0))
+			.build();
+
+		final int keyCnt = 100;
+		final int valCnt = 20;
+		final int numRecords = keyCnt * valCnt;
+		addInput(new UniformRecordGenerator(keyCnt, valCnt, false), 0);
+		addOutput(this.outList);
+
+		// the chained operator
+		addChainedOperator();
+
+		// creates the head operator and assembles the chain
+		registerTask(FlatMapDriver.class, DuplicatingFlatMapFunction.class);
+		final BatchTask<FlatMapFunction<Record, Record>, Record> testTask = new BatchTask<>(this.mockEnv);
+
+		testTask.invoke();
+
+		Assert.assertEquals(numRecords * 2 * 2, this.outList.size());
+
+		final TaskMetricGroup taskMetricGroup = mockEnv.getMetricGroup();
+
+		// verify task-level metrics
+		{
+			final TaskIOMetricGroup ioMetricGroup = taskMetricGroup.getIOMetricGroup();
+			final Counter numRecordsInCounter = ioMetricGroup.getNumRecordsInCounter();
+			final Counter numRecordsOutCounter = ioMetricGroup.getNumRecordsOutCounter();
+
+			Assert.assertEquals(numRecords, numRecordsInCounter.getCount());
+			Assert.assertEquals(numRecords * 2 * 2, numRecordsOutCounter.getCount());
+		}
+
+		// verify head operator metrics
+		{
+			// this only returns the existing group and doesn't create a new one
+			final OperatorMetricGroup operatorMetricGroup1 = taskMetricGroup.addOperator(HEAD_OPERATOR_NAME);
+			final OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup1.getIOMetricGroup();
+			final Counter numRecordsInCounter = ioMetricGroup.getNumRecordsInCounter();
+			final Counter numRecordsOutCounter = ioMetricGroup.getNumRecordsOutCounter();
+
+			Assert.assertEquals(numRecords, numRecordsInCounter.getCount());
+			Assert.assertEquals(numRecords * 2, numRecordsOutCounter.getCount());
+		}
+
+		// verify chained operator metrics
+		{
+			// this only returns the existing group and doesn't create a new one
+			final OperatorMetricGroup operatorMetricGroup1 = taskMetricGroup.addOperator(CHAINED_OPERATOR_NAME);
+			final OperatorIOMetricGroup ioMetricGroup = operatorMetricGroup1.getIOMetricGroup();
+			final Counter numRecordsInCounter = ioMetricGroup.getNumRecordsInCounter();
+			final Counter numRecordsOutCounter = ioMetricGroup.getNumRecordsOutCounter();
+
+			Assert.assertEquals(numRecords * 2, numRecordsInCounter.getCount());
+			Assert.assertEquals(numRecords * 2 * 2, numRecordsOutCounter.getCount());
+		}
+	}
+
+	private void addChainedOperator() {
+		final TaskConfig chainedConfig = new TaskConfig(new Configuration());
+
+		// input
+		chainedConfig.addInputToGroup(0);
+		chainedConfig.setInputSerializer(serFact, 0);
+
+		// output
+		chainedConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
+		chainedConfig.setOutputSerializer(serFact);
+
+		// driver
+		chainedConfig.setDriverStrategy(DriverStrategy.FLAT_MAP);
+
+		// udf
+		chainedConfig.setStubWrapper(new UserCodeClassWrapper<>(DuplicatingFlatMapFunction.class));
+
+		getTaskConfig().addChainedTask(ChainedFlatMapDriver.class, chainedConfig, CHAINED_OPERATOR_NAME);
+	}
+
+	/**
+	 * Simple {@link FlatMapFunction} that duplicates the input.
+	 */
+	public static class DuplicatingFlatMapFunction extends RichFlatMapFunction<Record, Record> {
+
+		private static final long serialVersionUID = -1152068682935346164L;
+
+		@Override
+		public void flatMap(final Record value, final Collector<Record> out) throws Exception {
+			out.collect(value);
+			out.collect(value);
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 4bf94e9..68858bc 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -41,7 +41,6 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
-import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.TaskStateManager;
@@ -108,6 +107,8 @@ public class MockEnvironment implements Environment, AutoCloseable {
 
 	private Optional<Throwable> actualExternalFailureCause = Optional.empty();
 
+	private final TaskMetricGroup taskMetricGroup;
+
 	public static MockEnvironmentBuilder builder() {
 		return new MockEnvironmentBuilder();
 	}
@@ -125,7 +126,8 @@ public class MockEnvironment implements Environment, AutoCloseable {
 		int maxParallelism,
 		int parallelism,
 		int subtaskIndex,
-		ClassLoader userCodeClassLoader) {
+		ClassLoader userCodeClassLoader,
+		TaskMetricGroup taskMetricGroup) {
 
 		this.jobID = jobID;
 		this.jobVertexID = jobVertexID;
@@ -150,6 +152,8 @@ public class MockEnvironment implements Environment, AutoCloseable {
 
 		this.userCodeClassLoader = Preconditions.checkNotNull(userCodeClassLoader);
 		this.taskStateManager = Preconditions.checkNotNull(taskStateManager);
+
+		this.taskMetricGroup = taskMetricGroup;
 	}
 
 
@@ -213,7 +217,7 @@ public class MockEnvironment implements Environment, AutoCloseable {
 
 	@Override
 	public TaskMetricGroup getMetricGroup() {
-		return UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
+		return taskMetricGroup;
 	}
 
 	@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
index dfb10d4..dfcc5f3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironmentBuilder.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.state.TaskStateManager;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 
@@ -40,6 +42,7 @@ public class MockEnvironmentBuilder {
 	private ClassLoader userCodeClassLoader = Thread.currentThread().getContextClassLoader();
 	private JobID jobID = new JobID();
 	private JobVertexID jobVertexID = new JobVertexID();
+	private TaskMetricGroup taskMetricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
 
 	public MockEnvironmentBuilder setTaskName(String taskName) {
 		this.taskName = taskName;
@@ -106,6 +109,11 @@ public class MockEnvironmentBuilder {
 		return this;
 	}
 
+	public MockEnvironmentBuilder setMetricGroup(TaskMetricGroup taskMetricGroup) {
+		this.taskMetricGroup = taskMetricGroup;
+		return this;
+	}
+
 	public MockEnvironment build() {
 		return new MockEnvironment(
 			jobID,
@@ -120,6 +128,7 @@ public class MockEnvironmentBuilder {
 			maxParallelism,
 			parallelism,
 			subtaskIndex,
-			userCodeClassLoader);
+			userCodeClassLoader,
+			taskMetricGroup);
 	}
 }