You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/12/12 19:27:34 UTC

[07/10] flink git commit: [FLINK-8213][metrics] Improve fallback behaviors

[FLINK-8213][metrics] Improve fallback behaviors

This closes #8213.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0bf0fdc2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0bf0fdc2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0bf0fdc2

Branch: refs/heads/master
Commit: 0bf0fdc26ea86020929fa64d083dce02ba2a2ae2
Parents: 493c285
Author: zentol <ch...@apache.org>
Authored: Wed Dec 6 14:39:15 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Tue Dec 12 19:09:16 2017 +0100

----------------------------------------------------------------------
 .../kafka/internal/KafkaConsumerThreadTest.java |   4 +-
 .../flink/storm/wrappers/BoltWrapperTest.java   |   4 +-
 .../webmonitor/WebRuntimeMonitorITCase.java     |   2 +-
 .../flink/runtime/jobmaster/JobMaster.java      |   6 +-
 .../runtime/metrics/NoOpMetricRegistry.java     |  73 +++++++++
 .../groups/UnregisteredMetricGroups.java        | 164 +++++++++++++++++++
 .../flink/runtime/operators/DataSinkTask.java   |  22 ++-
 .../flink/runtime/operators/DataSourceTask.java |  25 ++-
 .../IndividualRestartsConcurrencyTest.java      |   4 +-
 .../partition/InputGateConcurrentTest.java      |  16 +-
 .../partition/InputGateFairnessTest.java        |  18 +-
 .../consumer/LocalInputChannelTest.java         |  14 +-
 .../consumer/RemoteInputChannelTest.java        |   8 +-
 .../partition/consumer/SingleInputGateTest.java |  24 +--
 .../partition/consumer/TestSingleInputGate.java |   4 +-
 .../partition/consumer/UnionInputGateTest.java  |   6 +-
 .../JobManagerHAJobGraphRecoveryITCase.java     |   2 +-
 .../jobmanager/JobManagerHARecoveryTest.java    |   7 +-
 .../runtime/jobmanager/JobManagerTest.java      |  20 +--
 .../flink/runtime/jobmanager/JobSubmitTest.java |   2 +-
 .../JobManagerLeaderElectionTest.java           |   3 +-
 .../runtime/metrics/NoOpMetricRegistry.java     |  68 --------
 .../runtime/metrics/TaskManagerMetricsTest.java |   2 +-
 .../runtime/metrics/groups/MetricGroupTest.java |  10 +-
 .../metrics/groups/TaskIOMetricGroupTest.java   |   3 +-
 .../operators/drivers/TestTaskContext.java      |   4 +-
 .../testutils/BinaryOperatorTestBase.java       |   5 +-
 .../operators/testutils/DriverTestBase.java     |   3 +-
 .../operators/testutils/DummyEnvironment.java   |   3 +-
 .../operators/testutils/MockEnvironment.java    |   3 +-
 .../testutils/UnaryOperatorTestBase.java        |   3 +-
 .../testutils/UnregisteredTaskMetricsGroup.java |  83 ----------
 .../resourcemanager/ResourceManagerTest.java    |   2 +-
 ...askManagerComponentsStartupShutdownTest.java |   4 +-
 .../TaskManagerProcessReapingTestBase.java      |   2 +-
 .../taskmanager/TaskManagerStartupTest.java     |   2 +-
 .../runtime/util/JvmExitOnFatalErrorTest.java   |   4 +-
 .../api/operators/AbstractStreamOperator.java   |   6 +-
 .../runtime/io/StreamInputProcessor.java        |  13 +-
 .../runtime/io/StreamTwoInputProcessor.java     |  13 +-
 .../streaming/runtime/tasks/OperatorChain.java  |  18 +-
 .../functions/async/RichAsyncFunctionTest.java  |   3 +-
 .../operators/async/AsyncWaitOperatorTest.java  |   4 +-
 .../tasks/InterruptSensitiveRestoreTest.java    |   4 +-
 .../runtime/tasks/StreamMockEnvironment.java    |   4 +-
 .../tasks/StreamTaskTerminationTest.java        |   4 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |   4 +-
 .../tasks/TaskCheckpointingBehaviourTest.java   |   4 +-
 ...ctTaskManagerProcessFailureRecoveryTest.java |   2 +-
 .../JobManagerHACheckpointRecoveryITCase.java   |   2 +-
 ...agerHAProcessFailureBatchRecoveryITCase.java |   2 +-
 .../recovery/ProcessFailureCancelingITCase.java |   2 +-
 .../AbstractOperatorRestoreTestBase.java        |   4 +-
 .../flink/yarn/YarnResourceManagerTest.java     |   2 +-
 54 files changed, 439 insertions(+), 281 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
index 2368091..383eb13 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka.internal;
 
 import org.apache.flink.core.testutils.MultiShotLatch;
 import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
 import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
