You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/10/25 10:03:42 UTC

[2/2] flink git commit: [FLINK-4773] [metrics] [refactor] Rename IOMetricGroup to TaskIOMetricGroup

[FLINK-4773] [metrics] [refactor] Rename IOMetricGroup to TaskIOMetricGroup


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/77258a00
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/77258a00
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/77258a00

Branch: refs/heads/master
Commit: 77258a007ae4dde525d0274bb4d63e76d834bfe0
Parents: c4f9f0d
Author: zentol <ch...@apache.org>
Authored: Fri Oct 7 13:02:10 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Oct 25 12:03:26 2016 +0200

----------------------------------------------------------------------
 .../api/serialization/RecordSerializer.java     |  4 +-
 .../serialization/SpanningRecordSerializer.java |  6 +--
 .../io/network/api/writer/RecordWriter.java     |  4 +-
 .../partition/consumer/LocalInputChannel.java   |  6 +--
 .../partition/consumer/RemoteInputChannel.java  |  6 +--
 .../partition/consumer/SingleInputGate.java     |  6 +--
 .../partition/consumer/UnknownInputChannel.java |  6 +--
 .../runtime/metrics/groups/IOMetricGroup.java   | 52 --------------------
 .../metrics/groups/TaskIOMetricGroup.java       | 52 ++++++++++++++++++++
 .../runtime/metrics/groups/TaskMetricGroup.java | 10 ++--
 .../consumer/LocalInputChannelTest.java         |  6 +--
 .../consumer/RemoteInputChannelTest.java        |  6 +--
 .../partition/consumer/SingleInputGateTest.java | 16 +++---
 .../partition/consumer/TestSingleInputGate.java |  2 +-
 .../partition/consumer/UnionInputGateTest.java  |  4 +-
 .../testutils/UnregisteredTaskMetricsGroup.java |  6 +--
 .../runtime/io/StreamInputProcessor.java        |  4 +-
 .../runtime/io/StreamTwoInputProcessor.java     |  4 +-
 18 files changed, 100 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
index a560bb6..c76dd00 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
@@ -22,7 +22,7 @@ package org.apache.flink.runtime.io.network.api.serialization;
 import java.io.IOException;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 
@@ -76,5 +76,5 @@ public interface RecordSerializer<T extends IOReadableWritable> {
 	 *
 	 * @param metrics metric group
 	 */
-	void instantiateMetrics(IOMetricGroup metrics);
+	void instantiateMetrics(TaskIOMetricGroup metrics);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index 9d4f765..7c4d937 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -25,7 +25,7 @@ import java.nio.ByteOrder;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.metrics.Counter;
-import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.util.DataOutputSerializer;
@@ -197,7 +197,7 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 	}
 
 	@Override
