You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/12/12 19:27:34 UTC
[07/10] flink git commit: [FLINK-8213][metrics] Improve fallback behaviors
[FLINK-8213][metrics] Improve fallback behaviors
This closes #8213.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0bf0fdc2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0bf0fdc2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0bf0fdc2
Branch: refs/heads/master
Commit: 0bf0fdc26ea86020929fa64d083dce02ba2a2ae2
Parents: 493c285
Author: zentol <ch...@apache.org>
Authored: Wed Dec 6 14:39:15 2017 +0100
Committer: zentol <ch...@apache.org>
Committed: Tue Dec 12 19:09:16 2017 +0100
----------------------------------------------------------------------
.../kafka/internal/KafkaConsumerThreadTest.java | 4 +-
.../flink/storm/wrappers/BoltWrapperTest.java | 4 +-
.../webmonitor/WebRuntimeMonitorITCase.java | 2 +-
.../flink/runtime/jobmaster/JobMaster.java | 6 +-
.../runtime/metrics/NoOpMetricRegistry.java | 73 +++++++++
.../groups/UnregisteredMetricGroups.java | 164 +++++++++++++++++++
.../flink/runtime/operators/DataSinkTask.java | 22 ++-
.../flink/runtime/operators/DataSourceTask.java | 25 ++-
.../IndividualRestartsConcurrencyTest.java | 4 +-
.../partition/InputGateConcurrentTest.java | 16 +-
.../partition/InputGateFairnessTest.java | 18 +-
.../consumer/LocalInputChannelTest.java | 14 +-
.../consumer/RemoteInputChannelTest.java | 8 +-
.../partition/consumer/SingleInputGateTest.java | 24 +--
.../partition/consumer/TestSingleInputGate.java | 4 +-
.../partition/consumer/UnionInputGateTest.java | 6 +-
.../JobManagerHAJobGraphRecoveryITCase.java | 2 +-
.../jobmanager/JobManagerHARecoveryTest.java | 7 +-
.../runtime/jobmanager/JobManagerTest.java | 20 +--
.../flink/runtime/jobmanager/JobSubmitTest.java | 2 +-
.../JobManagerLeaderElectionTest.java | 3 +-
.../runtime/metrics/NoOpMetricRegistry.java | 68 --------
.../runtime/metrics/TaskManagerMetricsTest.java | 2 +-
.../runtime/metrics/groups/MetricGroupTest.java | 10 +-
.../metrics/groups/TaskIOMetricGroupTest.java | 3 +-
.../operators/drivers/TestTaskContext.java | 4 +-
.../testutils/BinaryOperatorTestBase.java | 5 +-
.../operators/testutils/DriverTestBase.java | 3 +-
.../operators/testutils/DummyEnvironment.java | 3 +-
.../operators/testutils/MockEnvironment.java | 3 +-
.../testutils/UnaryOperatorTestBase.java | 3 +-
.../testutils/UnregisteredTaskMetricsGroup.java | 83 ----------
.../resourcemanager/ResourceManagerTest.java | 2 +-
...askManagerComponentsStartupShutdownTest.java | 4 +-
.../TaskManagerProcessReapingTestBase.java | 2 +-
.../taskmanager/TaskManagerStartupTest.java | 2 +-
.../runtime/util/JvmExitOnFatalErrorTest.java | 4 +-
.../api/operators/AbstractStreamOperator.java | 6 +-
.../runtime/io/StreamInputProcessor.java | 13 +-
.../runtime/io/StreamTwoInputProcessor.java | 13 +-
.../streaming/runtime/tasks/OperatorChain.java | 18 +-
.../functions/async/RichAsyncFunctionTest.java | 3 +-
.../operators/async/AsyncWaitOperatorTest.java | 4 +-
.../tasks/InterruptSensitiveRestoreTest.java | 4 +-
.../runtime/tasks/StreamMockEnvironment.java | 4 +-
.../tasks/StreamTaskTerminationTest.java | 4 +-
.../streaming/runtime/tasks/StreamTaskTest.java | 4 +-
.../tasks/TaskCheckpointingBehaviourTest.java | 4 +-
...ctTaskManagerProcessFailureRecoveryTest.java | 2 +-
.../JobManagerHACheckpointRecoveryITCase.java | 2 +-
...agerHAProcessFailureBatchRecoveryITCase.java | 2 +-
.../recovery/ProcessFailureCancelingITCase.java | 2 +-
.../AbstractOperatorRestoreTestBase.java | 4 +-
.../flink/yarn/YarnResourceManagerTest.java | 2 +-
54 files changed, 439 insertions(+), 281 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
index 2368091..383eb13 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/KafkaConsumerThreadTest.java
@@ -20,7 +20,7 @@ package org.apache.flink.streaming.connectors.kafka.internal;
import org.apache.flink.core.testutils.MultiShotLatch;
import org.apache.flink.core.testutils.OneShotLatch;
-import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
@@ -716,7 +716,7 @@ public class KafkaConsumerThreadTest {
handover,
new Properties(),
unassignedPartitionsQueue,
- mock(MetricGroup.class),
+ new UnregisteredMetricsGroup(),
new KafkaConsumerCallBridge(),
"test-kafka-consumer-thread",
0,
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
index f518d17..98816e4 100644
--- a/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
+++ b/flink-contrib/flink-storm/src/test/java/org/apache/flink/storm/wrappers/BoltWrapperTest.java
@@ -25,7 +25,7 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.storm.util.AbstractTest;
import org.apache.flink.storm.util.SplitStreamType;
@@ -372,7 +372,7 @@ public class BoltWrapperTest extends AbstractTest {
Environment env = mock(Environment.class);
when(env.getTaskInfo()).thenReturn(new TaskInfo("Mock Task", 1, 0, 1, 0));
when(env.getUserClassLoader()).thenReturn(BoltWrapperTest.class.getClassLoader());
- when(env.getMetricGroup()).thenReturn(new UnregisteredTaskMetricsGroup());
+ when(env.getMetricGroup()).thenReturn(UnregisteredMetricGroups.createUnregisteredTaskMetricGroup());
when(env.getTaskManagerInfo()).thenReturn(new TestingTaskManagerRuntimeInfo());
StreamTask<?, ?> mockTask = mock(StreamTask.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
index 75a844c..e6cfdda 100644
--- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
+++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java
@@ -194,7 +194,7 @@ public class WebRuntimeMonitorITCase extends TestLogger {
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
- new NoOpMetricRegistry(),
+ NoOpMetricRegistry.INSTANCE,
Option.apply(webMonitor[i].getRestAddress()),
JobManager.class,
MemoryArchivist.class)._1();
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index c4c4445..6d0de74 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -27,7 +27,6 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.queryablestate.KvStateID;
import org.apache.flink.runtime.StoppingException;
import org.apache.flink.runtime.blob.BlobServer;
@@ -78,6 +77,7 @@ import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.KvStateLocationRegistry;
import org.apache.flink.runtime.query.UnknownKvStateLocation;
@@ -264,8 +264,8 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast
this.jobManagerMetricGroup = jobManagerMetricGroup;
this.jobMetricGroup = jobManagerMetricGroup.addJob(jobGraph);
} else {
- this.jobManagerMetricGroup = new UnregisteredMetricsGroup();
- this.jobMetricGroup = new UnregisteredMetricsGroup();
+ this.jobManagerMetricGroup = UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup();
+ this.jobMetricGroup = UnregisteredMetricGroups.createUnregisteredJobManagerJobMetricGroup();
}
log.info("Initializing job {} ({}).", jobName, jid);
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
new file mode 100644
index 0000000..c161aa2
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+
+import javax.annotation.Nullable;
+
+/**
+ * Metric registry which does nothing.
+ */
+public class NoOpMetricRegistry implements MetricRegistry {
+ private static final char delimiter = '.';
+ private static final ScopeFormats scopeFormats = ScopeFormats.fromConfig(new Configuration());
+
+ public static final MetricRegistry INSTANCE = new NoOpMetricRegistry();
+
+ private NoOpMetricRegistry() {
+ }
+
+ @Override
+ public char getDelimiter() {
+ return delimiter;
+ }
+
+ @Override
+ public char getDelimiter(int index) {
+ return delimiter;
+ }
+
+ @Override
+ public int getNumberReporters() {
+ return 0;
+ }
+
+ @Override
+ public void register(Metric metric, String metricName, AbstractMetricGroup group) {
+ }
+
+ @Override
+ public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
+ }
+
+ @Override
+ public ScopeFormats getScopeFormats() {
+ return scopeFormats;
+ }
+
+ @Nullable
+ @Override
+ public String getMetricQueryServicePath() {
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
new file mode 100644
index 0000000..3869aa6
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/metrics/groups/UnregisteredMetricGroups.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics.groups;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+
+/**
+ * A collection of safe drop-in replacements for existing {@link ComponentMetricGroup}s.
+ */
+public class UnregisteredMetricGroups {
+
+ private UnregisteredMetricGroups() {
+ }
+
+ public static JobManagerMetricGroup createUnregisteredJobManagerMetricGroup() {
+ return new UnregisteredJobManagerMetricGroup();
+ }
+
+ public static JobManagerJobMetricGroup createUnregisteredJobManagerJobMetricGroup() {
+ return new UnregisteredJobManagerJobMetricGroup();
+ }
+
+ public static TaskManagerMetricGroup createUnregisteredTaskManagerMetricGroup() {
+ return new UnregisteredTaskManagerMetricGroup();
+ }
+
+ public static TaskManagerJobMetricGroup createUnregisteredTaskManagerJobMetricGroup() {
+ return new UnregisteredTaskManagerJobMetricGroup();
+ }
+
+ public static TaskMetricGroup createUnregisteredTaskMetricGroup() {
+ return new UnregisteredTaskMetricGroup();
+ }
+
+ public static OperatorMetricGroup createUnregisteredOperatorMetricGroup() {
+ return new UnregisteredOperatorMetricGroup();
+ }
+
+ /**
+ * A safe drop-in replacement for {@link JobManagerMetricGroup}s.
+ */
+ public static class UnregisteredJobManagerMetricGroup extends JobManagerMetricGroup {
+ private static final String DEFAULT_HOST_NAME = "UnregisteredHost";
+
+ private UnregisteredJobManagerMetricGroup() {
+ super(NoOpMetricRegistry.INSTANCE, DEFAULT_HOST_NAME);
+ }
+
+ @Override
+ public JobManagerJobMetricGroup addJob(JobGraph job) {
+ return createUnregisteredJobManagerJobMetricGroup();
+ }
+ }
+
+ /**
+ * A safe drop-in replacement for {@link JobManagerJobMetricGroup}s.
+ */
+ public static class UnregisteredJobManagerJobMetricGroup extends JobManagerJobMetricGroup {
+ private static final JobID DEFAULT_JOB_ID = new JobID(0, 0);
+ private static final String DEFAULT_JOB_NAME = "UnregisteredJob";
+
+ protected UnregisteredJobManagerJobMetricGroup() {
+ super(NoOpMetricRegistry.INSTANCE, new UnregisteredJobManagerMetricGroup(), DEFAULT_JOB_ID, DEFAULT_JOB_NAME);
+ }
+ }
+
+ /**
+ * A safe drop-in replacement for {@link TaskManagerMetricGroup}s.
+ */
+ public static class UnregisteredTaskManagerMetricGroup extends TaskManagerMetricGroup {
+ private static final String DEFAULT_HOST_NAME = "UnregisteredHost";
+ private static final String DEFAULT_TASKMANAGER_ID = "0";
+
+ protected UnregisteredTaskManagerMetricGroup() {
+ super(NoOpMetricRegistry.INSTANCE, DEFAULT_HOST_NAME, DEFAULT_TASKMANAGER_ID);
+ }
+
+ @Override
+ public TaskMetricGroup addTaskForJob(
+ final JobID jobId,
+ final String jobName,
+ final JobVertexID jobVertexId,
+ final ExecutionAttemptID executionAttemptId,
+ final String taskName,
+ final int subtaskIndex,
+ final int attemptNumber) {
+ return createUnregisteredTaskMetricGroup();
+ }
+ }
+
+ /**
+ * A safe drop-in replacement for {@link TaskManagerJobMetricGroup}s.
+ */
+ public static class UnregisteredTaskManagerJobMetricGroup extends TaskManagerJobMetricGroup {
+ private static final JobID DEFAULT_JOB_ID = new JobID(0, 0);
+ private static final String DEFAULT_JOB_NAME = "UnregisteredJob";
+
+ protected UnregisteredTaskManagerJobMetricGroup() {
+ super(NoOpMetricRegistry.INSTANCE, new UnregisteredTaskManagerMetricGroup(), DEFAULT_JOB_ID, DEFAULT_JOB_NAME);
+ }
+
+ @Override
+ public TaskMetricGroup addTask(
+ final JobVertexID jobVertexId,
+ final ExecutionAttemptID executionAttemptID,
+ final String taskName,
+ final int subtaskIndex,
+ final int attemptNumber) {
+ return createUnregisteredTaskMetricGroup();
+ }
+ }
+
+ /**
+ * A safe drop-in replacement for {@link TaskMetricGroup}s.
+ */
+ public static class UnregisteredTaskMetricGroup extends TaskMetricGroup {
+ private static final JobVertexID DEFAULT_VERTEX_ID = new JobVertexID(0, 0);
+ private static final ExecutionAttemptID DEFAULT_ATTEMPT_ID = new ExecutionAttemptID(0, 0);
+ private static final String DEFAULT_TASK_NAME = "UnregisteredTask";
+
+ protected UnregisteredTaskMetricGroup() {
+ super(NoOpMetricRegistry.INSTANCE, new UnregisteredTaskManagerJobMetricGroup(),
+ DEFAULT_VERTEX_ID, DEFAULT_ATTEMPT_ID, DEFAULT_TASK_NAME, 0, 0);
+ }
+
+ @Override
+ public OperatorMetricGroup addOperator(OperatorID operatorID, String name) {
+ return createUnregisteredOperatorMetricGroup();
+ }
+ }
+
+ /**
+ * A safe drop-in replacement for {@link OperatorMetricGroup}s.
+ */
+ public static class UnregisteredOperatorMetricGroup extends OperatorMetricGroup {
+ private static final OperatorID DEFAULT_OPERATOR_ID = new OperatorID(0, 0);
+ private static final String DEFAULT_OPERATOR_NAME = "UnregisteredOperator";
+
+ protected UnregisteredOperatorMetricGroup() {
+ super(NoOpMetricRegistry.INSTANCE, new UnregisteredTaskMetricGroup(), DEFAULT_OPERATOR_ID, DEFAULT_OPERATOR_NAME);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
index bd052f5..bb253ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSinkTask.java
@@ -29,12 +29,14 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.MutableReader;
import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
import org.apache.flink.runtime.operators.sort.UnilateralSortMerger;
@@ -107,10 +109,22 @@ public class DataSinkTask<IT> extends AbstractInvokable {
LOG.debug(getLogString("Starting data sink operator"));
RuntimeContext ctx = createRuntimeContext();
- final Counter numRecordsIn = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
- ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().reuseInputMetricsForTask();
- ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().reuseOutputMetricsForTask();
-
+
+ final Counter numRecordsIn;
+ {
+ Counter tmpNumRecordsIn;
+ try {
+ OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup();
+ ioMetricGroup.reuseInputMetricsForTask();
+ ioMetricGroup.reuseOutputMetricsForTask();
+ tmpNumRecordsIn = ioMetricGroup.getNumRecordsInCounter();
+ } catch (Exception e) {
+ LOG.warn("An exception occurred during the metrics setup.", e);
+ tmpNumRecordsIn = new SimpleCounter();
+ }
+ numRecordsIn = tmpNumRecordsIn;
+ }
+
if(RichOutputFormat.class.isAssignableFrom(this.format.getClass())){
((RichOutputFormat) this.format).setRuntimeContext(ctx);
LOG.debug(getLogString("Rich Sink detected. Initializing runtime context."));
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
index 2600b2c..1437877 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DataSourceTask.java
@@ -27,12 +27,14 @@ import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProviderException;
+import org.apache.flink.runtime.metrics.groups.OperatorIOMetricGroup;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.operators.chaining.ChainedDriver;
import org.apache.flink.runtime.operators.chaining.ExceptionInChainedStubException;
@@ -102,12 +104,25 @@ public class DataSourceTask<OT> extends AbstractInvokable {
LOG.debug(getLogString("Starting data source operator"));
RuntimeContext ctx = createRuntimeContext();
- Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed");
- ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().reuseInputMetricsForTask();
- Counter numRecordsOut = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().getNumRecordsOutCounter();
- if (this.config.getNumberOfChainedStubs() == 0) {
- ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup().reuseOutputMetricsForTask();
+
+ final Counter numRecordsOut;
+ {
+ Counter tmpNumRecordsOut;
+ try {
+ OperatorIOMetricGroup ioMetricGroup = ((OperatorMetricGroup) ctx.getMetricGroup()).getIOMetricGroup();
+ ioMetricGroup.reuseInputMetricsForTask();
+ if (this.config.getNumberOfChainedStubs() == 0) {
+ ioMetricGroup.reuseOutputMetricsForTask();
+ }
+ tmpNumRecordsOut = ioMetricGroup.getNumRecordsOutCounter();
+ } catch (Exception e) {
+ LOG.warn("An exception occurred during the metrics setup.", e);
+ tmpNumRecordsOut = new SimpleCounter();
+ }
+ numRecordsOut = tmpNumRecordsOut;
}
+
+ Counter completedSplitsCounter = ctx.getMetricGroup().counter("numSplitsProcessed");
if (RichInputFormat.class.isAssignableFrom(this.format.getClass())) {
((RichInputFormat) this.format).setRuntimeContext(ctx);
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
index cb94d25..c977503 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/IndividualRestartsConcurrencyTest.java
@@ -45,7 +45,7 @@ import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings;
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
@@ -344,7 +344,7 @@ public class IndividualRestartsConcurrencyTest extends TestLogger {
1,
allVertices,
checkpointCoordinatorConfiguration,
- new UnregisteredTaskMetricsGroup()));
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup()));
final CheckpointCoordinator checkpointCoordinator = graph.getCheckpointCoordinator();
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
index 64f82f3..6f98119 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel
import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.junit.Test;
@@ -63,11 +63,11 @@ public class InputGateConcurrentTest {
new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
0, numChannels,
mock(TaskActions.class),
- new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
for (int i = 0; i < numChannels; i++) {
LocalInputChannel channel = new LocalInputChannel(gate, i, new ResultPartitionID(),
- resultPartitionManager, mock(TaskEventDispatcher.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ resultPartitionManager, mock(TaskEventDispatcher.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
gate.setInputChannel(new IntermediateResultPartitionID(), channel);
partitions[i] = new PipelinedSubpartition(0, resultPartition);
@@ -99,12 +99,12 @@ public class InputGateConcurrentTest {
0,
numChannels,
mock(TaskActions.class),
- new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
for (int i = 0; i < numChannels; i++) {
RemoteInputChannel channel = new RemoteInputChannel(
gate, i, new ResultPartitionID(), mock(ConnectionID.class),
- connManager, 0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ connManager, 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
gate.setInputChannel(new IntermediateResultPartitionID(), channel);
sources[i] = new RemoteChannelSource(channel);
@@ -148,7 +148,7 @@ public class InputGateConcurrentTest {
0,
numChannels,
mock(TaskActions.class),
- new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
for (int i = 0, local = 0; i < numChannels; i++) {
if (localOrRemote.get(i)) {
@@ -158,14 +158,14 @@ public class InputGateConcurrentTest {
sources[i] = new PipelinedSubpartitionSource(psp);
LocalInputChannel channel = new LocalInputChannel(gate, i, new ResultPartitionID(),
- resultPartitionManager, mock(TaskEventDispatcher.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ resultPartitionManager, mock(TaskEventDispatcher.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
gate.setInputChannel(new IntermediateResultPartitionID(), channel);
}
else {
//remote channel
RemoteInputChannel channel = new RemoteInputChannel(
gate, i, new ResultPartitionID(), mock(ConnectionID.class),
- connManager, 0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ connManager, 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
gate.setInputChannel(new IntermediateResultPartitionID(), channel);
sources[i] = new RemoteChannelSource(channel);
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
index f933840..324a060 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.junit.Test;
@@ -89,11 +89,11 @@ public class InputGateFairnessTest {
new IntermediateDataSetID(),
0, numChannels,
mock(TaskActions.class),
- new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
for (int i = 0; i < numChannels; i++) {
LocalInputChannel channel = new LocalInputChannel(gate, i, new ResultPartitionID(),
- resultPartitionManager, mock(TaskEventDispatcher.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ resultPartitionManager, mock(TaskEventDispatcher.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
gate.setInputChannel(new IntermediateResultPartitionID(), channel);
}
@@ -142,11 +142,11 @@ public class InputGateFairnessTest {
new IntermediateDataSetID(),
0, numChannels,
mock(TaskActions.class),
- new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
for (int i = 0; i < numChannels; i++) {
LocalInputChannel channel = new LocalInputChannel(gate, i, new ResultPartitionID(),
- resultPartitionManager, mock(TaskEventDispatcher.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ resultPartitionManager, mock(TaskEventDispatcher.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
gate.setInputChannel(new IntermediateResultPartitionID(), channel);
}
@@ -192,7 +192,7 @@ public class InputGateFairnessTest {
new IntermediateDataSetID(),
0, numChannels,
mock(TaskActions.class),
- new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
final ConnectionManager connManager = createDummyConnectionManager();
@@ -201,7 +201,7 @@ public class InputGateFairnessTest {
for (int i = 0; i < numChannels; i++) {
RemoteInputChannel channel = new RemoteInputChannel(
gate, i, new ResultPartitionID(), mock(ConnectionID.class),
- connManager, 0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ connManager, 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
channels[i] = channel;
@@ -247,7 +247,7 @@ public class InputGateFairnessTest {
new IntermediateDataSetID(),
0, numChannels,
mock(TaskActions.class),
- new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
final ConnectionManager connManager = createDummyConnectionManager();
@@ -257,7 +257,7 @@ public class InputGateFairnessTest {
for (int i = 0; i < numChannels; i++) {
RemoteInputChannel channel = new RemoteInputChannel(
gate, i, new ResultPartitionID(), mock(ConnectionID.class),
- connManager, 0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ connManager, 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
channels[i] = channel;
gate.setInputChannel(new IntermediateResultPartitionID(), channel);
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 5f7fd82..16cd90d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -38,7 +38,7 @@ import org.apache.flink.runtime.io.network.util.TestPartitionProducer;
import org.apache.flink.runtime.io.network.util.TestProducerSource;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -291,7 +291,7 @@ public class LocalInputChannelTest {
0,
1,
mock(TaskActions.class),
- new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()
);
ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
@@ -318,7 +318,7 @@ public class LocalInputChannelTest {
partitionManager,
new TaskEventDispatcher(),
1, 1,
- new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
gate.setInputChannel(new IntermediateResultPartitionID(), channel);
@@ -370,7 +370,7 @@ public class LocalInputChannelTest {
new ResultPartitionID(),
partitionManager,
new TaskEventDispatcher(),
- new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
channel.requestSubpartition(0);
@@ -411,7 +411,7 @@ public class LocalInputChannelTest {
mock(TaskEventDispatcher.class),
initialAndMaxRequestBackoff._1(),
initialAndMaxRequestBackoff._2(),
- new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
}
/**
@@ -487,7 +487,7 @@ public class LocalInputChannelTest {
subpartitionIndex,
numberOfInputChannels,
mock(TaskActions.class),
- new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
// Set buffer pool
inputGate.setBufferPool(bufferPool);
@@ -502,7 +502,7 @@ public class LocalInputChannelTest {
consumedPartitionIds[i],
partitionManager,
taskEventDispatcher,
- new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup()));
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup()));
}
this.numberOfInputChannels = numberOfInputChannels;
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index bced9ce..d791ced 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
import org.apache.flink.runtime.io.network.partition.ProducerFailedException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
@@ -270,7 +270,7 @@ public class RemoteInputChannelTest {
partitionId,
mock(ConnectionID.class),
connectionManager,
- new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
ch.onFailedPartitionRequest();
@@ -290,7 +290,7 @@ public class RemoteInputChannelTest {
new ResultPartitionID(),
mock(ConnectionID.class),
connManager,
- new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
ch.onError(new ProducerFailedException(new RuntimeException("Expected test exception.")));
@@ -401,6 +401,6 @@ public class RemoteInputChannelTest {
connectionManager,
initialAndMaxRequestBackoff._1(),
initialAndMaxRequestBackoff._2(),
- new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 4d7d884..da649cd 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -43,7 +43,7 @@ import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
import org.apache.flink.runtime.io.network.util.TestTaskEvent;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.junit.Test;
@@ -80,7 +80,7 @@ public class SingleInputGateTest {
new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
0, 2,
mock(TaskActions.class),
- new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
assertEquals(ResultPartitionType.PIPELINED, inputGate.getConsumedPartitionType());
@@ -140,7 +140,7 @@ public class SingleInputGateTest {
resultId, ResultPartitionType.PIPELINED,
0, 2,
mock(TaskActions.class),
- new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
final BufferPool bufferPool = mock(BufferPool.class);
when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2);
@@ -149,12 +149,12 @@ public class SingleInputGateTest {
// Local
ResultPartitionID localPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID());
- InputChannel local = new LocalInputChannel(inputGate, 0, localPartitionId, partitionManager, taskEventDispatcher, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ InputChannel local = new LocalInputChannel(inputGate, 0, localPartitionId, partitionManager, taskEventDispatcher, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
// Unknown
ResultPartitionID unknownPartitionId = new ResultPartitionID(new IntermediateResultPartitionID(), new ExecutionAttemptID());
- InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), 0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class), 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
// Set channels
inputGate.setInputChannel(localPartitionId.getPartitionId(), local);
@@ -195,7 +195,7 @@ public class SingleInputGateTest {
ResultPartitionType.PIPELINED,
0,
1,
- mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ mock(TaskActions.class), UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
@@ -206,7 +206,7 @@ public class SingleInputGateTest {
partitionManager,
new TaskEventDispatcher(),
new LocalConnectionManager(),
- 0, 0, new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ 0, 0, UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown);
@@ -236,7 +236,7 @@ public class SingleInputGateTest {
0,
1,
mock(TaskActions.class),
- new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
InputChannel unknown = new UnknownInputChannel(
inputGate,
@@ -246,7 +246,7 @@ public class SingleInputGateTest {
new TaskEventDispatcher(),
new LocalConnectionManager(),
0, 0,
- new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
inputGate.setInputChannel(unknown.partitionId.getPartitionId(), unknown);
@@ -339,7 +339,7 @@ public class SingleInputGateTest {
gateDesc,
netEnv,
mock(TaskActions.class),
- new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
assertEquals(gateDesc.getConsumedPartitionType(), gate.getConsumedPartitionType());
@@ -388,7 +388,7 @@ public class SingleInputGateTest {
0,
1,
mock(TaskActions.class),
- new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
RemoteInputChannel remote = mock(RemoteInputChannel.class);
inputGate.setInputChannel(new IntermediateResultPartitionID(), remote);
@@ -416,7 +416,7 @@ public class SingleInputGateTest {
0,
1,
mock(TaskActions.class),
- new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
UnknownInputChannel unknown = mock(UnknownInputChannel.class);
final ResultPartitionID resultPartitionId = new ResultPartitionID();
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index 18ad490..0ae6e74 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -22,7 +22,7 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -60,7 +60,7 @@ public class TestSingleInputGate {
0,
numberOfInputChannels,
mock(TaskActions.class),
- new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
this.inputGate = spy(realGate);
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index bc1dd07..9884855 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.io.network.partition.consumer;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.taskmanager.TaskActions;
import org.junit.Test;
@@ -48,13 +48,13 @@ public class UnionInputGateTest {
new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
0, 3,
mock(TaskActions.class),
- new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
final SingleInputGate ig2 = new SingleInputGate(
testTaskName, new JobID(),
new IntermediateDataSetID(), ResultPartitionType.PIPELINED,
0, 5,
mock(TaskActions.class),
- new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup().getIOMetricGroup());
final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2});
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
index f5d6802..0b7547d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHAJobGraphRecoveryITCase.java
@@ -203,7 +203,7 @@ public class JobManagerHAJobGraphRecoveryITCase extends TestLogger {
ResourceID.generate(),
taskManagerSystem,
highAvailabilityServices,
- new NoOpMetricRegistry(),
+ NoOpMetricRegistry.INSTANCE,
"localhost",
Option.<String>empty(),
false,
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 88141d6..f86e7e1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -64,6 +64,7 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.taskmanager.TaskManager;
@@ -206,7 +207,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
mySubmittedJobGraphStore,
checkpointStateFactory,
jobRecoveryTimeout,
- new JobManagerMetricGroup(new NoOpMetricRegistry(), "localhost"),
+ UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
Option.<String>empty());
jobManager = system.actorOf(jobManagerProps);
@@ -217,7 +218,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
ResourceID.generate(),
system,
testingHighAvailabilityServices,
- new NoOpMetricRegistry(),
+ NoOpMetricRegistry.INSTANCE,
"localhost",
Option.apply("taskmanager"),
true,
@@ -381,7 +382,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
submittedJobGraphStore,
mock(CheckpointRecoveryFactory.class),
jobRecoveryTimeout,
- new JobManagerMetricGroup(new NoOpMetricRegistry(), "localhost"),
+ UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
recoveredJobs).withDispatcher(CallingThreadDispatcher.Id());
jobManager = system.actorOf(jobManagerProps);
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index 6a02d1f..51cc469 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -626,7 +626,7 @@ public class JobManagerTest extends TestLogger {
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
- new NoOpMetricRegistry(),
+ NoOpMetricRegistry.INSTANCE,
Option.empty(),
TestingJobManager.class,
MemoryArchivist.class)._1();
@@ -648,7 +648,7 @@ public class JobManagerTest extends TestLogger {
ResourceID.generate(),
system,
highAvailabilityServices,
- new NoOpMetricRegistry(),
+ NoOpMetricRegistry.INSTANCE,
"localhost",
scala.Option.<String>empty(),
true,
@@ -845,7 +845,7 @@ public class JobManagerTest extends TestLogger {
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
- new NoOpMetricRegistry(),
+ NoOpMetricRegistry.INSTANCE,
Option.empty(),
Option.apply("jm"),
Option.apply("arch"),
@@ -864,7 +864,7 @@ public class JobManagerTest extends TestLogger {
ResourceID.generate(),
actorSystem,
highAvailabilityServices,
- new NoOpMetricRegistry(),
+ NoOpMetricRegistry.INSTANCE,
"localhost",
Option.apply("tm"),
true,
@@ -1057,7 +1057,7 @@ public class JobManagerTest extends TestLogger {
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
- new NoOpMetricRegistry(),
+ NoOpMetricRegistry.INSTANCE,
Option.empty(),
Option.apply("jm"),
Option.apply("arch"),
@@ -1076,7 +1076,7 @@ public class JobManagerTest extends TestLogger {
ResourceID.generate(),
actorSystem,
highAvailabilityServices,
- new NoOpMetricRegistry(),
+ NoOpMetricRegistry.INSTANCE,
"localhost",
Option.apply("tm"),
true,
@@ -1172,7 +1172,7 @@ public class JobManagerTest extends TestLogger {
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
- new NoOpMetricRegistry(),
+ NoOpMetricRegistry.INSTANCE,
Option.empty(),
Option.apply("jm"),
Option.apply("arch"),
@@ -1191,7 +1191,7 @@ public class JobManagerTest extends TestLogger {
ResourceID.generate(),
actorSystem,
highAvailabilityServices,
- new NoOpMetricRegistry(),
+ NoOpMetricRegistry.INSTANCE,
"localhost",
Option.apply("tm"),
true,
@@ -1285,7 +1285,7 @@ public class JobManagerTest extends TestLogger {
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
- new NoOpMetricRegistry(),
+ NoOpMetricRegistry.INSTANCE,
Option.empty(),
Option.apply("jm"),
Option.apply("arch"),
@@ -1307,7 +1307,7 @@ public class JobManagerTest extends TestLogger {
ResourceID.generate(),
actorSystem,
highAvailabilityServices,
- new NoOpMetricRegistry(),
+ NoOpMetricRegistry.INSTANCE,
"localhost",
Option.apply("tm"),
true,
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index cc93f18..0ca83ae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -95,7 +95,7 @@ public class JobSubmitTest {
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
- new NoOpMetricRegistry(),
+ NoOpMetricRegistry.INSTANCE,
Option.empty(),
JobManager.class,
MemoryArchivist.class)._1();
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index 72c03af..703cd0b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.testingUtils.TestingJobManager;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -201,7 +202,7 @@ public class JobManagerLeaderElectionTest extends TestLogger {
submittedJobGraphStore,
checkpointRecoveryFactory,
AkkaUtils.getDefaultTimeoutAsFiniteDuration(),
- new JobManagerMetricGroup(new NoOpMetricRegistry(), "localhost"),
+ UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
Option.<String>empty());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
deleted file mode 100644
index 46d6548..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.metrics;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
-import org.apache.flink.runtime.metrics.scope.ScopeFormats;
-
-import javax.annotation.Nullable;
-
-/**
- * Metric registry which does nothing and is intended for testing purposes.
- */
-public class NoOpMetricRegistry implements MetricRegistry {
-
- final char delimiter = ',';
-
- final ScopeFormats scopeFormats = ScopeFormats.fromConfig(new Configuration());
-
- @Override
- public char getDelimiter() {
- return delimiter;
- }
-
- @Override
- public char getDelimiter(int index) {
- return delimiter;
- }
-
- @Override
- public int getNumberReporters() {
- return 0;
- }
-
- @Override
- public void register(Metric metric, String metricName, AbstractMetricGroup group) {}
-
- @Override
- public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {}
-
- @Override
- public ScopeFormats getScopeFormats() {
- return scopeFormats;
- }
-
- @Nullable
- @Override
- public String getMetricQueryServicePath() {
- return null;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
index d934ea9..eec7165 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
@@ -78,7 +78,7 @@ public class TaskManagerMetricsTest extends TestLogger {
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
- new NoOpMetricRegistry(),
+ NoOpMetricRegistry.INSTANCE,
Option.empty(),
JobManager.class,
MemoryArchivist.class)._1();
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
index 0fced33..4dc5edf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
@@ -86,7 +86,7 @@ public class MetricGroupTest extends TestLogger {
*/
@Test
public void testUserDefinedVariable() {
- MetricRegistry registry = new NoOpMetricRegistry();
+ MetricRegistry registry = NoOpMetricRegistry.INSTANCE;
GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root");
String key = "key";
@@ -111,7 +111,7 @@ public class MetricGroupTest extends TestLogger {
*/
@Test
public void testUserDefinedVariableOnKeyGroup() {
- MetricRegistry registry = new NoOpMetricRegistry();
+ MetricRegistry registry = NoOpMetricRegistry.INSTANCE;
GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root");
String key1 = "key1";
@@ -142,7 +142,7 @@ public class MetricGroupTest extends TestLogger {
*/
@Test
public void testNameCollisionForKeyAfterGenericGroup() {
- MetricRegistry registry = new NoOpMetricRegistry();
+ MetricRegistry registry = NoOpMetricRegistry.INSTANCE;
GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root");
String key = "key";
@@ -169,7 +169,7 @@ public class MetricGroupTest extends TestLogger {
*/
@Test
public void testNameCollisionForKeyAndValueAfterGenericGroup() {
- MetricRegistry registry = new NoOpMetricRegistry();
+ MetricRegistry registry = NoOpMetricRegistry.INSTANCE;
GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root");
String key = "key";
@@ -195,7 +195,7 @@ public class MetricGroupTest extends TestLogger {
*/
@Test
public void testNameCollisionAfterKeyValueGroup() {
- MetricRegistry registry = new NoOpMetricRegistry();
+ MetricRegistry registry = NoOpMetricRegistry.INSTANCE;
GenericMetricGroup root = new GenericMetricGroup(registry, new DummyAbstractMetricGroup(registry), "root");
String key = "key";
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
index bcf77de..f23b2f5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskIOMetricGroupTest.java
@@ -21,7 +21,6 @@ package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.executiongraph.IOMetrics;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.junit.Test;
@@ -34,7 +33,7 @@ import static org.junit.Assert.assertNotNull;
public class TaskIOMetricGroupTest {
@Test
public void testTaskIOMetricGroup() {
- TaskMetricGroup task = new UnregisteredTaskMetricsGroup();
+ TaskMetricGroup task = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
TaskIOMetricGroup taskIO = task.getIOMetricGroup();
// test counter forwarding
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
index 4fa74b3..a4d14c4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java
@@ -29,10 +29,10 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.TaskContext;
import org.apache.flink.runtime.operators.testutils.DummyInvokable;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
@@ -227,6 +227,6 @@ public class TestTaskContext<S, T> implements TaskContext<S, T> {
@Override
public OperatorMetricGroup getMetricGroup() {
- return new UnregisteredTaskMetricsGroup.DummyOperatorMetricGroup();
+ return UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
index 96c8b73..a76f110 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/BinaryOperatorTestBase.java
@@ -26,13 +26,12 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.Driver;
import org.apache.flink.runtime.operators.ResettableDriver;
import org.apache.flink.runtime.operators.TaskContext;
@@ -373,7 +372,7 @@ public abstract class BinaryOperatorTestBase<S extends Function, IN, OUT> extend
@Override
public OperatorMetricGroup getMetricGroup() {
- return new UnregisteredTaskMetricsGroup.DummyOperatorMetricGroup();
+ return UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
index d2cedb9..3820bf9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DriverTestBase.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.Driver;
import org.apache.flink.runtime.operators.ResettableDriver;
import org.apache.flink.runtime.operators.TaskContext;
@@ -368,7 +369,7 @@ public abstract class DriverTestBase<S extends Function> extends TestLogger impl
@Override
public OperatorMetricGroup getMetricGroup() {
- return new UnregisteredTaskMetricsGroup.DummyOperatorMetricGroup();
+ return UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
index 718ecfe..148eb0b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/DummyEnvironment.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
@@ -100,7 +101,7 @@ public class DummyEnvironment implements Environment {
@Override
public TaskMetricGroup getMetricGroup() {
- return new UnregisteredTaskMetricsGroup();
+ return UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index f655b12..861cf35 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -45,6 +45,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
@@ -281,7 +282,7 @@ public class MockEnvironment implements Environment {
@Override
public TaskMetricGroup getMetricGroup() {
- return new UnregisteredTaskMetricsGroup();
+ return UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
index 141aec6..2ef82da 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnaryOperatorTestBase.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.operators.Driver;
import org.apache.flink.runtime.operators.TaskContext;
import org.apache.flink.runtime.operators.ResettableDriver;
@@ -364,7 +365,7 @@ public abstract class UnaryOperatorTestBase<S extends Function, IN, OUT> extends
@Override
public OperatorMetricGroup getMetricGroup() {
- return new UnregisteredTaskMetricsGroup.DummyOperatorMetricGroup();
+ return UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup();
}
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
deleted file mode 100644
index 7065e6b..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/UnregisteredTaskMetricsGroup.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.operators.testutils;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.metrics.MetricRegistry;
-import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
-import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
-import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
-import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
-import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
-import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
-
-import java.util.UUID;
-
-public class UnregisteredTaskMetricsGroup extends TaskMetricGroup {
-
- private static final MetricRegistry EMPTY_REGISTRY = new NoOpMetricRegistry();
-
-
- public UnregisteredTaskMetricsGroup() {
- super(EMPTY_REGISTRY, new DummyJobMetricGroup(),
- new JobVertexID(), new ExecutionAttemptID(), "testtask", 0, 0);
- }
-
- @Override
- protected void addMetric(String name, Metric metric) {}
-
- @Override
- public MetricGroup addGroup(String name) {
- return new UnregisteredMetricsGroup();
- }
-
- // ------------------------------------------------------------------------
-
- private static class DummyTaskManagerMetricsGroup extends TaskManagerMetricGroup {
-
- public DummyTaskManagerMetricsGroup() {
- super(EMPTY_REGISTRY, "localhost", UUID.randomUUID().toString());
- }
- }
-
- private static class DummyJobMetricGroup extends TaskManagerJobMetricGroup {
-
- public DummyJobMetricGroup() {
- super(EMPTY_REGISTRY, new DummyTaskManagerMetricsGroup(), new JobID(), "testjob");
- }
- }
-
- public static class DummyTaskIOMetricGroup extends TaskIOMetricGroup {
- public DummyTaskIOMetricGroup() {
- super(new UnregisteredTaskMetricsGroup());
- }
- }
-
- public static class DummyOperatorMetricGroup extends OperatorMetricGroup {
- public DummyOperatorMetricGroup() {
- super(EMPTY_REGISTRY, new UnregisteredTaskMetricsGroup(), new OperatorID(), "testoperator");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
index 3050718..87cb4a9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTest.java
@@ -97,7 +97,7 @@ public class ResourceManagerTest extends TestLogger {
highAvailabilityServices,
new HeartbeatServices(1000L, 10000L),
slotManager,
- new NoOpMetricRegistry(),
+ NoOpMetricRegistry.INSTANCE,
jobLeaderIdService,
testingFatalErrorHandler);
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
index 98b5b8b..a3c41c5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.java
@@ -98,7 +98,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
- new NoOpMetricRegistry(),
+ NoOpMetricRegistry.INSTANCE,
Option.empty(),
JobManager.class,
MemoryArchivist.class)._1();
@@ -168,7 +168,7 @@ public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
network,
numberOfSlots,
highAvailabilityServices,
- new TaskManagerMetricGroup(new NoOpMetricRegistry(), connectionInfo.getHostname(), connectionInfo.getResourceID().getResourceIdString()));
+ new TaskManagerMetricGroup(NoOpMetricRegistry.INSTANCE, connectionInfo.getHostname(), connectionInfo.getResourceID().getResourceIdString()));
taskManager = actorSystem.actorOf(tmProps);
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
index 7429ec5..fadebce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerProcessReapingTestBase.java
@@ -122,7 +122,7 @@ public abstract class TaskManagerProcessReapingTestBase extends TestLogger {
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
- new NoOpMetricRegistry(),
+ NoOpMetricRegistry.INSTANCE,
Option.empty(),
JobManager.class,
MemoryArchivist.class)._1;
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
index ed06dc0..4c7a8cf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerStartupTest.java
@@ -250,7 +250,7 @@ public class TaskManagerStartupTest extends TestLogger {
ResourceID.generate(),
null,
highAvailabilityServices,
- new NoOpMetricRegistry(),
+ NoOpMetricRegistry.INSTANCE,
"localhost",
Option.<String>empty(),
false,
http://git-wip-us.apache.org/repos/asf/flink/blob/0bf0fdc2/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
index 38238cd..086ad71 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/JvmExitOnFatalErrorTest.java
@@ -52,7 +52,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
@@ -193,7 +193,7 @@ public class JvmExitOnFatalErrorTest {
new String[0]),
new FileCache(tmInfo.getTmpDirectories()),
tmInfo,
- new UnregisteredTaskMetricsGroup(),
+ UnregisteredMetricGroups.createUnregisteredTaskMetricGroup(),
new NoOpResultPartitionConsumableNotifier(),
new NoOpPartitionProducerStateChecker(),
executor);