You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2016/10/25 10:03:42 UTC
[2/2] flink git commit: [FLINK-4773] [metrics] [refactor] Rename
IOMetricGroup to TaskIOMetricGroup
[FLINK-4773] [metrics] [refactor] Rename IOMetricGroup to TaskIOMetricGroup
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/77258a00
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/77258a00
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/77258a00
Branch: refs/heads/master
Commit: 77258a007ae4dde525d0274bb4d63e76d834bfe0
Parents: c4f9f0d
Author: zentol <ch...@apache.org>
Authored: Fri Oct 7 13:02:10 2016 +0200
Committer: zentol <ch...@apache.org>
Committed: Tue Oct 25 12:03:26 2016 +0200
----------------------------------------------------------------------
.../api/serialization/RecordSerializer.java | 4 +-
.../serialization/SpanningRecordSerializer.java | 6 +--
.../io/network/api/writer/RecordWriter.java | 4 +-
.../partition/consumer/LocalInputChannel.java | 6 +--
.../partition/consumer/RemoteInputChannel.java | 6 +--
.../partition/consumer/SingleInputGate.java | 6 +--
.../partition/consumer/UnknownInputChannel.java | 6 +--
.../runtime/metrics/groups/IOMetricGroup.java | 52 --------------------
.../metrics/groups/TaskIOMetricGroup.java | 52 ++++++++++++++++++++
.../runtime/metrics/groups/TaskMetricGroup.java | 10 ++--
.../consumer/LocalInputChannelTest.java | 6 +--
.../consumer/RemoteInputChannelTest.java | 6 +--
.../partition/consumer/SingleInputGateTest.java | 16 +++---
.../partition/consumer/TestSingleInputGate.java | 2 +-
.../partition/consumer/UnionInputGateTest.java | 4 +-
.../testutils/UnregisteredTaskMetricsGroup.java | 6 +--
.../runtime/io/StreamInputProcessor.java | 4 +-
.../runtime/io/StreamTwoInputProcessor.java | 4 +-
18 files changed, 100 insertions(+), 100 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
index a560bb6..c76dd00 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/RecordSerializer.java
@@ -22,7 +22,7 @@ package org.apache.flink.runtime.io.network.api.serialization;
import java.io.IOException;
import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.io.network.buffer.Buffer;
@@ -76,5 +76,5 @@ public interface RecordSerializer<T extends IOReadableWritable> {
*
* @param metrics metric group
*/
- void instantiateMetrics(IOMetricGroup metrics);
+ void instantiateMetrics(TaskIOMetricGroup metrics);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index 9d4f765..7c4d937 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -25,7 +25,7 @@ import java.nio.ByteOrder;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.metrics.Counter;
-import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.util.DataOutputSerializer;
@@ -197,7 +197,7 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
}
@Override
- public void instantiateMetrics(IOMetricGroup metrics) {
- numBytesOut = metrics.getBytesOutCounter();
+ public void instantiateMetrics(TaskIOMetricGroup metrics) {
+ numBytesOut = metrics.getNumBytesOutCounter();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 422aa65..96eea23 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.io.network.api.writer;
import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
@@ -213,7 +213,7 @@ public class RecordWriter<T extends IOReadableWritable> {
* Sets the metric group for this RecordWriter.
* @param metrics
*/
- public void setMetricGroup(IOMetricGroup metrics) {
+ public void setMetricGroup(TaskIOMetricGroup metrics) {
for(RecordSerializer<?> serializer : serializers) {
serializer.instantiateMetrics(metrics);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index a8aae7e..55ff539 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;
-import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -68,7 +68,7 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
ResultPartitionID partitionId,
ResultPartitionManager partitionManager,
TaskEventDispatcher taskEventDispatcher,
- IOMetricGroup metrics) {
+ TaskIOMetricGroup metrics) {
this(inputGate, channelIndex, partitionId, partitionManager, taskEventDispatcher,
0, 0, metrics);
@@ -82,7 +82,7 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
TaskEventDispatcher taskEventDispatcher,
int initialBackoff,
int maxBackoff,
- IOMetricGroup metrics) {
+ TaskIOMetricGroup metrics) {
super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, metrics.getNumBytesInLocalCounter());
http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index a12d2a8..13a71a9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;
-import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
@@ -81,7 +81,7 @@ public class RemoteInputChannel extends InputChannel {
ResultPartitionID partitionId,
ConnectionID connectionId,
ConnectionManager connectionManager,
- IOMetricGroup metrics) {
+ TaskIOMetricGroup metrics) {
this(inputGate, channelIndex, partitionId, connectionId, connectionManager,
0, 0, metrics);
@@ -95,7 +95,7 @@ public class RemoteInputChannel extends InputChannel {
ConnectionManager connectionManager,
int initialBackoff,
int maxBackoff,
- IOMetricGroup metrics) {
+ TaskIOMetricGroup metrics) {
super(inputGate, channelIndex, partitionId, initialBackoff, maxBackoff, metrics.getNumBytesInRemoteCounter());
http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 0db30ee..f4e4325 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.io.network.partition.consumer;
import com.google.common.collect.Maps;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.taskmanager.TaskActions;
-import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
@@ -175,7 +175,7 @@ public class SingleInputGate implements InputGate {
int consumedSubpartitionIndex,
int numberOfInputChannels,
TaskActions taskActions,
- IOMetricGroup metrics) {
+ TaskIOMetricGroup metrics) {
this.owningTaskName = checkNotNull(owningTaskName);
this.jobId = checkNotNull(jobId);
@@ -506,7 +506,7 @@ public class SingleInputGate implements InputGate {
InputGateDeploymentDescriptor igdd,
NetworkEnvironment networkEnvironment,
TaskActions taskActions,
- IOMetricGroup metrics) {
+ TaskIOMetricGroup metrics) {
final IntermediateDataSetID consumedResultId = checkNotNull(igdd.getConsumedResultId());
http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
index 27ecc70..08b5044 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/UnknownInputChannel.java
@@ -18,7 +18,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;
-import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.ConnectionManager;
@@ -49,7 +49,7 @@ public class UnknownInputChannel extends InputChannel {
private final int maxBackoff;
- private final IOMetricGroup metrics;
+ private final TaskIOMetricGroup metrics;
public UnknownInputChannel(
SingleInputGate gate,
@@ -60,7 +60,7 @@ public class UnknownInputChannel extends InputChannel {
ConnectionManager connectionManager,
int initialBackoff,
int maxBackoff,
- IOMetricGroup metrics) {
+ TaskIOMetricGroup metrics) {
super(gate, channelIndex, partitionId, initialBackoff, maxBackoff, null);
http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/IOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/IOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/IOMetricGroup.java
deleted file mode 100644
index 8fa6111..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/IOMetricGroup.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.metrics.groups;
-
-import org.apache.flink.metrics.Counter;
-
-/**
- * Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is
- * forwarded to the parent task metric group.
- */
-public class IOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
-
- private final Counter numBytesOut;
- private final Counter numBytesInLocal;
- private final Counter numBytesInRemote;
-
- public IOMetricGroup(TaskMetricGroup parent) {
- super(parent);
-
- this.numBytesOut = counter("numBytesOut");
- this.numBytesInLocal = counter("numBytesInLocal");
- this.numBytesInRemote = counter("numBytesInRemote");
- }
-
- public Counter getBytesOutCounter() {
- return numBytesOut;
- }
-
- public Counter getNumBytesInLocalCounter() {
- return numBytesInLocal;
- }
-
- public Counter getNumBytesInRemoteCounter() {
- return numBytesInRemote;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
new file mode 100644
index 0000000..a726c26
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroup.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.metrics.Counter;
+
+/**
+ * Metric group that contains shareable pre-defined IO-related metrics. The metrics registration is
+ * forwarded to the parent task metric group.
+ */
+public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
+
+ private final Counter numBytesOut;
+ private final Counter numBytesInLocal;
+ private final Counter numBytesInRemote;
+
+ public TaskIOMetricGroup(TaskMetricGroup parent) {
+ super(parent);
+
+ this.numBytesOut = counter("numBytesOut");
+ this.numBytesInLocal = counter("numBytesInLocal");
+ this.numBytesInRemote = counter("numBytesInRemote");
+ }
+
+ public Counter getNumBytesOutCounter() {
+ return numBytesOut;
+ }
+
+ public Counter getNumBytesInLocalCounter() {
+ return numBytesInLocal;
+ }
+
+ public Counter getNumBytesInRemoteCounter() {
+ return numBytesInRemote;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
index 5082fd8..75b8bd8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/TaskMetricGroup.java
@@ -41,7 +41,7 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
private final Map<String, OperatorMetricGroup> operators = new HashMap<>();
- private final IOMetricGroup ioMetrics;
+ private final TaskIOMetricGroup ioMetrics;
/** The execution Id uniquely identifying the executed task represented by this metrics group */
private final AbstractID executionId;
@@ -75,7 +75,7 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
this.subtaskIndex = subtaskIndex;
this.attemptNumber = attemptNumber;
- this.ioMetrics = new IOMetricGroup(this);
+ this.ioMetrics = new TaskIOMetricGroup(this);
}
// ------------------------------------------------------------------------
@@ -109,11 +109,11 @@ public class TaskMetricGroup extends ComponentMetricGroup<TaskManagerJobMetricGr
}
/**
- * Returns the IOMetricGroup for this task.
+ * Returns the TaskIOMetricGroup for this task.
*
- * @return IOMetricGroup for this task.
+ * @return TaskIOMetricGroup for this task.
*/
- public IOMetricGroup getIOMetricGroup() {
+ public TaskIOMetricGroup getIOMetricGroup() {
return ioMetrics;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 75f2bcc..19bb67e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -277,7 +277,7 @@ public class LocalInputChannelTest {
mock(TaskEventDispatcher.class),
initialAndMaxRequestBackoff._1(),
initialAndMaxRequestBackoff._2(),
- new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+ new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
}
/**
@@ -353,7 +353,7 @@ public class LocalInputChannelTest {
subpartitionIndex,
numberOfInputChannels,
mock(TaskActions.class),
- new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+ new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
// Set buffer pool
inputGate.setBufferPool(bufferPool);
@@ -368,7 +368,7 @@ public class LocalInputChannelTest {
consumedPartitionIds[i],
partitionManager,
taskEventDispatcher,
- new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()));
+ new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()));
}
this.numberOfInputChannels = numberOfInputChannels;
http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index 9a79ff8..2c2f966 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -249,7 +249,7 @@ public class RemoteInputChannelTest {
partitionId,
mock(ConnectionID.class),
connectionManager,
- new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+ new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
ch.onFailedPartitionRequest();
@@ -269,7 +269,7 @@ public class RemoteInputChannelTest {
new ResultPartitionID(),
mock(ConnectionID.class),
connManager,
- new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+ new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
ch.onError(new ProducerFailedException(new RuntimeException("Expected test exception.")));
@@ -306,6 +306,6 @@ public class RemoteInputChannelTest {
connectionManager,
initialAndMaxRequestBackoff._1(),
initialAndMaxRequestBackoff._2(),
- new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+ new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 9e4ab86..8f9ea9e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -66,7 +66,7 @@ public class SingleInputGateTest {
public void testBasicGetNextLogic() throws Exception {
// Setup
final SingleInputGate inputGate = new SingleInputGate(
- "Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+ "Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
final TestInputChannel[] inputChannels = new TestInputChannel[]{
new TestInputChannel(inputGate, 0),
@@ -113,7 +113,7 @@ public class SingleInputGateTest {
// Setup reader with one local and one unknown input channel
final IntermediateDataSetID resultId = new IntermediateDataSetID();
- final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+ final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
final BufferPool bufferPool = mock(BufferPool.class);
when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2);
@@ -122,12 +122,12 @@ public class SingleInputGateTest {
// Local
ResultPartitionID localPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID());
- InputChannel local = new LocalInputChannel(inputGate, 0, localPartitionId, partitionManager, taskEventDispatcher, new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+ InputChannel local = new LocalInputChannel(inputGate, 0, localPartitionId, partitionManager, taskEventDispatcher, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
// Unknown
ResultPartitionID unknownPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID());
- InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), 0, 0, new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+ InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), 0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
// Set channels
inputGate.setInputChannel(localPartitionId.getPartitionId(), local);
@@ -169,7 +169,7 @@ public class SingleInputGateTest {
0,
1,
mock(TaskActions.class),
- new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+ new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
@@ -182,7 +182,7 @@ public class SingleInputGateTest {
new LocalConnectionManager(),
0,
0,
- new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+ new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown);
@@ -212,7 +212,7 @@ public class SingleInputGateTest {
0,
1,
mock(TaskActions.class),
- new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+ new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
InputChannel unknown = new UnknownInputChannel(
inputGate,
@@ -223,7 +223,7 @@ public class SingleInputGateTest {
new LocalConnectionManager(),
0,
0,
- new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+ new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown);
http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index 867c273..3972917 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -67,7 +67,7 @@ public class TestSingleInputGate {
0,
numberOfInputChannels,
mock(TaskActions.class),
- new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+ new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
this.inputGate = spy(realGate);
http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index 466879e..cba3199 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -44,8 +44,8 @@ public class UnionInputGateTest {
public void testBasicGetNextLogic() throws Exception {
// Setup
final String testTaskName = "Test Task";
- final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 3, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
- final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+ final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 3, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2});
http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
index b4a7400..ae87085 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
@@ -23,7 +23,7 @@ import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
-import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
@@ -67,8 +67,8 @@ public class UnregisteredTaskMetricsGroup extends TaskMetricGroup {
}
}
- public static class DummyIOMetricGroup extends IOMetricGroup {
- public DummyIOMetricGroup() {
+ public static class DummyTaskIOMetricGroup extends TaskIOMetricGroup {
+ public DummyTaskIOMetricGroup() {
super(new UnregisteredTaskMetricsGroup());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 92b1556..aee0c70 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
-import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -215,7 +215,7 @@ public class StreamInputProcessor<IN> {
*
* @param metrics metric group
*/
- public void setMetricGroup(IOMetricGroup metrics) {
+ public void setMetricGroup(TaskIOMetricGroup metrics) {
metrics.gauge("currentLowWatermark", new Gauge<Long>() {
@Override
public Long getValue() {
http://git-wip-us.apache.org/repos/asf/flink/blob/77258a00/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 660f07e..075d9e0 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -22,7 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
-import org.apache.flink.runtime.metrics.groups.IOMetricGroup;
+import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -281,7 +281,7 @@ public class StreamTwoInputProcessor<IN1, IN2> {
*
* @param metrics metric group
*/
- public void setMetricGroup(IOMetricGroup metrics) {
+ public void setMetricGroup(TaskIOMetricGroup metrics) {
metrics.gauge("currentLowWatermark", new Gauge<Long>() {
@Override
public Long getValue() {