-	public void instantiateMetrics(IOMetricGroup metrics) {
-		numBytesOut = metrics.getBytesOutCounter();
+	public void instantiateMetrics(TaskIOMetricGroup metrics) {
+		numBytesOut = metrics.getNumBytesOutCounter();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 422aa65..96eea23 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.io.network.api.writer;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
@@ -213,7 +213,7 @@ public class RecordWriter<T extends IOReadableWritable> {
 	 * Sets the metric group for this RecordWriter.
 	 * @param metrics
      */
-	public void setMetricGroup(IOMetricGroup metrics) {
+	public void setMetricGroup(TaskIOMetricGroup metrics) {
 		for(RecordSerializer<?> serializer : serializers) {
 			serializer.instantiateMetrics(metrics);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index a8aae7e..55ff539 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -68,7 +68,7 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 			ResultPartitionID partitionId,
 			ResultPartitionManager partitionManager,
 			TaskEventDispatcher taskEventDispatcher,
-			IOMetricGroup metrics) {
+			TaskIOMetricGroup metrics) {
 
 		this(inputGate, channelIndex, partitionId, partitionManager, taskEventDispatcher,
 				0, 0, metrics);
@@ -82,7 +82,7 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 			TaskEventDispatcher taskEventDispatcher,
 			int initialBackoff,
 			int maxBackoff,
-			IOMetricGroup metrics) {
+			TaskIOMetricGroup metrics) {
 
 		super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, metrics.getNumBytesInLocalCounter());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index a12d2a8..13a71a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
@@ -81,7 +81,7 @@ public class RemoteInputChannel extends InputChannel {
 			ResultPartitionID partitionId,
 			ConnectionID connectionId,
 			ConnectionManager connectionManager,
-			IOMetricGroup metrics) {
+			TaskIOMetricGroup metrics) {
 
 		this(inputGate, channelIndex, partitionId, connectionId, connectionManager,
 				0, 0, metrics);
@@ -95,7 +95,7 @@ public class RemoteInputChannel extends InputChannel {
 			ConnectionManager connectionManager,
 			int initialBackoff,
 			int maxBackoff,
-			IOMetricGroup metrics) {
+			TaskIOMetricGroup metrics) {
 
 		super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, metrics.getNumBytesInRemoteCounter());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 0db30ee..f4e4325 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 
 import com.google.common.collect.Maps;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.taskmanager.TaskActions;
-import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
@@ -175,7 +175,7 @@ public class SingleInputGate implements InputGate {
 			int consumedSubpartitionIndex,
 			int numberOfInputChannels,
 			TaskActions taskActions,
-			IOMetricGroup metrics) {
+			TaskIOMetricGroup metrics) {
 
 		this.owningTaskName = checkNotNull(owningTaskName);
 		this.jobId = checkNotNull(jobId);
@@ -506,7 +506,7 @@ public class SingleInputGate implements InputGate {
 			InputGateDeploymentDescriptor igdd,
 			NetworkEnvironment networkEnvironment,
 			TaskActions taskActions,
-			IOMetricGroup metrics) {
+			TaskIOMetricGroup metrics) {
 
 		final IntermediateDataSetID consumedResultId = checkNotNull(igdd.getConsumedResultId());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index 27ecc70..08b5044 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition.consumer;
 
-import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.event.TaskEvent;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
@@ -49,7 +49,7 @@ public class UnknownInputChannel extends InputChannel {
 
 	private final int maxBackoff;
 
-	private final IOMetricGroup metrics;
+	private final TaskIOMetricGroup metrics;
 
 	public UnknownInputChannel(
 			SingleInputGate gate,
@@ -60,7 +60,7 @@ public class UnknownInputChannel extends InputChannel {
 			ConnectionManager connectionManager,
 			int initialBackoff,
 			int maxBackoff,
-			IOMetricGroup metrics) {
+			TaskIOMetricGroup metrics) {
 
 		super(gate, channelIndex, partitionId, initialBackoff, maxBackoff, null);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/IOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/IOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/IOMetricGroup.java
deleted file mode 100644
index 8fa6111..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/IOMetricGroup.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.metrics.groups;
-
-import org.apache.flink.metrics.Counter;
-
-/**
- * Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is
- * forwarded to the parent task metric group.
- */
-public class IOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
-
-	private final Counter numBytesOut;
-	private final Counter numBytesInLocal;
-	private final Counter numBytesInRemote;
-
-	public IOMetricGroup(TaskMetricGroup parent) {
-		super(parent);
-
-		this.numBytesOut = counter("numBytesOut");
-		this.numBytesInLocal = counter("numBytesInLocal");
-		this.numBytesInRemote = counter("numBytesInRemote");
-	}
-
-	public Counter getBytesOutCounter() {
-		return numBytesOut;
-	}
-
-	public Counter getNumBytesInLocalCounter() {
-		return numBytesInLocal;
-	}
-
-	public Counter getNumBytesInRemoteCounter() {
-		return numBytesInRemote;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
new file mode 100644
index 0000000..a726c26
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.metrics.Counter;
+
+/**
+ * Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is
+ * forwarded to the parent task metric group.
+ */
+public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
+
+	private final Counter numBytesOut;
+	private final Counter numBytesInLocal;
+	private final Counter numBytesInRemote;
+
+	public TaskIOMetricGroup(TaskMetricGroup parent) {
+		super(parent);
+
+		this.numBytesOut = counter("numBytesOut");
+		this.numBytesInLocal = counter("numBytesInLocal");
+		this.numBytesInRemote = counter("numBytesInRemote");
+	}
+
+	public Counter getNumBytesOutCounter() {
+		return numBytesOut;
+	}
+
+	public Counter getNumBytesInLocalCounter() {
+		return numBytesInLocal;
+	}
+
+	public Counter getNumBytesInRemoteCounter() {
+		return numBytesInRemote;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
index 5082fd8..75b8bd8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
@@ -41,7 +41,7 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
 
 	private final Map<String, OperatorMetricGroup> operators = new HashMap<>();
 
-	private final IOMetricGroup ioMetrics;
+	private final TaskIOMetricGroup ioMetrics;
 	
 	/** The execution Id uniquely identifying the executed task represented by this metrics group */
 	private final AbstractID executionId;
@@ -75,7 +75,7 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
 		this.subtaskIndex = subtaskIndex;
 		this.attemptNumber = attemptNumber;
 
-		this.ioMetrics = new IOMetricGroup(this);
+		this.ioMetrics = new TaskIOMetricGroup(this);
 	}
 
 	// ------------------------------------------------------------------------
@@ -109,11 +109,11 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
 	}
 
 	/**
-	 * Returns the IOMetricGroup for this task.
+	 * Returns the TaskIOMetricGroup for this task.
 	 *
-	 * @return IOMetricGroup for this task.
+	 * @return TaskIOMetricGroup for this task.
 	 */
-	public IOMetricGroup getIOMetricGroup() {
+	public TaskIOMetricGroup getIOMetricGroup() {
 		return ioMetrics;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 75f2bcc..19bb67e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -277,7 +277,7 @@ public class LocalInputChannelTest {
 			mock(TaskEventDispatcher.class),
 			initialAndMaxRequestBackoff._1(),
 			initialAndMaxRequestBackoff._2(),
-			new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
 	}
 
 	/**
@@ -353,7 +353,7 @@ public class LocalInputChannelTest {
 				subpartitionIndex,
 				numberOfInputChannels,
 				mock(TaskActions.class),
-				new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+				new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
 
 			// Set buffer pool
 			inputGate.setBufferPool(bufferPool);
@@ -368,7 +368,7 @@ public class LocalInputChannelTest {
 								consumedPartitionIds[i],
 								partitionManager,
 								taskEventDispatcher,
-								new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()));
+								new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()));
 			}
 
 			this.numberOfInputChannels = numberOfInputChannels;

http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 9a79ff8..2c2f966 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -249,7 +249,7 @@ public class RemoteInputChannelTest {
 				partitionId,
 				mock(ConnectionID.class),
 				connectionManager,
-				new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+				new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
 
 		ch.onFailedPartitionRequest();
 
@@ -269,7 +269,7 @@ public class RemoteInputChannelTest {
 				new ResultPartitionID(),
 				mock(ConnectionID.class),
 				connManager,
-				new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+				new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
 
 		ch.onError(new ProducerFailedException(new RuntimeException("Expected test exception.")));
 
@@ -306,6 +306,6 @@ public class RemoteInputChannelTest {
 			connectionManager,
 			initialAndMaxRequestBackoff._1(),
 			initialAndMaxRequestBackoff._2(),
-			new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 9e4ab86..8f9ea9e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -66,7 +66,7 @@ public class SingleInputGateTest {
 	public void testBasicGetNextLogic() throws Exception {
 		// Setup
 		final SingleInputGate inputGate = new SingleInputGate(
-				"Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+				"Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
 
 		final TestInputChannel[] inputChannels = new TestInputChannel[]{
 				new TestInputChannel(inputGate, 0),
@@ -113,7 +113,7 @@ public class SingleInputGateTest {
 		// Setup reader with one local and one unknown input channel
 		final IntermediateDataSetID resultId = new IntermediateDataSetID();
 
-		final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+		final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
 		final BufferPool bufferPool = mock(BufferPool.class);
 		when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2);
 
@@ -122,12 +122,12 @@ public class SingleInputGateTest {
 		// Local
 		ResultPartitionID localPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID());
 
-		InputChannel local = new LocalInputChannel(inputGate, 0, localPartitionId, partitionManager, taskEventDispatcher, new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+		InputChannel local = new LocalInputChannel(inputGate, 0, localPartitionId, partitionManager, taskEventDispatcher, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
 
 		// Unknown
 		ResultPartitionID unknownPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID());
 
-		InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), 0, 0, new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+		InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), 0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
 
 		// Set channels
 		inputGate.setInputChannel(localPartitionId.getPartitionId(), local);
@@ -169,7 +169,7 @@ public class SingleInputGateTest {
 			0,
 			1,
 			mock(TaskActions.class),
-			new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
 
 		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 
@@ -182,7 +182,7 @@ public class SingleInputGateTest {
 			new LocalConnectionManager(),
 			0,
 			0,
-			new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
 
 		inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown);
 
@@ -212,7 +212,7 @@ public class SingleInputGateTest {
 			0,
 			1,
 			mock(TaskActions.class),
-			new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
 
 		InputChannel unknown = new UnknownInputChannel(
 			inputGate,
@@ -223,7 +223,7 @@ public class SingleInputGateTest {
 			new LocalConnectionManager(),
 			0,
 			0,
-			new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
 
 		inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index 867c273..3972917 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -67,7 +67,7 @@ public class TestSingleInputGate {
 			0,
 			numberOfInputChannels,
 			mock(TaskActions.class),
-			new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
 
 		this.inputGate = spy(realGate);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index 466879e..cba3199 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -44,8 +44,8 @@ public class UnionInputGateTest {
 	public void testBasicGetNextLogic() throws Exception {
 		// Setup
 		final String testTaskName = "Test Task";
-		final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 3, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
-		final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+		final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 3, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+		final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
 
 		final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2});
 

http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
index b4a7400..ae87085 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
@@ -23,7 +23,7 @@ import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
-import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
@@ -67,8 +67,8 @@ public class UnregisteredTaskMetricsGroup extends TaskMetricGroup {
 		}
 	}
 	
-	public static class DummyIOMetricGroup extends IOMetricGroup {
-		public DummyIOMetricGroup() {
+	public static class DummyTaskIOMetricGroup extends TaskIOMetricGroup {
+		public DummyTaskIOMetricGroup() {
 			super(new UnregisteredTaskMetricsGroup());
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 92b1556..aee0c70 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
-import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -215,7 +215,7 @@ public class StreamInputProcessor<IN> {
 	 * 
 	 * @param metrics metric group
 	 */
-	public void setMetricGroup(IOMetricGroup metrics) {
+	public void setMetricGroup(TaskIOMetricGroup metrics) {
 		metrics.gauge("currentLowWatermark", new Gauge<Long>() {
 			@Override
 			public Long getValue() {

http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 660f07e..075d9e0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
-import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -281,7 +281,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 	 *
 	 * @param metrics metric group
 	 */
-	public void setMetricGroup(IOMetricGroup metrics) {
+	public void setMetricGroup(TaskIOMetricGroup metrics) {
 		metrics.gauge("currentLowWatermark", new Gauge<Long>() {
 			@Override
 			public Long getValue() {