@@ -716,7 +716,7 @@ public class KafkaConsumerThreadTest {
 					handover,
 					new Properties(),
 					unassignedPartitionsQueue,
-					mock(MetricGroup.class),
+					new UnregisteredMetricsGroup(),
 					new KafkaConsumerCallBridge(),
 					"test-kafka-consumer-thread",
 					0,

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
index f518d17..98816e4 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.storm.util.AbstractTest;
 import org.apache.flink.storm.util.SplitStreamType;
@@ -372,7 +372,7 @@ public class BoltWrapperTest extends AbstractTest {
 		Environment env = mock(Environment.class);
 		when(env.getTaskInfo()).thenReturn(new TaskInfo("Mock Task", 1, 0, 1, 0));
 		when(env.getUserClassLoader()).thenReturn(BoltWrapperTest.class.getClassLoader());
-		when(env.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup());
+		when(env.getMetricGroup()).thenReturn(UnregisteredMetricGroups.createUnregisteredTaskMetricGroup());
 		when(env.getTaskManagerInfo()).thenReturn(new TestingTaskManagerRuntimeInfo());
 
 		StreamTask<?, ?> mockTask = mock(StreamTask.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 75a844c..e6cfdda 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -194,7 +194,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
 					TestingUtils.defaultExecutor(),
 					TestingUtils.defaultExecutor(),
 					highAvailabilityServices,
-					new NoOpMetricRegistry(),
+					NoOpMetricRegistry.INSTANCE,
 					Option.apply(webMonitor[i].getRestAddress()),
 					JobManager.class,
 					MemoryArchivist.class)._1();

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index c4c4445..6d0de74 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -27,7 +27,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.InputSplitAssigner;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.queryablestate.KvStateID;
 import org.apache.flink.runtime.StoppingException;
 import org.apache.flink.runtime.blob.BlobServer;
@@ -78,6 +77,7 @@ import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
 import org.apache.flink.runtime.messages.webmonitor.JobDetails;
 import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateLocationRegistry;
 import org.apache.flink.runtime.query.UnknownKvStateLocation;
@@ -264,8 +264,8 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
 			this.jobManagerMetricGroup = jobManagerMetricGroup;
 			this.jobMetricGroup = jobManagerMetricGroup.addJob(jobGraph);
 		} else {
-			this.jobManagerMetricGroup = new UnregisteredMetricsGroup();
-			this.jobMetricGroup = new UnregisteredMetricsGroup();
+			this.jobManagerMetricGroup = UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup();
+			this.jobMetricGroup = UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup();
 		}
 
 		log.info("Initializing job {} ({}).", jobName, jid);

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
new file mode 100644
index 0000000..c161aa2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+
+import javax.annotation.Nullable;
+
+/**
+ * Metric registry which does nothing.
+ */
+public class NoOpMetricRegistry implements MetricRegistry {
+	private static final char delimiter = '.';
+	private static final ScopeFormats scopeFormats = ScopeFormats.fromConfig(new Configuration());
+
+	public static final MetricRegistry INSTANCE = new NoOpMetricRegistry();
+
+	private NoOpMetricRegistry() {
+	}
+
+	@Override
+	public char getDelimiter() {
+		return delimiter;
+	}
+
+	@Override
+	public char getDelimiter(int index) {
+		return delimiter;
+	}
+
+	@Override
+	public int getNumberReporters() {
+		return 0;
+	}
+
+	@Override
+	public void register(Metric metric, String metricName, AbstractMetricGroup group) {
+	}
+
+	@Override
+	public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
+	}
+
+	@Override
+	public ScopeFormats getScopeFormats() {
+		return scopeFormats;
+	}
+
+	@Nullable
+	@Override
+	public String getMetricQueryServicePath() {
+		return null;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
new file mode 100644
index 0000000..3869aa6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+
+/**
+ * A collection of safe drop-in replacements for existing {@link ComponentMetricGroup}s.
+ */
+public class UnregisteredMetricGroups {
+
+	private UnregisteredMetricGroups() {
+	}
+
+	public static JobManagerMetricGroup createUnregisteredJobManagerMetricGroup() {
+		return new UnregisteredJobManagerMetricGroup();
+	}
+
+	public static JobManagerJobMetricGroup createUnregisteredJobManagerJobMetricGroup() {
+		return new UnregisteredJobManagerJobMetricGroup();
+	}
+
+	public static TaskManagerMetricGroup createUnregisteredTaskManagerMetricGroup() {
+		return new UnregisteredTaskManagerMetricGroup();
+	}
+
+	public static TaskManagerJobMetricGroup createUnregisteredTaskManagerJobMetricGroup() {
+		return new UnregisteredTaskManagerJobMetricGroup();
+	}
+
+	public static TaskMetricGroup createUnregisteredTaskMetricGroup() {
+		return new UnregisteredTaskMetricGroup();
+	}
+
+	public static OperatorMetricGroup createUnregisteredOperatorMetricGroup() {
+		return new UnregisteredOperatorMetricGroup();
+	}
+
+	/**
+	 * A safe drop-in replacement for {@link JobManagerMetricGroup}s.
+	 */
+	public static class UnregisteredJobManagerMetricGroup extends JobManagerMetricGroup {
+		private static final String DEFAULT_HOST_NAME = "UnregisteredHost";
+
+		private UnregisteredJobManagerMetricGroup() {
+			super(NoOpMetricRegistry.INSTANCE, DEFAULT_HOST_NAME);
+		}
+
+		@Override
+		public JobManagerJobMetricGroup addJob(JobGraph job) {
+			return createUnregisteredJobManagerJobMetricGroup();
+		}
+	}
+
+	/**
+	 * A safe drop-in replacement for {@link JobManagerJobMetricGroup}s.
+	 */
+	public static class UnregisteredJobManagerJobMetricGroup extends JobManagerJobMetricGroup {
+		private static final JobID DEFAULT_JOB_ID = new JobID(0, 0);
+		private static final String DEFAULT_JOB_NAME = "UnregisteredJob";
+
+		protected UnregisteredJobManagerJobMetricGroup() {
+			super(NoOpMetricRegistry.INSTANCE, new UnregisteredJobManagerMetricGroup(), DEFAULT_JOB_ID, DEFAULT_JOB_NAME);
+		}
+	}
+
+	/**
+	 * A safe drop-in replacement for {@link TaskManagerMetricGroup}s.
+	 */
+	public static class UnregisteredTaskManagerMetricGroup extends TaskManagerMetricGroup {
+		private static final String DEFAULT_HOST_NAME = "UnregisteredHost";
+		private static final String DEFAULT_TASKMANAGER_ID = "0";
+
+		protected UnregisteredTaskManagerMetricGroup() {
+			super(NoOpMetricRegistry.INSTANCE, DEFAULT_HOST_NAME, DEFAULT_TASKMANAGER_ID);
+		}
+
+		@Override
+		public TaskMetricGroup addTaskForJob(
+			final JobID jobId,
+			final String jobName,
+			final JobVertexID jobVertexId,
+			final ExecutionAttemptID executionAttemptId,
+			final String taskName,
+			final int subtaskIndex,
+			final int attemptNumber) {
+			return createUnregisteredTaskMetricGroup();
+		}
+	}
+
+	/**
+	 * A safe drop-in replacement for {@link TaskManagerJobMetricGroup}s.
+	 */
+	public static class UnregisteredTaskManagerJobMetricGroup extends TaskManagerJobMetricGroup {
+		private static final JobID DEFAULT_JOB_ID = new JobID(0, 0);
+		private static final String DEFAULT_JOB_NAME = "UnregisteredJob";
+
+		protected UnregisteredTaskManagerJobMetricGroup() {
+			super(NoOpMetricRegistry.INSTANCE, new UnregisteredTaskManagerMetricGroup(), DEFAULT_JOB_ID, DEFAULT_JOB_NAME);
+		}
+
+		@Override
+		public TaskMetricGroup addTask(
+			final JobVertexID jobVertexId,
+			final ExecutionAttemptID executionAttemptID,
+			final String taskName,
+			final int subtaskIndex,
+			final int attemptNumber) {
+			return createUnregisteredTaskMetricGroup();
+		}
+	}
+
+	/**
+	 * A safe drop-in replacement for {@link TaskMetricGroup}s.
+	 */
+	public static class UnregisteredTaskMetricGroup extends TaskMetricGroup {
+		private static final JobVertexID DEFAULT_VERTEX_ID = new JobVertexID(0, 0);
+		private static final ExecutionAttemptID DEFAULT_ATTEMPT_ID = new ExecutionAttemptID(0, 0);
+		private static final String DEFAULT_TASK_NAME = "UnregisteredTask";
+
+		protected UnregisteredTaskMetricGroup() {
+			super(NoOpMetricRegistry.INSTANCE, new UnregisteredTaskManagerJobMetricGroup(),
+				DEFAULT_VERTEX_ID, DEFAULT_ATTEMPT_ID, DEFAULT_TASK_NAME, 0, 0);
+		}
+
+		@Override
+		public OperatorMetricGroup addOperator(OperatorID operatorID, String name) {
+			return createUnregisteredOperatorMetricGroup();
+		}
+	}
+
+	/**
+	 * A safe drop-in replacement for {@link OperatorMetricGroup}s.
+	 */
+	public static class UnregisteredOperatorMetricGroup extends OperatorMetricGroup {
+		private static final OperatorID DEFAULT_OPERATOR_ID = new OperatorID(0, 0);
+		private static final String DEFAULT_OPERATOR_NAME = "UnregisteredOperator";
+
+		protected UnregisteredOperatorMetricGroup() {
+			super(NoOpMetricRegistry.INSTANCE, new UnregisteredTaskMetricGroup(), DEFAULT_OPERATOR_ID, DEFAULT_OPERATOR_NAME);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index bd052f5..bb253ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -29,12 +29,14 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.reader.MutableReader;
 import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
 import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
 import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
@@ -107,10 +109,22 @@ public class DataSinkTask<IT> extends AbstractInvokable {
 		LOG.debug(getLogString("Starting data sink operator"));
 
 		RuntimeContext ctx = createRuntimeContext();
-		final Counter numRecordsIn = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
-		((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().reuseInputMetricsForTask();
-		((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().reuseOutputMetricsForTask();
-		
+
+		final Counter numRecordsIn;
+		{
+			Counter tmpNumRecordsIn;
+			try {
+				OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup();
+				ioMetricGroup.reuseInputMetricsForTask();
+				ioMetricGroup.reuseOutputMetricsForTask();
+				tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter();
+			} catch (Exception e) {
+				LOG.warn("An exception occurred during the metrics setup.", e);
+				tmpNumRecordsIn = new SimpleCounter();
+			}
+			numRecordsIn = tmpNumRecordsIn;
+		}
+
 		if(RichOutputFormat.class.isAssignableFrom(this.format.getClass())){
 			((RichOutputFormat) this.format).setRuntimeContext(ctx);
 			LOG.debug(getLogString("Rich Sink detected. Initializing runtime context."));

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index 2600b2c..1437877 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -27,12 +27,14 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.operators.chaining.ChainedDriver;
 import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
@@ -102,12 +104,25 @@ public class DataSourceTask<OT> extends AbstractInvokable {
 		LOG.debug(getLogString("Starting data source operator"));
 
 		RuntimeContext ctx = createRuntimeContext();
-		Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed");
-		((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().reuseInputMetricsForTask();
-		Counter numRecordsOut = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().getNumRecordsOutCounter();
-		if (this.config.getNumberOfChainedStubs() == 0) {
-			((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().reuseOutputMetricsForTask();
+
+		final Counter numRecordsOut;
+		{
+			Counter tmpNumRecordsOut;
+			try {
+				OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup();
+				ioMetricGroup.reuseInputMetricsForTask();
+				if (this.config.getNumberOfChainedStubs() == 0) {
+					ioMetricGroup.reuseOutputMetricsForTask();
+				}
+				tmpNumRecordsOut = ioMetricGroup.getNumRecordsOutCounter();
+			} catch (Exception e) {
+				LOG.warn("An exception occurred during the metrics setup.", e);
+				tmpNumRecordsOut = new SimpleCounter();
+			}
+			numRecordsOut = tmpNumRecordsOut;
 		}
+		
+		Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed");
 
 		if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
 			((RichInputFormat) this.format).setRuntimeContext(ctx);

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
index cb94d25..c977503 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
@@ -45,7 +45,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
 import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -344,7 +344,7 @@ public class IndividualRestartsConcurrencyTest extends TestLogger {
 				1,
 				allVertices,
 				checkpointCoordinatorConfiguration,
-				new UnregisteredTaskMetricsGroup()));
+				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup()));
 
 		final CheckpointCoordinator checkpointCoordinator = graph.getCheckpointCoordinator();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
index 64f82f3..6f98119 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.junit.Test;
 
@@ -63,11 +63,11 @@ public class InputGateConcurrentTest {
 				new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
 				0, numChannels,
 				mock(TaskActions.class),
-				new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
 		for (int i = 0; i < numChannels; i++) {
 			LocalInputChannel channel = new LocalInputChannel(gate, i, new ResultPartitionID(),
-					resultPartitionManager, mock(TaskEventDispatcher.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+					resultPartitionManager, mock(TaskEventDispatcher.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 			gate.setInputChannel(new IntermediateResultPartitionID(), channel);
 
 			partitions[i] = new PipelinedSubpartition(0, resultPartition);
@@ -99,12 +99,12 @@ public class InputGateConcurrentTest {
 				0,
 				numChannels,
 				mock(TaskActions.class),
-				new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
 		for (int i = 0; i < numChannels; i++) {
 			RemoteInputChannel channel = new RemoteInputChannel(
 					gate, i, new ResultPartitionID(), mock(ConnectionID.class),
-					connManager, 0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+					connManager, 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 			gate.setInputChannel(new IntermediateResultPartitionID(), channel);
 
 			sources[i] = new RemoteChannelSource(channel);
@@ -148,7 +148,7 @@ public class InputGateConcurrentTest {
 				0,
 				numChannels,
 				mock(TaskActions.class),
-				new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
 		for (int i = 0, local = 0; i < numChannels; i++) {
 			if (localOrRemote.get(i)) {
@@ -158,14 +158,14 @@ public class InputGateConcurrentTest {
 				sources[i] = new PipelinedSubpartitionSource(psp);
 
 				LocalInputChannel channel = new LocalInputChannel(gate, i, new ResultPartitionID(),
-						resultPartitionManager, mock(TaskEventDispatcher.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+						resultPartitionManager, mock(TaskEventDispatcher.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 				gate.setInputChannel(new IntermediateResultPartitionID(), channel);
 			}
 			else {
 				//remote channel
 				RemoteInputChannel channel = new RemoteInputChannel(
 						gate, i, new ResultPartitionID(), mock(ConnectionID.class),
-						connManager, 0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+						connManager, 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 				gate.setInputChannel(new IntermediateResultPartitionID(), channel);
 
 				sources[i] = new RemoteChannelSource(channel);

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
index f933840..324a060 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.junit.Test;
 
@@ -89,11 +89,11 @@ public class InputGateFairnessTest {
 				new IntermediateDataSetID(),
 				0, numChannels,
 				mock(TaskActions.class),
-				new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
 		for (int i = 0; i < numChannels; i++) {
 			LocalInputChannel channel = new LocalInputChannel(gate, i, new ResultPartitionID(),
-					resultPartitionManager, mock(TaskEventDispatcher.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+					resultPartitionManager, mock(TaskEventDispatcher.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 			gate.setInputChannel(new IntermediateResultPartitionID(), channel);
 		}
 
@@ -142,11 +142,11 @@ public class InputGateFairnessTest {
 				new IntermediateDataSetID(),
 				0, numChannels,
 				mock(TaskActions.class),
-				new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
 		for (int i = 0; i < numChannels; i++) {
 			LocalInputChannel channel = new LocalInputChannel(gate, i, new ResultPartitionID(),
-					resultPartitionManager, mock(TaskEventDispatcher.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+					resultPartitionManager, mock(TaskEventDispatcher.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 			gate.setInputChannel(new IntermediateResultPartitionID(), channel);
 		}
 
@@ -192,7 +192,7 @@ public class InputGateFairnessTest {
 				new IntermediateDataSetID(),
 				0, numChannels,
 				mock(TaskActions.class),
-				new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
 		final ConnectionManager connManager = createDummyConnectionManager();
 
@@ -201,7 +201,7 @@ public class InputGateFairnessTest {
 		for (int i = 0; i < numChannels; i++) {
 			RemoteInputChannel channel = new RemoteInputChannel(
 					gate, i, new ResultPartitionID(), mock(ConnectionID.class), 
-					connManager, 0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+					connManager, 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
 			channels[i] = channel;
 			
@@ -247,7 +247,7 @@ public class InputGateFairnessTest {
 				new IntermediateDataSetID(),
 				0, numChannels,
 				mock(TaskActions.class),
-				new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
 		final ConnectionManager connManager = createDummyConnectionManager();
 
@@ -257,7 +257,7 @@ public class InputGateFairnessTest {
 		for (int i = 0; i < numChannels; i++) {
 			RemoteInputChannel channel = new RemoteInputChannel(
 					gate, i, new ResultPartitionID(), mock(ConnectionID.class),
-					connManager, 0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+					connManager, 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
 			channels[i] = channel;
 			gate.setInputChannel(new IntermediateResultPartitionID(), channel);

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 5f7fd82..16cd90d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -38,7 +38,7 @@ import org.apache.flink.runtime.io.network.util.TestPartitionProducer;
 import org.apache.flink.runtime.io.network.util.TestProducerSource;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -291,7 +291,7 @@ public class LocalInputChannelTest {
 			0,
 			1,
 			mock(TaskActions.class),
-			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()
+			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()
 		);
 
 		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
@@ -318,7 +318,7 @@ public class LocalInputChannelTest {
 			partitionManager,
 			new TaskEventDispatcher(),
 			1, 1,
-			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
 		gate.setInputChannel(new IntermediateResultPartitionID(), channel);
 
@@ -370,7 +370,7 @@ public class LocalInputChannelTest {
 			new ResultPartitionID(),
 			partitionManager,
 			new TaskEventDispatcher(),
-			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
 		channel.requestSubpartition(0);
 
@@ -411,7 +411,7 @@ public class LocalInputChannelTest {
 				mock(TaskEventDispatcher.class),
 				initialAndMaxRequestBackoff._1(),
 				initialAndMaxRequestBackoff._2(),
-				new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 	}
 
 	/**
@@ -487,7 +487,7 @@ public class LocalInputChannelTest {
 					subpartitionIndex,
 					numberOfInputChannels,
 					mock(TaskActions.class),
-					new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+					UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
 			// Set buffer pool
 			inputGate.setBufferPool(bufferPool);
@@ -502,7 +502,7 @@ public class LocalInputChannelTest {
 								consumedPartitionIds[i],
 								partitionManager,
 								taskEventDispatcher,
-								new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()));
+								UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()));
 			}
 
 			this.numberOfInputChannels = numberOfInputChannels;

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index bced9ce..d791ced 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
 import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.util.TestBufferFactory;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 
 import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
 
@@ -270,7 +270,7 @@ public class RemoteInputChannelTest {
 				partitionId,
 				mock(ConnectionID.class),
 				connectionManager,
-				new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
 		ch.onFailedPartitionRequest();
 
@@ -290,7 +290,7 @@ public class RemoteInputChannelTest {
 				new ResultPartitionID(),
 				mock(ConnectionID.class),
 				connManager,
-				new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
 		ch.onError(new ProducerFailedException(new RuntimeException("Expected test exception.")));
 
@@ -401,6 +401,6 @@ public class RemoteInputChannelTest {
 			connectionManager,
 			initialAndMaxRequestBackoff._1(),
 			initialAndMaxRequestBackoff._2(),
-			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 4d7d884..da649cd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -43,7 +43,7 @@ import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.util.TestTaskEvent;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.junit.Test;
 
@@ -80,7 +80,7 @@ public class SingleInputGateTest {
 			new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
 			0, 2,
 			mock(TaskActions.class),
-			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
 		assertEquals(ResultPartitionType.PIPELINED, inputGate.getConsumedPartitionType());
 
@@ -140,7 +140,7 @@ public class SingleInputGateTest {
 				resultId, ResultPartitionType.PIPELINED,
 				0, 2,
 				mock(TaskActions.class),
-				new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 		final BufferPool bufferPool = mock(BufferPool.class);
 		when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2);
 
@@ -149,12 +149,12 @@ public class SingleInputGateTest {
 		// Local
 		ResultPartitionID localPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID());
 
-		InputChannel local = new LocalInputChannel(inputGate, 0, localPartitionId, partitionManager, taskEventDispatcher, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+		InputChannel local = new LocalInputChannel(inputGate, 0, localPartitionId, partitionManager, taskEventDispatcher, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
 		// Unknown
 		ResultPartitionID unknownPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID());
 
-		InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), 0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+		InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
 		// Set channels
 		inputGate.setInputChannel(localPartitionId.getPartitionId(), local);
@@ -195,7 +195,7 @@ public class SingleInputGateTest {
 			ResultPartitionType.PIPELINED,
 			0,
 			1,
-			mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+			mock(TaskActions.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
 		ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
 
@@ -206,7 +206,7 @@ public class SingleInputGateTest {
 			partitionManager,
 			new TaskEventDispatcher(),
 			new LocalConnectionManager(),
-			0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+			0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
 		inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown);
 
@@ -236,7 +236,7 @@ public class SingleInputGateTest {
 			0,
 			1,
 			mock(TaskActions.class),
-			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
 		InputChannel unknown = new UnknownInputChannel(
 			inputGate,
@@ -246,7 +246,7 @@ public class SingleInputGateTest {
 			new TaskEventDispatcher(),
 			new LocalConnectionManager(),
 			0, 0,
-			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
 		inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown);
 
@@ -339,7 +339,7 @@ public class SingleInputGateTest {
 			gateDesc,
 			netEnv,
 			mock(TaskActions.class),
-			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
 		assertEquals(gateDesc.getConsumedPartitionType(), gate.getConsumedPartitionType());
 
@@ -388,7 +388,7 @@ public class SingleInputGateTest {
 			0,
 			1,
 			mock(TaskActions.class),
-			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
 		RemoteInputChannel remote = mock(RemoteInputChannel.class);
 		inputGate.setInputChannel(new IntermediateResultPartitionID(), remote);
@@ -416,7 +416,7 @@ public class SingleInputGateTest {
 			0,
 			1,
 			mock(TaskActions.class),
-			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
 		UnknownInputChannel unknown = mock(UnknownInputChannel.class);
 		final ResultPartitionID resultPartitionId = new ResultPartitionID();

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index 18ad490..0ae6e74 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
@@ -60,7 +60,7 @@ public class TestSingleInputGate {
 			0,
 			numberOfInputChannels,
 			mock(TaskActions.class),
-			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
 		this.inputGate = spy(realGate);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index bc1dd07..9884855 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.junit.Test;
 
@@ -48,13 +48,13 @@ public class UnionInputGateTest {
 			new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
 			0, 3,
 			mock(TaskActions.class),
-			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 		final SingleInputGate ig2 = new SingleInputGate(
 			testTaskName, new JobID(),
 			new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
 			0, 5,
 			mock(TaskActions.class),
-			new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
 
 		final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2});
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
index f5d6802..0b7547d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
@@ -203,7 +203,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
 				ResourceID.generate(),
 				taskManagerSystem,
 				highAvailabilityServices,
-				new NoOpMetricRegistry(),
+				NoOpMetricRegistry.INSTANCE,
 				"localhost",
 				Option.<String>empty(),
 				false,

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 88141d6..f86e7e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -64,6 +64,7 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.runtime.taskmanager.TaskManager;
@@ -206,7 +207,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
 				mySubmittedJobGraphStore,
 				checkpointStateFactory,
 				jobRecoveryTimeout,
-				new JobManagerMetricGroup(new NoOpMetricRegistry(), "localhost"),
+				UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
 				Option.<String>empty());
 
 			jobManager = system.actorOf(jobManagerProps);
@@ -217,7 +218,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
 				ResourceID.generate(),
 				system,
 				testingHighAvailabilityServices,
-				new NoOpMetricRegistry(),
+				NoOpMetricRegistry.INSTANCE,
 				"localhost",
 				Option.apply("taskmanager"),
 				true,
@@ -381,7 +382,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
 				submittedJobGraphStore,
 				mock(CheckpointRecoveryFactory.class),
 				jobRecoveryTimeout,
-				new JobManagerMetricGroup(new NoOpMetricRegistry(), "localhost"),
+				UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
 				recoveredJobs).withDispatcher(CallingThreadDispatcher.Id());
 
 			jobManager = system.actorOf(jobManagerProps);

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 6a02d1f..51cc469 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -626,7 +626,7 @@ public class JobManagerTest extends TestLogger {
 			TestingUtils.defaultExecutor(),
 			TestingUtils.defaultExecutor(),
 			highAvailabilityServices,
-			new NoOpMetricRegistry(),
+			NoOpMetricRegistry.INSTANCE,
 			Option.empty(),
 			TestingJobManager.class,
 			MemoryArchivist.class)._1();
@@ -648,7 +648,7 @@ public class JobManagerTest extends TestLogger {
 			ResourceID.generate(),
 			system,
 			highAvailabilityServices,
-			new NoOpMetricRegistry(),
+			NoOpMetricRegistry.INSTANCE,
 			"localhost",
 			scala.Option.<String>empty(),
 			true,
@@ -845,7 +845,7 @@ public class JobManagerTest extends TestLogger {
 				TestingUtils.defaultExecutor(),
 				TestingUtils.defaultExecutor(),
 				highAvailabilityServices,
-				new NoOpMetricRegistry(),
+				NoOpMetricRegistry.INSTANCE,
 				Option.empty(),
 				Option.apply("jm"),
 				Option.apply("arch"),
@@ -864,7 +864,7 @@ public class JobManagerTest extends TestLogger {
 				ResourceID.generate(),
 				actorSystem,
 				highAvailabilityServices,
-				new NoOpMetricRegistry(),
+				NoOpMetricRegistry.INSTANCE,
 				"localhost",
 				Option.apply("tm"),
 				true,
@@ -1057,7 +1057,7 @@ public class JobManagerTest extends TestLogger {
 				TestingUtils.defaultExecutor(),
 				TestingUtils.defaultExecutor(),
 				highAvailabilityServices,
-				new NoOpMetricRegistry(),
+				NoOpMetricRegistry.INSTANCE,
 				Option.empty(),
 				Option.apply("jm"),
 				Option.apply("arch"),
@@ -1076,7 +1076,7 @@ public class JobManagerTest extends TestLogger {
 				ResourceID.generate(),
 				actorSystem,
 				highAvailabilityServices,
-				new NoOpMetricRegistry(),
+				NoOpMetricRegistry.INSTANCE,
 				"localhost",
 				Option.apply("tm"),
 				true,
@@ -1172,7 +1172,7 @@ public class JobManagerTest extends TestLogger {
 				TestingUtils.defaultExecutor(),
 				TestingUtils.defaultExecutor(),
 				highAvailabilityServices,
-				new NoOpMetricRegistry(),
+				NoOpMetricRegistry.INSTANCE,
 				Option.empty(),
 				Option.apply("jm"),
 				Option.apply("arch"),
@@ -1191,7 +1191,7 @@ public class JobManagerTest extends TestLogger {
 				ResourceID.generate(),
 				actorSystem,
 				highAvailabilityServices,
-				new NoOpMetricRegistry(),
+				NoOpMetricRegistry.INSTANCE,
 				"localhost",
 				Option.apply("tm"),
 				true,
@@ -1285,7 +1285,7 @@ public class JobManagerTest extends TestLogger {
 				TestingUtils.defaultExecutor(),
 				TestingUtils.defaultExecutor(),
 				highAvailabilityServices,
-				new NoOpMetricRegistry(),
+				NoOpMetricRegistry.INSTANCE,
 				Option.empty(),
 				Option.apply("jm"),
 				Option.apply("arch"),
@@ -1307,7 +1307,7 @@ public class JobManagerTest extends TestLogger {
 				ResourceID.generate(),
 				actorSystem,
 				highAvailabilityServices,
-				new NoOpMetricRegistry(),
+				NoOpMetricRegistry.INSTANCE,
 				"localhost",
 				Option.apply("tm"),
 				true,

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index cc93f18..0ca83ae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -95,7 +95,7 @@ public class JobSubmitTest {
 			TestingUtils.defaultExecutor(),
 			TestingUtils.defaultExecutor(),
 			highAvailabilityServices,
-			new NoOpMetricRegistry(),
+			NoOpMetricRegistry.INSTANCE,
 			Option.empty(),
 			JobManager.class,
 			MemoryArchivist.class)._1();

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index 72c03af..703cd0b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
 import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
 import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
 import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.testingUtils.TestingJobManager;
 import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -201,7 +202,7 @@ public class JobManagerLeaderElectionTest extends TestLogger {
 			submittedJobGraphStore,
 			checkpointRecoveryFactory,
 			AkkaUtils.getDefaultTimeoutAsFiniteDuration(),
-			new JobManagerMetricGroup(new NoOpMetricRegistry(), "localhost"),
+			UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
 			Option.<String>empty());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
deleted file mode 100644
index 46d6548..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.metrics;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
-import org.apache.flink.runtime.metrics.scope.ScopeFormats;
-
-import javax.annotation.Nullable;
-
-/**
- * Metric registry which does nothing and is intended for testing purposes.
- */
-public class NoOpMetricRegistry implements MetricRegistry {
-
-	final char delimiter = ',';
-
-	final ScopeFormats scopeFormats = ScopeFormats.fromConfig(new Configuration());
-
-	@Override
-	public char getDelimiter() {
-		return delimiter;
-	}
-
-	@Override
-	public char getDelimiter(int index) {
-		return delimiter;
-	}
-
-	@Override
-	public int getNumberReporters() {
-		return 0;
-	}
-
-	@Override
-	public void register(Metric metric, String metricName, AbstractMetricGroup group) {}
-
-	@Override
-	public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {}
-
-	@Override
-	public ScopeFormats getScopeFormats() {
-		return scopeFormats;
-	}
-
-	@Nullable
-	@Override
-	public String getMetricQueryServicePath() {
-		return null;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
index d934ea9..eec7165 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
@@ -78,7 +78,7 @@ public class TaskManagerMetricsTest extends TestLogger {
 				TestingUtils.defaultExecutor(),
 				TestingUtils.defaultExecutor(),
 				highAvailabilityServices,
-				new NoOpMetricRegistry(),
+				NoOpMetricRegistry.INSTANCE,
 				Option.empty(),
 				JobManager.class,
 				MemoryArchivist.class)._1();

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
index 0fced33..4dc5edf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
@@ -86,7 +86,7 @@ public class MetricGroupTest extends TestLogger {
 	 */
 	@Test
 	public void testUserDefinedVariable() {
-		MetricRegistry registry = new NoOpMetricRegistry();
+		MetricRegistry registry = NoOpMetricRegistry.INSTANCE;
 		GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root");
 
 		String key = "key";
@@ -111,7 +111,7 @@ public class MetricGroupTest extends TestLogger {
 	 */
 	@Test
 	public void testUserDefinedVariableOnKeyGroup() {
-		MetricRegistry registry = new NoOpMetricRegistry();
+		MetricRegistry registry = NoOpMetricRegistry.INSTANCE;
 		GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root");
 
 		String key1 = "key1";
@@ -142,7 +142,7 @@ public class MetricGroupTest extends TestLogger {
 	 */
 	@Test
 	public void testNameCollisionForKeyAfterGenericGroup() {
-		MetricRegistry registry = new NoOpMetricRegistry();
+		MetricRegistry registry = NoOpMetricRegistry.INSTANCE;
 		GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root");
 
 		String key = "key";
@@ -169,7 +169,7 @@ public class MetricGroupTest extends TestLogger {
 	 */
 	@Test
 	public void testNameCollisionForKeyAndValueAfterGenericGroup() {
-		MetricRegistry registry = new NoOpMetricRegistry();
+		MetricRegistry registry = NoOpMetricRegistry.INSTANCE;
 		GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root");
 
 		String key = "key";
@@ -195,7 +195,7 @@ public class MetricGroupTest extends TestLogger {
 	 */
 	@Test
 	public void testNameCollisionAfterKeyValueGroup() {
-		MetricRegistry registry = new NoOpMetricRegistry();
+		MetricRegistry registry = NoOpMetricRegistry.INSTANCE;
 		GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root");
 
 		String key = "key";

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
index bcf77de..f23b2f5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.metrics.groups;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.executiongraph.IOMetrics;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 
 import org.junit.Test;
 
@@ -34,7 +33,7 @@ import static org.junit.Assert.assertNotNull;
 public class TaskIOMetricGroupTest {
 	@Test
 	public void testTaskIOMetricGroup() {
-		TaskMetricGroup task = new UnregisteredTaskMetricsGroup();
+		TaskMetricGroup task = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
 		TaskIOMetricGroup taskIO = task.getIOMetricGroup();
 
 		// test counter forwarding

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
index 4fa74b3..a4d14c4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
@@ -29,10 +29,10 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.DriverStrategy;
 import org.apache.flink.runtime.operators.TaskContext;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
@@ -227,6 +227,6 @@ public class TestTaskContext<S, T> implements TaskContext<S, T> {
 
 	@Override
 	public OperatorMetricGroup getMetricGroup() {
-		return new UnregisteredTaskMetricsGroup.DummyOperatorMetricGroup();
+		return UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
index 96c8b73..a76f110 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
@@ -26,13 +26,12 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.Driver;
 import org.apache.flink.runtime.operators.ResettableDriver;
 import org.apache.flink.runtime.operators.TaskContext;
@@ -373,7 +372,7 @@ public abstract class BinaryOperatorTestBase<S extends Function, IN, OUT> extend
 	
 	@Override
 	public OperatorMetricGroup getMetricGroup() {
-		return new UnregisteredTaskMetricsGroup.DummyOperatorMetricGroup();
+		return UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index d2cedb9..3820bf9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.Driver;
 import org.apache.flink.runtime.operators.ResettableDriver;
 import org.apache.flink.runtime.operators.TaskContext;
@@ -368,7 +369,7 @@ public abstract class DriverTestBase<S extends Function> extends TestLogger impl
 	
 	@Override
 	public OperatorMetricGroup getMetricGroup() {
-		return new UnregisteredTaskMetricsGroup.DummyOperatorMetricGroup();
+		return UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 718ecfe..148eb0b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
@@ -100,7 +101,7 @@ public class DummyEnvironment implements Environment {
 
 	@Override
 	public TaskMetricGroup getMetricGroup() {
-		return new UnregisteredTaskMetricsGroup();
+		return UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index f655b12..861cf35 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
@@ -281,7 +282,7 @@ public class MockEnvironment implements Environment {
 
 	@Override
 	public TaskMetricGroup getMetricGroup() {
-		return new UnregisteredTaskMetricsGroup();
+		return UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
index 141aec6..2ef82da 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.Driver;
 import org.apache.flink.runtime.operators.TaskContext;
 import org.apache.flink.runtime.operators.ResettableDriver;
@@ -364,7 +365,7 @@ public abstract class UnaryOperatorTestBase<S extends Function, IN, OUT> extends
 	
 	@Override
 	public OperatorMetricGroup getMetricGroup() {
-		return new UnregisteredTaskMetricsGroup.DummyOperatorMetricGroup();
+		return UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
deleted file mode 100644
index 7065e6b..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators.testutils;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
-import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
-import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
-import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
-import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
-
-import java.util.UUID;
-
-public class UnregisteredTaskMetricsGroup extends TaskMetricGroup {
-	
-	private static final MetricRegistry EMPTY_REGISTRY = new NoOpMetricRegistry();
-
-	
-	public UnregisteredTaskMetricsGroup() {
-		super(EMPTY_REGISTRY, new DummyJobMetricGroup(),
-				new JobVertexID(), new ExecutionAttemptID(), "testtask", 0, 0);
-	}
-
-	@Override
-	protected void addMetric(String name, Metric metric) {}
-
-	@Override
-	public MetricGroup addGroup(String name) {
-		return new UnregisteredMetricsGroup();
-	}
-
-	// ------------------------------------------------------------------------
-	
-	private static class DummyTaskManagerMetricsGroup extends TaskManagerMetricGroup {
-
-		public DummyTaskManagerMetricsGroup() {
-			super(EMPTY_REGISTRY, "localhost", UUID.randomUUID().toString());
-		}
-	}
-
-	private static class DummyJobMetricGroup extends TaskManagerJobMetricGroup {
-		
-		public DummyJobMetricGroup() {
-			super(EMPTY_REGISTRY, new DummyTaskManagerMetricsGroup(), new JobID(), "testjob");
-		}
-	}
-	
-	public static class DummyTaskIOMetricGroup extends TaskIOMetricGroup {
-		public DummyTaskIOMetricGroup() {
-			super(new UnregisteredTaskMetricsGroup());
-		}
-	}
-
-	public static class DummyOperatorMetricGroup extends OperatorMetricGroup {
-		public DummyOperatorMetricGroup() {
-			super(EMPTY_REGISTRY, new UnregisteredTaskMetricsGroup(), new OperatorID(), "testoperator");
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index 3050718..87cb4a9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -97,7 +97,7 @@ public class ResourceManagerTest extends TestLogger {
 			highAvailabilityServices,
 			new HeartbeatServices(1000L, 10000L),
 			slotManager,
-			new NoOpMetricRegistry(),
+			NoOpMetricRegistry.INSTANCE,
 			jobLeaderIdService,
 			testingFatalErrorHandler);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 98b5b8b..a3c41c5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -98,7 +98,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
 				TestingUtils.defaultExecutor(),
 				TestingUtils.defaultExecutor(),
 				highAvailabilityServices,
-				new NoOpMetricRegistry(),
+				NoOpMetricRegistry.INSTANCE,
 				Option.empty(),
 				JobManager.class,
 				MemoryArchivist.class)._1();
@@ -168,7 +168,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
 				network,
 				numberOfSlots,
 				highAvailabilityServices,
-				new TaskManagerMetricGroup(new NoOpMetricRegistry(), connectionInfo.getHostname(), connectionInfo.getResourceID().getResourceIdString()));
+				new TaskManagerMetricGroup(NoOpMetricRegistry.INSTANCE, connectionInfo.getHostname(), connectionInfo.getResourceID().getResourceIdString()));
 
 			taskManager = actorSystem.actorOf(tmProps);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
index 7429ec5..fadebce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
@@ -122,7 +122,7 @@ public abstract class TaskManagerProcessReapingTestBase extends TestLogger {
 				TestingUtils.defaultExecutor(),
 				TestingUtils.defaultExecutor(),
 				highAvailabilityServices,
-				new NoOpMetricRegistry(),
+				NoOpMetricRegistry.INSTANCE,
 				Option.empty(),
 				JobManager.class,
 				MemoryArchivist.class)._1;

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
index ed06dc0..4c7a8cf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
@@ -250,7 +250,7 @@ public class TaskManagerStartupTest extends TestLogger {
 				ResourceID.generate(),
 				null,
 				highAvailabilityServices,
-				new NoOpMetricRegistry(),
+				NoOpMetricRegistry.INSTANCE,
 				"localhost",
 				Option.<String>empty(),
 				false,

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index 38238cd..086ad71 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -52,7 +52,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
@@ -193,7 +193,7 @@ public class JvmExitOnFatalErrorTest {
 							new String[0]),
 						new FileCache(tmInfo.getTmpDirectories()),
 						tmInfo,
-						new UnregisteredTaskMetricsGroup(),
+						UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
 						new NoOpResultPartitionConsumableNotifier(),
 						new NoOpPartitionProducerStateChecker(),
 						executor);