You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/12/12 19:27:33 UTC

[06/10] flink git commit: [FLINK-8213][metrics] Improve fallback behaviors

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 58022a9..4062749 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -33,12 +33,12 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
 import org.apache.flink.runtime.state.DefaultKeyedStateStore;
@@ -155,7 +155,7 @@ public abstract class AbstractStreamOperator<OUT>
 	// --------------- Metrics ---------------------------
 
 	/** Metric group for the operator. */
-	protected transient MetricGroup metrics;
+	protected transient OperatorMetricGroup metrics;
 
 	protected transient LatencyGauge latencyGauge;
 
@@ -191,7 +191,7 @@ public abstract class AbstractStreamOperator<OUT>
 			this.metrics = operatorMetricGroup;
 		} catch (Exception e) {
 			LOG.warn("An error occurred while instantiating task metrics.", e);
-			this.metrics = new UnregisteredMetricsGroup();
+			this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
 			this.output = output;
 		}
 		Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration();

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 609f8b8..0c71a53 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -49,6 +50,9 @@ import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -70,6 +74,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public class StreamInputProcessor<IN> {
 
+	private static final Logger LOG = LoggerFactory.getLogger(StreamInputProcessor.class);
+
 	private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;
 
 	private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;
@@ -169,7 +175,12 @@ public class StreamInputProcessor<IN> {
 			return false;
 		}
 		if (numRecordsIn == null) {
-			numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+			try {
+				numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+			} catch (Exception e) {
+				LOG.warn("An exception occurred during the metrics setup.", e);
+				numRecordsIn = new SimpleCounter();
+			}
 		}
 
 		while (true) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 7874147..824acad 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -49,6 +50,9 @@ import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
 import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.Collection;
 
@@ -72,6 +76,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public class StreamTwoInputProcessor<IN1, IN2> {
 
+	private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputProcessor.class);
+
 	private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;
 
 	private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;
@@ -201,7 +207,12 @@ public class StreamTwoInputProcessor<IN1, IN2> {
 			return false;
 		}
 		if (numRecordsIn == null) {
-			numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+			try {
+				numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+			} catch (Exception e) {
+				LOG.warn("An exception occurred during the metrics setup.", e);
+				numRecordsIn = new SimpleCounter();
+			}
 		}
 
 		while (true) {

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index a44cffb..141a623 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -21,11 +21,13 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput;
@@ -426,7 +428,21 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 				StreamStatusProvider streamStatusProvider,
 				OutputTag<T> outputTag) {
 			this.operator = operator;
-			this.numRecordsIn = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+
+			{
+				Counter tmpNumRecordsIn;
+				try {
+					OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup();
+					ioMetricGroup.reuseInputMetricsForTask();
+					ioMetricGroup.reuseOutputMetricsForTask();
+					tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter();
+				} catch (Exception e) {
+					LOG.warn("An exception occurred during the metrics setup.", e);
+					tmpNumRecordsIn = new SimpleCounter();
+				}
+				numRecordsIn = tmpNumRecordsIn;
+			}
+
 			this.streamStatusProvider = streamStatusProvider;
 			this.outputTag = outputTag;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
index d6f5e61..2618e53 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 
 import org.junit.Test;
 
@@ -101,7 +102,7 @@ public class RichAsyncFunctionTest {
 		};
 
 		final String taskName = "foobarTask";
-		final MetricGroup metricGroup = mock(MetricGroup.class);
+		final MetricGroup metricGroup = new UnregisteredMetricsGroup();
 		final int numberOfParallelSubtasks = 42;
 		final int indexOfSubtask = 43;
 		final int attemptNumber = 1337;

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 993bffb..a556b18 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -35,8 +35,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
 import org.apache.flink.streaming.api.datastream.AsyncDataStream;
@@ -658,7 +658,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
 
 		final Configuration taskConfiguration = new Configuration();
 		final ExecutionConfig executionConfig = new ExecutionConfig();
-		final TaskMetricGroup metricGroup = new UnregisteredTaskMetricsGroup();
+		final TaskMetricGroup metricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
 		final TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo();
 		final TaskInfo taskInfo = new TaskInfo("foobarTask", 1, 0, 1, 1);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index eacded6..0af1471 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -49,7 +49,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -272,7 +272,7 @@ public class InterruptSensitiveRestoreTest {
 				new String[0]),
 			new FileCache(new String[] { EnvironmentInformation.getTemporaryFileDirectory() }),
 			new TestingTaskManagerRuntimeInfo(),
-			new UnregisteredTaskMetricsGroup(),
+			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
 			mock(ResultPartitionConsumableNotifier.class),
 			mock(PartitionProducerStateChecker.class),
 			mock(Executor.class));

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 277ca51..ee7337c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -47,8 +47,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 import org.apache.flink.runtime.query.KvStateRegistry;
@@ -363,6 +363,6 @@ public class StreamMockEnvironment implements Environment {
 
 	@Override
 	public TaskMetricGroup getMetricGroup() {
-		return new UnregisteredTaskMetricsGroup();
+		return UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index 5480ce7..e3e51aa 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -49,7 +49,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -168,7 +168,7 @@ public class StreamTaskTerminationTest extends TestLogger {
 				new String[0]),
 			mock(FileCache.class),
 			taskManagerRuntimeInfo,
-			new UnregisteredTaskMetricsGroup(),
+			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
 			mock(ResultPartitionConsumableNotifier.class),
 			mock(PartitionProducerStateChecker.class),
 			Executors.directExecutor());

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index d0ea714..8ce8b03 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -58,9 +58,9 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
 import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -935,7 +935,7 @@ public class StreamTaskTest extends TestLogger {
 			libCache,
 			mock(FileCache.class),
 			new TestingTaskManagerRuntimeInfo(taskManagerConfig, new String[] {System.getProperty("java.io.tmpdir")}),
-			new UnregisteredTaskMetricsGroup(),
+			UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
 			consumableNotifier,
 			partitionProducerStateChecker,
 			executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index d61b95d..b1127d5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -52,7 +52,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -244,7 +244,7 @@ public class TaskCheckpointingBehaviourTest extends TestLogger {
 					new String[0]),
 				new FileCache(new String[] { EnvironmentInformation.getTemporaryFileDirectory() }),
 				new TestingTaskManagerRuntimeInfo(),
-				new UnregisteredTaskMetricsGroup(),
+				UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
 				mock(ResultPartitionConsumableNotifier.class),
 				mock(PartitionProducerStateChecker.class),
 				Executors.directExecutor());

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 94aed2a..29516dc 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -146,7 +146,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
 				TestingUtils.defaultExecutor(),
 				TestingUtils.defaultExecutor(),
 				highAvailabilityServices,
-				new NoOpMetricRegistry(),
+				NoOpMetricRegistry.INSTANCE,
 				Option.empty(),
 				JobManager.class,
 				MemoryArchivist.class)._1();

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index 7c53d52..cefadb4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -218,7 +218,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
 				ResourceID.generate(),
 				taskManagerSystem,
 				highAvailabilityServices,
-				new NoOpMetricRegistry(),
+				NoOpMetricRegistry.INSTANCE,
 				"localhost",
 				Option.<String>empty(),
 				false,

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index 8e97e9d..357f7af 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -282,7 +282,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
 					ResourceID.generate(),
 					tmActorSystem[i],
 					highAvailabilityServices,
-					new NoOpMetricRegistry(),
+					NoOpMetricRegistry.INSTANCE,
 					"localhost",
 					Option.<String>empty(),
 					false,

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index ecd0bea..13d6804 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -121,7 +121,7 @@ public class ProcessFailureCancelingITCase extends TestLogger {
 				TestingUtils.defaultExecutor(),
 				TestingUtils.defaultExecutor(),
 				highAvailabilityServices,
-				new NoOpMetricRegistry(),
+				NoOpMetricRegistry.INSTANCE,
 				Option.empty(),
 				JobManager.class,
 				MemoryArchivist.class)._1();

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index c86f21f..9710c20 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -122,7 +122,7 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
 			TestingUtils.defaultExecutor(),
 			TestingUtils.defaultExecutor(),
 			highAvailabilityServices,
-			new NoOpMetricRegistry(),
+			NoOpMetricRegistry.INSTANCE,
 			Option.empty(),
 			Option.apply("jm"),
 			Option.apply("arch"),
@@ -144,7 +144,7 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
 			ResourceID.generate(),
 			actorSystem,
 			highAvailabilityServices,
-			new NoOpMetricRegistry(),
+			NoOpMetricRegistry.INSTANCE,
 			"localhost",
 			Option.apply("tm"),
 			true,

http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index 031a4cb..39ddfa7 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -257,7 +257,7 @@ public class YarnResourceManagerTest extends TestLogger {
 				rmLeaderElectionService = new TestingLeaderElectionService();
 				highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
 				heartbeatServices = new TestingHeartbeatServices(5L, 5L, scheduledExecutor);
-				metricRegistry = new NoOpMetricRegistry();
+				metricRegistry = NoOpMetricRegistry.INSTANCE;
 				slotManager = new SlotManager(
 						new ScheduledExecutorServiceAdapter(new DirectScheduledExecutorService()),
 						Time.seconds(10), Time.seconds(10), Time.minutes(1));