You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/12/12 19:27:33 UTC
[06/10] flink git commit: [FLINK-8213][metrics] Improve fallback behaviors
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index 58022a9..4062749 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -33,12 +33,12 @@ import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointOptions.CheckpointType;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DefaultKeyedStateStore;
@@ -155,7 +155,7 @@ public abstract class AbstractStreamOperator<OUT>
// --------------- Metrics ---------------------------
/** Metric group for the operator. */
- protected transient MetricGroup metrics;
+ protected transient OperatorMetricGroup metrics;
protected transient LatencyGauge latencyGauge;
@@ -191,7 +191,7 @@ public abstract class AbstractStreamOperator<OUT>
this.metrics = operatorMetricGroup;
} catch (Exception e) {
LOG.warn("An error occurred while instantiating task metrics.", e);
- this.metrics = new UnregisteredMetricsGroup();
+ this.metrics = UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
this.output = output;
}
Configuration taskManagerConfig = container.getEnvironment().getTaskManagerInfo().getConfiguration();
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index 609f8b8..0c71a53 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -49,6 +50,9 @@ import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -70,6 +74,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
@Internal
public class StreamInputProcessor<IN> {
+ private static final Logger LOG = LoggerFactory.getLogger(StreamInputProcessor.class);
+
private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;
private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;
@@ -169,7 +175,12 @@ public class StreamInputProcessor<IN> {
return false;
}
if (numRecordsIn == null) {
- numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+ try {
+ numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+ } catch (Exception e) {
+ LOG.warn("An exception occurred during the metrics setup.", e);
+ numRecordsIn = new SimpleCounter();
+ }
}
while (true) {
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index 7874147..824acad 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -49,6 +50,9 @@ import org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatus;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.io.IOException;
import java.util.Collection;
@@ -72,6 +76,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
@Internal
public class StreamTwoInputProcessor<IN1, IN2> {
+ private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputProcessor.class);
+
private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;
private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;
@@ -201,7 +207,12 @@ public class StreamTwoInputProcessor<IN1, IN2> {
return false;
}
if (numRecordsIn == null) {
- numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+ try {
+ numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+ } catch (Exception e) {
+ LOG.warn("An exception occurred during the metrics setup.", e);
+ numRecordsIn = new SimpleCounter();
+ }
}
while (true) {
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index a44cffb..141a623 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -21,11 +21,13 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.streaming.api.collector.selector.CopyingDirectedOutput;
@@ -426,7 +428,21 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
StreamStatusProvider streamStatusProvider,
OutputTag<T> outputTag) {
this.operator = operator;
- this.numRecordsIn = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+
+ {
+ Counter tmpNumRecordsIn;
+ try {
+ OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) operator.getMetricGroup()).getIOMetricGroup();
+ ioMetricGroup.reuseInputMetricsForTask();
+ ioMetricGroup.reuseOutputMetricsForTask();
+ tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter();
+ } catch (Exception e) {
+ LOG.warn("An exception occurred during the metrics setup.", e);
+ tmpNumRecordsIn = new SimpleCounter();
+ }
+ numRecordsIn = tmpNumRecordsIn;
+ }
+
this.streamStatusProvider = streamStatusProvider;
this.outputTag = outputTag;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
index d6f5e61..2618e53 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunctionTest.java
@@ -33,6 +33,7 @@ import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.junit.Test;
@@ -101,7 +102,7 @@ public class RichAsyncFunctionTest {
};
final String taskName = "foobarTask";
- final MetricGroup metricGroup = mock(MetricGroup.class);
+ final MetricGroup metricGroup = new UnregisteredMetricsGroup();
final int numberOfParallelSubtasks = 42;
final int indexOfSubtask = 43;
final int attemptNumber = 1337;
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 993bffb..a556b18 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -35,8 +35,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
@@ -658,7 +658,7 @@ public class AsyncWaitOperatorTest extends TestLogger {
final Configuration taskConfiguration = new Configuration();
final ExecutionConfig executionConfig = new ExecutionConfig();
- final TaskMetricGroup metricGroup = new UnregisteredTaskMetricsGroup();
+ final TaskMetricGroup metricGroup = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
final TaskManagerRuntimeInfo taskManagerRuntimeInfo = new TestingTaskManagerRuntimeInfo();
final TaskInfo taskInfo = new TaskInfo("foobarTask", 1, 0, 1, 1);
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index eacded6..0af1471 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -49,7 +49,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.FunctionInitializationContext;
@@ -272,7 +272,7 @@ public class InterruptSensitiveRestoreTest {
new String[0]),
new FileCache(new String[] { EnvironmentInformation.getTemporaryFileDirectory() }),
new TestingTaskManagerRuntimeInfo(),
- new UnregisteredTaskMetricsGroup(),
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
mock(ResultPartitionConsumableNotifier.class),
mock(PartitionProducerStateChecker.class),
mock(Executor.class));
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 277ca51..ee7337c 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -47,8 +47,8 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.runtime.query.KvStateRegistry;
@@ -363,6 +363,6 @@ public class StreamMockEnvironment implements Environment {
@Override
public TaskMetricGroup getMetricGroup() {
- return new UnregisteredTaskMetricsGroup();
+ return UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index 5480ce7..e3e51aa 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -49,7 +49,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -168,7 +168,7 @@ public class StreamTaskTerminationTest extends TestLogger {
new String[0]),
mock(FileCache.class),
taskManagerRuntimeInfo,
- new UnregisteredTaskMetricsGroup(),
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
mock(ResultPartitionConsumableNotifier.class),
mock(PartitionProducerStateChecker.class),
Executors.directExecutor());
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index d0ea714..8ce8b03 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -58,9 +58,9 @@ import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
@@ -935,7 +935,7 @@ public class StreamTaskTest extends TestLogger {
libCache,
mock(FileCache.class),
new TestingTaskManagerRuntimeInfo(taskManagerConfig, new String[] {System.getProperty("java.io.tmpdir")}),
- new UnregisteredTaskMetricsGroup(),
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
consumableNotifier,
partitionProducerStateChecker,
executor);
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index d61b95d..b1127d5 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -52,7 +52,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
@@ -244,7 +244,7 @@ public class TaskCheckpointingBehaviourTest extends TestLogger {
new String[0]),
new FileCache(new String[] { EnvironmentInformation.getTemporaryFileDirectory() }),
new TestingTaskManagerRuntimeInfo(),
- new UnregisteredTaskMetricsGroup(),
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
mock(ResultPartitionConsumableNotifier.class),
mock(PartitionProducerStateChecker.class),
Executors.directExecutor());
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
index 94aed2a..29516dc 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/AbstractTaskManagerProcessFailureRecoveryTest.java
@@ -146,7 +146,7 @@ public abstract class AbstractTaskManagerProcessFailureRecoveryTest extends Test
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
- new NoOpMetricRegistry(),
+ NoOpMetricRegistry.INSTANCE,
Option.empty(),
JobManager.class,
MemoryArchivist.class)._1();
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
index 7c53d52..cefadb4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHACheckpointRecoveryITCase.java
@@ -218,7 +218,7 @@ public class JobManagerHACheckpointRecoveryITCase extends TestLogger {
ResourceID.generate(),
taskManagerSystem,
highAvailabilityServices,
- new NoOpMetricRegistry(),
+ NoOpMetricRegistry.INSTANCE,
"localhost",
Option.<String>empty(),
false,
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
index 8e97e9d..357f7af 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/JobManagerHAProcessFailureBatchRecoveryITCase.java
@@ -282,7 +282,7 @@ public class JobManagerHAProcessFailureBatchRecoveryITCase extends TestLogger {
ResourceID.generate(),
tmActorSystem[i],
highAvailabilityServices,
- new NoOpMetricRegistry(),
+ NoOpMetricRegistry.INSTANCE,
"localhost",
Option.<String>empty(),
false,
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
index ecd0bea..13d6804 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/recovery/ProcessFailureCancelingITCase.java
@@ -121,7 +121,7 @@ public class ProcessFailureCancelingITCase extends TestLogger {
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
- new NoOpMetricRegistry(),
+ NoOpMetricRegistry.INSTANCE,
Option.empty(),
JobManager.class,
MemoryArchivist.class)._1();
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
index c86f21f..9710c20 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/state/operator/restore/AbstractOperatorRestoreTestBase.java
@@ -122,7 +122,7 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
- new NoOpMetricRegistry(),
+ NoOpMetricRegistry.INSTANCE,
Option.empty(),
Option.apply("jm"),
Option.apply("arch"),
@@ -144,7 +144,7 @@ public abstract class AbstractOperatorRestoreTestBase extends TestLogger {
ResourceID.generate(),
actorSystem,
highAvailabilityServices,
- new NoOpMetricRegistry(),
+ NoOpMetricRegistry.INSTANCE,
"localhost",
Option.apply("tm"),
true,
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
index 031a4cb..39ddfa7 100644
--- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
+++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
@@ -257,7 +257,7 @@ public class YarnResourceManagerTest extends TestLogger {
rmLeaderElectionService = new TestingLeaderElectionService();
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
heartbeatServices = new TestingHeartbeatServices(5L, 5L, scheduledExecutor);
- metricRegistry = new NoOpMetricRegistry();
+ metricRegistry = NoOpMetricRegistry.INSTANCE;
slotManager = new SlotManager(
new ScheduledExecutorServiceAdapter(new DirectScheduledExecutorService()),
Time.seconds(10), Time.seconds(10), Time.minutes(1));