You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/11/01 22:27:42 UTC
[2/7] flink git commit: [FLINK-7876] Register TaskManagerMetricGroup
under ResourceID
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
index 1a3ca70..d8e65a6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java
@@ -45,7 +45,7 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration;
@@ -502,7 +502,7 @@ public class ResourceManagerTest extends TestLogger {
final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor);
- final MetricRegistry metricRegistry = mock(MetricRegistry.class);
+ final MetricRegistryImpl metricRegistry = mock(MetricRegistryImpl.class);
final JobLeaderIdService jobLeaderIdService = mock(JobLeaderIdService.class);
final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
final SlotManager slotManager = new SlotManager(
@@ -601,7 +601,7 @@ public class ResourceManagerTest extends TestLogger {
final ScheduledExecutor scheduledExecutor = mock(ScheduledExecutor.class);
final HeartbeatServices heartbeatServices = new TestingHeartbeatServices(heartbeatInterval, heartbeatTimeout, scheduledExecutor);
- final MetricRegistry metricRegistry = mock(MetricRegistry.class);
+ final MetricRegistryImpl metricRegistry = mock(MetricRegistryImpl.class);
final JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(
highAvailabilityServices,
rpcService.getScheduledExecutor(),
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
index d1ca757..8558145 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/DispatcherTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerServices;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
@@ -114,7 +114,7 @@ public class DispatcherTest extends TestLogger {
mock(ResourceManagerGateway.class),
mock(BlobServer.class),
heartbeatServices,
- mock(MetricRegistry.class),
+ mock(MetricRegistryImpl.class),
fatalErrorHandler,
jobManagerRunner,
jobId);
@@ -174,7 +174,7 @@ public class DispatcherTest extends TestLogger {
mock(ResourceManagerGateway.class),
mock(BlobServer.class),
heartbeatServices,
- mock(MetricRegistry.class),
+ mock(MetricRegistryImpl.class),
fatalErrorHandler,
mock(JobManagerRunner.class),
jobId);
@@ -209,7 +209,7 @@ public class DispatcherTest extends TestLogger {
ResourceManagerGateway resourceManagerGateway,
BlobServer blobServer,
HeartbeatServices heartbeatServices,
- MetricRegistry metricRegistry,
+ MetricRegistryImpl metricRegistry,
FatalErrorHandler fatalErrorHandler,
JobManagerRunner jobManagerRunner,
JobID expectedJobId) throws Exception {
@@ -238,7 +238,7 @@ public class DispatcherTest extends TestLogger {
HighAvailabilityServices highAvailabilityServices,
HeartbeatServices heartbeatServices,
JobManagerServices jobManagerServices,
- MetricRegistry metricRegistry,
+ MetricRegistryImpl metricRegistry,
OnCompletionActions onCompleteActions,
FatalErrorHandler fatalErrorHandler) throws Exception {
assertEquals(expectedJobId, jobGraph.getJobID());
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
index 7df26fc..d843da2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java
@@ -62,7 +62,9 @@ import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.taskmanager.TaskManager;
@@ -206,7 +208,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
mySubmittedJobGraphStore,
checkpointStateFactory,
jobRecoveryTimeout,
- Option.<MetricRegistry>empty(),
+ Option.<MetricRegistryImpl>empty(),
Option.<String>empty());
jobManager = system.actorOf(jobManagerProps);
@@ -217,6 +219,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
ResourceID.generate(),
system,
testingHighAvailabilityServices,
+ new NoOpMetricRegistry(),
"localhost",
Option.apply("taskmanager"),
true,
@@ -380,7 +383,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
submittedJobGraphStore,
mock(CheckpointRecoveryFactory.class),
jobRecoveryTimeout,
- Option.<MetricRegistry>apply(null),
+ Option.<MetricRegistryImpl>apply(null),
recoveredJobs).withDispatcher(CallingThreadDispatcher.Id());
jobManager = system.actorOf(jobManagerProps);
@@ -418,7 +421,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
SubmittedJobGraphStore submittedJobGraphs,
CheckpointRecoveryFactory checkpointRecoveryFactory,
FiniteDuration jobRecoveryTimeout,
- Option<MetricRegistry> metricsRegistry,
+ JobManagerMetricGroup jobManagerMetricGroup,
Collection<JobID> recoveredJobs) {
super(
flinkConfiguration,
@@ -435,7 +438,7 @@ public class JobManagerHARecoveryTest extends TestLogger {
submittedJobGraphs,
checkpointRecoveryFactory,
jobRecoveryTimeout,
- metricsRegistry,
+ jobManagerMetricGroup,
Option.empty());
this.recoveredJobs = recoveredJobs;
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index bd7f11f..a697aae 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -75,6 +75,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint;
import org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepointSuccess;
import org.apache.flink.runtime.messages.RegistrationMessages;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.query.KvStateLocation;
import org.apache.flink.runtime.query.KvStateMessage.LookupKvStateLocation;
import org.apache.flink.runtime.query.KvStateMessage.NotifyKvStateRegistered;
@@ -624,6 +625,7 @@ public class JobManagerTest extends TestLogger {
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
+ new NoOpMetricRegistry(),
Option.empty(),
TestingJobManager.class,
MemoryArchivist.class)._1();
@@ -645,6 +647,7 @@ public class JobManagerTest extends TestLogger {
ResourceID.generate(),
system,
highAvailabilityServices,
+ new NoOpMetricRegistry(),
"localhost",
scala.Option.<String>empty(),
true,
@@ -841,6 +844,7 @@ public class JobManagerTest extends TestLogger {
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
+ new NoOpMetricRegistry(),
Option.empty(),
Option.apply("jm"),
Option.apply("arch"),
@@ -859,6 +863,7 @@ public class JobManagerTest extends TestLogger {
ResourceID.generate(),
actorSystem,
highAvailabilityServices,
+ new NoOpMetricRegistry(),
"localhost",
Option.apply("tm"),
true,
@@ -1051,6 +1056,7 @@ public class JobManagerTest extends TestLogger {
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
+ new NoOpMetricRegistry(),
Option.empty(),
Option.apply("jm"),
Option.apply("arch"),
@@ -1069,6 +1075,7 @@ public class JobManagerTest extends TestLogger {
ResourceID.generate(),
actorSystem,
highAvailabilityServices,
+ new NoOpMetricRegistry(),
"localhost",
Option.apply("tm"),
true,
@@ -1164,6 +1171,7 @@ public class JobManagerTest extends TestLogger {
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
+ new NoOpMetricRegistry(),
Option.empty(),
Option.apply("jm"),
Option.apply("arch"),
@@ -1182,6 +1190,7 @@ public class JobManagerTest extends TestLogger {
ResourceID.generate(),
actorSystem,
highAvailabilityServices,
+ new NoOpMetricRegistry(),
"localhost",
Option.apply("tm"),
true,
@@ -1275,6 +1284,7 @@ public class JobManagerTest extends TestLogger {
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
+ new NoOpMetricRegistry(),
Option.empty(),
Option.apply("jm"),
Option.apply("arch"),
@@ -1296,6 +1306,7 @@ public class JobManagerTest extends TestLogger {
ResourceID.generate(),
actorSystem,
highAvailabilityServices,
+ new NoOpMetricRegistry(),
"localhost",
Option.apply("tm"),
true,
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
index 340a735..cc93f18 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobSubmitTest.java
@@ -37,6 +37,7 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguratio
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
@@ -94,6 +95,7 @@ public class JobSubmitTest {
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
+ new NoOpMetricRegistry(),
Option.empty(),
JobManager.class,
MemoryArchivist.class)._1();
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
index b4f50fb..083d6e9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java
@@ -33,7 +33,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
@@ -115,7 +115,7 @@ public class JobManagerRunnerMockTest extends TestLogger {
haServices,
heartbeatServices,
JobManagerServices.fromConfiguration(new Configuration(), mock(BlobServer.class)),
- new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()),
+ new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration()),
jobCompletion,
jobCompletion));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
index c3b57fa..e4ceb40 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.java
@@ -28,7 +28,6 @@ import akka.util.Timeout;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.test.TestingServer;
import org.apache.flink.configuration.BlobServerOptions;
-import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobServer;
@@ -43,7 +42,7 @@ import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
-import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.testingUtils.TestingJobManager;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -199,7 +198,7 @@ public class JobManagerLeaderElectionTest extends TestLogger {
submittedJobGraphStore,
checkpointRecoveryFactory,
AkkaUtils.getDefaultTimeoutAsFiniteDuration(),
- Option.<MetricRegistry>empty(),
+ Option.<MetricRegistryImpl>empty(),
Option.<String>empty());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
new file mode 100644
index 0000000..b0b20b2
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryImplTest.java
@@ -0,0 +1,496 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.MetricOptions;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.reporter.Scheduled;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.metrics.groups.MetricGroupTest;
+import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+import org.apache.flink.runtime.metrics.util.TestReporter;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorNotFound;
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.Await;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Tests for the {@link MetricRegistryImpl}.
+ */
+public class MetricRegistryImplTest extends TestLogger {
+
+ private static final char GLOBAL_DEFAULT_DELIMITER = '.';
+
+ @Test
+ public void testIsShutdown() {
+ MetricRegistryImpl metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+
+ Assert.assertFalse(metricRegistry.isShutdown());
+
+ metricRegistry.shutdown();
+
+ Assert.assertTrue(metricRegistry.isShutdown());
+ }
+
+ /**
+ * Verifies that the reporter class argument is correctly used to instantiate and open the reporter.
+ */
+ @Test
+ public void testReporterInstantiation() {
+ Configuration config = new Configuration();
+
+ config.setString(MetricOptions.REPORTERS_LIST, "test");
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
+
+ MetricRegistryImpl metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
+
+ assertTrue(metricRegistry.getReporters().size() == 1);
+
+ Assert.assertTrue(TestReporter1.wasOpened);
+
+ metricRegistry.shutdown();
+ }
+
+ /**
+ * Reporter that exposes whether open() was called.
+ */
+ protected static class TestReporter1 extends TestReporter {
+ public static boolean wasOpened = false;
+
+ @Override
+ public void open(MetricConfig config) {
+ wasOpened = true;
+ }
+ }
+
+ /**
+ * Verifies that multiple reporters are instantiated correctly.
+ */
+ @Test
+ public void testMultipleReporterInstantiation() {
+ Configuration config = new Configuration();
+
+ config.setString(MetricOptions.REPORTERS_LIST, "test1, test2,test3");
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter11.class.getName());
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter12.class.getName());
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter13.class.getName());
+
+ MetricRegistryImpl metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
+
+ assertTrue(metricRegistry.getReporters().size() == 3);
+
+ Assert.assertTrue(TestReporter11.wasOpened);
+ Assert.assertTrue(TestReporter12.wasOpened);
+ Assert.assertTrue(TestReporter13.wasOpened);
+
+ metricRegistry.shutdown();
+ }
+
+ /**
+ * Reporter that exposes whether open() was called.
+ */
+ protected static class TestReporter11 extends TestReporter {
+ public static boolean wasOpened = false;
+
+ @Override
+ public void open(MetricConfig config) {
+ wasOpened = true;
+ }
+ }
+
+ /**
+ * Reporter that exposes whether open() was called.
+ */
+ protected static class TestReporter12 extends TestReporter {
+ public static boolean wasOpened = false;
+
+ @Override
+ public void open(MetricConfig config) {
+ wasOpened = true;
+ }
+ }
+
+ /**
+ * Reporter that exposes whether open() was called.
+ */
+ protected static class TestReporter13 extends TestReporter {
+ public static boolean wasOpened = false;
+
+ @Override
+ public void open(MetricConfig config) {
+ wasOpened = true;
+ }
+ }
+
+ /**
+ * Verifies that configured arguments are properly forwarded to the reporter.
+ */
+ @Test
+ public void testReporterArgumentForwarding() {
+ Configuration config = new Configuration();
+
+ config.setString(MetricOptions.REPORTERS_LIST, "test");
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName());
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello");
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg2", "world");
+
+ new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config)).shutdown();
+
+ Assert.assertEquals("hello", TestReporter2.mc.getString("arg1", null));
+ Assert.assertEquals("world", TestReporter2.mc.getString("arg2", null));
+ }
+
+ /**
+ * Reporter that exposes the {@link MetricConfig} it was given.
+ */
+ protected static class TestReporter2 extends TestReporter {
+ static MetricConfig mc;
+ @Override
+ public void open(MetricConfig config) {
+ mc = config;
+ }
+ }
+
+ /**
+ * Verifies that reporters implementing the Scheduled interface are regularly called to report the metrics.
+ *
+ * @throws InterruptedException
+ */
+ @Test
+ public void testReporterScheduling() throws InterruptedException {
+ Configuration config = new Configuration();
+
+ config.setString(MetricOptions.REPORTERS_LIST, "test");
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter3.class.getName());
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello");
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "50 MILLISECONDS");
+
+ MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
+
+ long start = System.currentTimeMillis();
+
+ // only start counting from now on
+ TestReporter3.reportCount = 0;
+
+ for (int x = 0; x < 10; x++) {
+ Thread.sleep(100);
+ int reportCount = TestReporter3.reportCount;
+ long curT = System.currentTimeMillis();
+ /**
+ * Within a given time-frame T only T/50 reports may be triggered due to the interval between reports.
+ * This value however does not take the first triggered report into account (=> +1).
+ * Furthermore we have to account for the mis-alignment between reports being triggered and our time
+ * measurement (=> +1); for T=200 a total of 4-6 reports may have been
+ * triggered depending on whether the end of the interval for the first reports ends before
+ * or after T=50.
+ */
+ long maxAllowedReports = (curT - start) / 50 + 2;
+ Assert.assertTrue("Too many reports were triggered.", maxAllowedReports >= reportCount);
+ }
+ Assert.assertTrue("No report was triggered.", TestReporter3.reportCount > 0);
+
+ registry.shutdown();
+ }
+
+ /**
+ * Reporter that exposes how often report() was called.
+ */
+ protected static class TestReporter3 extends TestReporter implements Scheduled {
+ public static int reportCount = 0;
+
+ @Override
+ public void report() {
+ reportCount++;
+ }
+ }
+
+ /**
+ * Verifies that reporters are notified of added/removed metrics.
+ */
+ @Test
+ public void testReporterNotifications() {
+ Configuration config = new Configuration();
+ config.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter6.class.getName());
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter7.class.getName());
+
+ MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
+
+ TaskManagerMetricGroup root = new TaskManagerMetricGroup(registry, "host", "id");
+ root.counter("rootCounter");
+
+ assertTrue(TestReporter6.addedMetric instanceof Counter);
+ assertEquals("rootCounter", TestReporter6.addedMetricName);
+
+ assertTrue(TestReporter7.addedMetric instanceof Counter);
+ assertEquals("rootCounter", TestReporter7.addedMetricName);
+
+ root.close();
+
+ assertTrue(TestReporter6.removedMetric instanceof Counter);
+ assertEquals("rootCounter", TestReporter6.removedMetricName);
+
+ assertTrue(TestReporter7.removedMetric instanceof Counter);
+ assertEquals("rootCounter", TestReporter7.removedMetricName);
+
+ registry.shutdown();
+ }
+
+ /**
+ * Reporter that exposes the name and metric instance of the last metric that was added or removed.
+ */
+ protected static class TestReporter6 extends TestReporter {
+ static Metric addedMetric;
+ static String addedMetricName;
+
+ static Metric removedMetric;
+ static String removedMetricName;
+
+ @Override
+ public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
+ addedMetric = metric;
+ addedMetricName = metricName;
+ }
+
+ @Override
+ public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
+ removedMetric = metric;
+ removedMetricName = metricName;
+ }
+ }
+
+ /**
+ * Reporter that exposes the name and metric instance of the last metric that was added or removed.
+ */
+ protected static class TestReporter7 extends TestReporter {
+ static Metric addedMetric;
+ static String addedMetricName;
+
+ static Metric removedMetric;
+ static String removedMetricName;
+
+ @Override
+ public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
+ addedMetric = metric;
+ addedMetricName = metricName;
+ }
+
+ @Override
+ public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
+ removedMetric = metric;
+ removedMetricName = metricName;
+ }
+ }
+
+ /**
+ * Verifies that the scope configuration is properly extracted.
+ */
+ @Test
+ public void testScopeConfig() {
+ Configuration config = new Configuration();
+
+ config.setString(MetricOptions.SCOPE_NAMING_TM, "A");
+ config.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "B");
+ config.setString(MetricOptions.SCOPE_NAMING_TASK, "C");
+ config.setString(MetricOptions.SCOPE_NAMING_OPERATOR, "D");
+
+ ScopeFormats scopeConfig = ScopeFormats.fromConfig(config);
+
+ assertEquals("A", scopeConfig.getTaskManagerFormat().format());
+ assertEquals("B", scopeConfig.getTaskManagerJobFormat().format());
+ assertEquals("C", scopeConfig.getTaskFormat().format());
+ assertEquals("D", scopeConfig.getOperatorFormat().format());
+ }
+
+ @Test
+ public void testConfigurableDelimiter() {
+ Configuration config = new Configuration();
+ config.setString(MetricOptions.SCOPE_DELIMITER, "_");
+ config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D.E");
+
+ MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
+
+ TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "host", "id");
+ assertEquals("A_B_C_D_E_name", tmGroup.getMetricIdentifier("name"));
+
+ registry.shutdown();
+ }
+
+ @Test
+ public void testConfigurableDelimiterForReporters() {
+ Configuration config = new Configuration();
+ config.setString(MetricOptions.REPORTERS_LIST, "test1,test2,test3");
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName());
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName());
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA");
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName());
+
+ MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
+
+ assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter());
+ assertEquals('_', registry.getDelimiter(0));
+ assertEquals('-', registry.getDelimiter(1));
+ assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter(2));
+ assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter(3));
+ assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter(-1));
+
+ registry.shutdown();
+ }
+
+ /**
+ * Verifies that each reporter sees metric identifiers built with its own delimiter: a valid
+ * per-reporter delimiter is used as configured, while an invalid ("AA") or missing setting
+ * is expected to fall back to the global default delimiter.
+ */
+ @Test
+ public void testConfigurableDelimiterForReportersInGroup() {
+ // The counters are static, so they outlive a single test run; reset them so the exact-count
+ // assertions at the end only reflect notifications triggered by this invocation.
+ TestReporter8.numCorrectDelimitersForRegister = 0;
+ TestReporter8.numCorrectDelimitersForUnregister = 0;
+
+ Configuration config = new Configuration();
+ config.setString(MetricOptions.REPORTERS_LIST, "test1,test2,test3,test4");
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA");
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
+ config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test4." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
+ config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B");
+
+ MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
+ List<MetricReporter> reporters = registry.getReporters();
+ ((TestReporter8) reporters.get(0)).expectedDelimiter = '_'; // test1 reporter
+ ((TestReporter8) reporters.get(1)).expectedDelimiter = '-'; // test2 reporter
+ ((TestReporter8) reporters.get(2)).expectedDelimiter = GLOBAL_DEFAULT_DELIMITER; // test3 reporter: "AA" is not a valid delimiter
+ ((TestReporter8) reporters.get(3)).expectedDelimiter = GLOBAL_DEFAULT_DELIMITER; // test4 reporter: nothing configured, global delimiter applies
+
+ // a single counter is added and then removed again by closing the group; each of the
+ // four reporters is expected to be notified once per event
+ TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "host", "id");
+ group.counter("C");
+ group.close();
+ registry.shutdown();
+ assertEquals(4, TestReporter8.numCorrectDelimitersForRegister);
+ assertEquals(4, TestReporter8.numCorrectDelimitersForUnregister);
+ }
+
+ /**
+ * Tests that the query actor will be stopped when the MetricRegistry is shut down.
+ *
+ * <p>The actor system created for this test is always shut down again, regardless of the
+ * test outcome, so that it does not linger after the test.
+ */
+ @Test
+ public void testQueryActorShutdown() throws Exception {
+ final FiniteDuration timeout = new FiniteDuration(10L, TimeUnit.SECONDS);
+
+ MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+
+ final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
+
+ try {
+ registry.startQueryService(actorSystem, null);
+
+ // remember the actor before the shutdown so that its path can be resolved afterwards
+ ActorRef queryServiceActor = registry.getQueryService();
+
+ registry.shutdown();
+
+ try {
+ Await.result(actorSystem.actorSelection(queryServiceActor.path()).resolveOne(timeout), timeout);
+
+ fail("The query actor should be terminated resulting in a ActorNotFound exception.");
+ } catch (ActorNotFound e) {
+ // we expect the query actor to be shut down
+ }
+ } finally {
+ // release the actor system even if an assertion above failed
+ actorSystem.shutdown();
+ }
+ }
+
+ /**
+ * Reporter that checks, on every add/remove notification, that metric identifiers are built
+ * with the delimiter expected for this reporter, and counts the notifications that passed.
+ */
+ public static class TestReporter8 extends TestReporter {
+ // delimiter this particular reporter instance expects; assigned by the test after instantiation
+ char expectedDelimiter;
+ // number of add/remove notifications (across all instances) whose identifier checks passed
+ public static int numCorrectDelimitersForRegister = 0;
+ public static int numCorrectDelimitersForUnregister = 0;
+
+ @Override
+ public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
+ verifyIdentifiers(metricName, group);
+ numCorrectDelimitersForRegister++;
+ }
+
+ @Override
+ public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
+ verifyIdentifiers(metricName, group);
+ numCorrectDelimitersForUnregister++;
+ }
+
+ /**
+ * Asserts that both identifier lookups on the group return
+ * {@code "A" + expectedDelimiter + "B" + expectedDelimiter + "C"}.
+ */
+ private void verifyIdentifiers(String metricName, MetricGroup group) {
+ final String expectedIdentifier = new StringBuilder()
+ .append("A").append(expectedDelimiter)
+ .append("B").append(expectedDelimiter)
+ .append("C")
+ .toString();
+
+ assertEquals(expectedIdentifier, group.getMetricIdentifier(metricName, this));
+ assertEquals(expectedIdentifier, group.getMetricIdentifier(metricName));
+ }
+ }
+
+ /**
+ * Verifies that a reporter which throws from its notification callbacks does not prevent
+ * another configured reporter from being notified of added and removed metrics.
+ */
+ @Test
+ public void testExceptionIsolation() throws Exception {
+ final String failingReporterClassKey = ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX;
+ final String recordingReporterClassKey = ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX;
+
+ Configuration reporterConfig = new Configuration();
+ reporterConfig.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
+ reporterConfig.setString(failingReporterClassKey, FailingReporter.class.getName());
+ reporterConfig.setString(recordingReporterClassKey, TestReporter7.class.getName());
+
+ MetricRegistryImpl metricRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(reporterConfig));
+
+ Counter counter = new SimpleCounter();
+
+ // registration must reach the recording reporter although the failing one throws
+ metricRegistry.register(counter, "counter", new MetricGroupTest.DummyAbstractMetricGroup(metricRegistry));
+ assertEquals(counter, TestReporter7.addedMetric);
+ assertEquals("counter", TestReporter7.addedMetricName);
+
+ // the same must hold for the removal notification
+ metricRegistry.unregister(counter, "counter", new MetricGroupTest.DummyAbstractMetricGroup(metricRegistry));
+ assertEquals(counter, TestReporter7.removedMetric);
+ assertEquals("counter", TestReporter7.removedMetricName);
+
+ metricRegistry.shutdown();
+ }
+
+ /**
+ * Reporter that throws an exception when it is notified of an added or removed metric.
+ *
+ * <p>The exceptions carry a message marking them as intentional, so that they can be told
+ * apart from genuine reporter failures wherever they are logged or surface.
+ */
+ protected static class FailingReporter extends TestReporter {
+ @Override
+ public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
+ throw new RuntimeException("Intentional test failure while adding metric '" + metricName + "'.");
+ }
+
+ @Override
+ public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
+ throw new RuntimeException("Intentional test failure while removing metric '" + metricName + "'.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
deleted file mode 100644
index 284b86a..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java
+++ /dev/null
@@ -1,496 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.metrics;
-
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.MetricOptions;
-import org.apache.flink.metrics.Counter;
-import org.apache.flink.metrics.Metric;
-import org.apache.flink.metrics.MetricConfig;
-import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.metrics.SimpleCounter;
-import org.apache.flink.metrics.reporter.MetricReporter;
-import org.apache.flink.metrics.reporter.Scheduled;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.metrics.groups.MetricGroupTest;
-import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
-import org.apache.flink.runtime.metrics.scope.ScopeFormats;
-import org.apache.flink.runtime.metrics.util.TestReporter;
-import org.apache.flink.util.TestLogger;
-
-import akka.actor.ActorNotFound;
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.Await;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-/**
- * Tests for the {@link MetricRegistry}.
- */
-public class MetricRegistryTest extends TestLogger {
-
- private static final char GLOBAL_DEFAULT_DELIMITER = '.';
-
- @Test
- public void testIsShutdown() {
- MetricRegistry metricRegistry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
-
- Assert.assertFalse(metricRegistry.isShutdown());
-
- metricRegistry.shutdown();
-
- Assert.assertTrue(metricRegistry.isShutdown());
- }
-
- /**
- * Verifies that the reporter class argument is correctly used to instantiate and open the reporter.
- */
- @Test
- public void testReporterInstantiation() {
- Configuration config = new Configuration();
-
- config.setString(MetricOptions.REPORTERS_LIST, "test");
- config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
-
- MetricRegistry metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
-
- assertTrue(metricRegistry.getReporters().size() == 1);
-
- Assert.assertTrue(TestReporter1.wasOpened);
-
- metricRegistry.shutdown();
- }
-
- /**
- * Reporter that exposes whether open() was called.
- */
- protected static class TestReporter1 extends TestReporter {
- public static boolean wasOpened = false;
-
- @Override
- public void open(MetricConfig config) {
- wasOpened = true;
- }
- }
-
- /**
- * Verifies that multiple reporters are instantiated correctly.
- */
- @Test
- public void testMultipleReporterInstantiation() {
- Configuration config = new Configuration();
-
- config.setString(MetricOptions.REPORTERS_LIST, "test1, test2,test3");
- config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter11.class.getName());
- config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter12.class.getName());
- config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter13.class.getName());
-
- MetricRegistry metricRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
-
- assertTrue(metricRegistry.getReporters().size() == 3);
-
- Assert.assertTrue(TestReporter11.wasOpened);
- Assert.assertTrue(TestReporter12.wasOpened);
- Assert.assertTrue(TestReporter13.wasOpened);
-
- metricRegistry.shutdown();
- }
-
- /**
- * Reporter that exposes whether open() was called.
- */
- protected static class TestReporter11 extends TestReporter {
- public static boolean wasOpened = false;
-
- @Override
- public void open(MetricConfig config) {
- wasOpened = true;
- }
- }
-
- /**
- * Reporter that exposes whether open() was called.
- */
- protected static class TestReporter12 extends TestReporter {
- public static boolean wasOpened = false;
-
- @Override
- public void open(MetricConfig config) {
- wasOpened = true;
- }
- }
-
- /**
- * Reporter that exposes whether open() was called.
- */
- protected static class TestReporter13 extends TestReporter {
- public static boolean wasOpened = false;
-
- @Override
- public void open(MetricConfig config) {
- wasOpened = true;
- }
- }
-
- /**
- * Verifies that configured arguments are properly forwarded to the reporter.
- */
- @Test
- public void testReporterArgumentForwarding() {
- Configuration config = new Configuration();
-
- config.setString(MetricOptions.REPORTERS_LIST, "test");
- config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName());
- config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello");
- config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg2", "world");
-
- new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)).shutdown();
-
- Assert.assertEquals("hello", TestReporter2.mc.getString("arg1", null));
- Assert.assertEquals("world", TestReporter2.mc.getString("arg2", null));
- }
-
- /**
- * Reporter that exposes the {@link MetricConfig} it was given.
- */
- protected static class TestReporter2 extends TestReporter {
- static MetricConfig mc;
- @Override
- public void open(MetricConfig config) {
- mc = config;
- }
- }
-
- /**
- * Verifies that reporters implementing the Scheduled interface are regularly called to report the metrics.
- *
- * @throws InterruptedException
- */
- @Test
- public void testReporterScheduling() throws InterruptedException {
- Configuration config = new Configuration();
-
- config.setString(MetricOptions.REPORTERS_LIST, "test");
- config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter3.class.getName());
- config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.arg1", "hello");
- config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "50 MILLISECONDS");
-
- MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
-
- long start = System.currentTimeMillis();
-
- // only start counting from now on
- TestReporter3.reportCount = 0;
-
- for (int x = 0; x < 10; x++) {
- Thread.sleep(100);
- int reportCount = TestReporter3.reportCount;
- long curT = System.currentTimeMillis();
- /**
- * Within a given time-frame T only T/500 reports may be triggered due to the interval between reports.
- * This value however does not not take the first triggered report into account (=> +1).
- * Furthermore we have to account for the mis-alignment between reports being triggered and our time
- * measurement (=> +1); for T=200 a total of 4-6 reports may have been
- * triggered depending on whether the end of the interval for the first reports ends before
- * or after T=50.
- */
- long maxAllowedReports = (curT - start) / 50 + 2;
- Assert.assertTrue("Too many reports were triggered.", maxAllowedReports >= reportCount);
- }
- Assert.assertTrue("No report was triggered.", TestReporter3.reportCount > 0);
-
- registry.shutdown();
- }
-
- /**
- * Reporter that exposes how often report() was called.
- */
- protected static class TestReporter3 extends TestReporter implements Scheduled {
- public static int reportCount = 0;
-
- @Override
- public void report() {
- reportCount++;
- }
- }
-
- /**
- * Verifies that reporters are notified of added/removed metrics.
- */
- @Test
- public void testReporterNotifications() {
- Configuration config = new Configuration();
- config.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
- config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter6.class.getName());
- config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter7.class.getName());
-
- MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
-
- TaskManagerMetricGroup root = new TaskManagerMetricGroup(registry, "host", "id");
- root.counter("rootCounter");
-
- assertTrue(TestReporter6.addedMetric instanceof Counter);
- assertEquals("rootCounter", TestReporter6.addedMetricName);
-
- assertTrue(TestReporter7.addedMetric instanceof Counter);
- assertEquals("rootCounter", TestReporter7.addedMetricName);
-
- root.close();
-
- assertTrue(TestReporter6.removedMetric instanceof Counter);
- assertEquals("rootCounter", TestReporter6.removedMetricName);
-
- assertTrue(TestReporter7.removedMetric instanceof Counter);
- assertEquals("rootCounter", TestReporter7.removedMetricName);
-
- registry.shutdown();
- }
-
- /**
- * Reporter that exposes the name and metric instance of the last metric that was added or removed.
- */
- protected static class TestReporter6 extends TestReporter {
- static Metric addedMetric;
- static String addedMetricName;
-
- static Metric removedMetric;
- static String removedMetricName;
-
- @Override
- public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
- addedMetric = metric;
- addedMetricName = metricName;
- }
-
- @Override
- public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
- removedMetric = metric;
- removedMetricName = metricName;
- }
- }
-
- /**
- * Reporter that exposes the name and metric instance of the last metric that was added or removed.
- */
- protected static class TestReporter7 extends TestReporter {
- static Metric addedMetric;
- static String addedMetricName;
-
- static Metric removedMetric;
- static String removedMetricName;
-
- @Override
- public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
- addedMetric = metric;
- addedMetricName = metricName;
- }
-
- @Override
- public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
- removedMetric = metric;
- removedMetricName = metricName;
- }
- }
-
- /**
- * Verifies that the scope configuration is properly extracted.
- */
- @Test
- public void testScopeConfig() {
- Configuration config = new Configuration();
-
- config.setString(MetricOptions.SCOPE_NAMING_TM, "A");
- config.setString(MetricOptions.SCOPE_NAMING_TM_JOB, "B");
- config.setString(MetricOptions.SCOPE_NAMING_TASK, "C");
- config.setString(MetricOptions.SCOPE_NAMING_OPERATOR, "D");
-
- ScopeFormats scopeConfig = ScopeFormats.fromConfig(config);
-
- assertEquals("A", scopeConfig.getTaskManagerFormat().format());
- assertEquals("B", scopeConfig.getTaskManagerJobFormat().format());
- assertEquals("C", scopeConfig.getTaskFormat().format());
- assertEquals("D", scopeConfig.getOperatorFormat().format());
- }
-
- @Test
- public void testConfigurableDelimiter() {
- Configuration config = new Configuration();
- config.setString(MetricOptions.SCOPE_DELIMITER, "_");
- config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D.E");
-
- MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
-
- TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "host", "id");
- assertEquals("A_B_C_D_E_name", tmGroup.getMetricIdentifier("name"));
-
- registry.shutdown();
- }
-
- @Test
- public void testConfigurableDelimiterForReporters() {
- Configuration config = new Configuration();
- config.setString(MetricOptions.REPORTERS_LIST, "test1,test2,test3");
- config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
- config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName());
- config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
- config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName());
- config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA");
- config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter.class.getName());
-
- MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
-
- assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter());
- assertEquals('_', registry.getDelimiter(0));
- assertEquals('-', registry.getDelimiter(1));
- assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter(2));
- assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter(3));
- assertEquals(GLOBAL_DEFAULT_DELIMITER, registry.getDelimiter(-1));
-
- registry.shutdown();
- }
-
- @Test
- public void testConfigurableDelimiterForReportersInGroup() {
- Configuration config = new Configuration();
- config.setString(MetricOptions.REPORTERS_LIST, "test1,test2,test3,test4");
- config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "_");
- config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
- config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "-");
- config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
- config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "AA");
- config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test3." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
- config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test4." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter8.class.getName());
- config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B");
-
- MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
- List<MetricReporter> reporters = registry.getReporters();
- ((TestReporter8) reporters.get(0)).expectedDelimiter = '_'; //test1 reporter
- ((TestReporter8) reporters.get(1)).expectedDelimiter = '-'; //test2 reporter
- ((TestReporter8) reporters.get(2)).expectedDelimiter = GLOBAL_DEFAULT_DELIMITER; //test3 reporter, because 'AA' - not correct delimiter
- ((TestReporter8) reporters.get(3)).expectedDelimiter = GLOBAL_DEFAULT_DELIMITER; //for test4 reporter use global delimiter
-
- TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "host", "id");
- group.counter("C");
- group.close();
- registry.shutdown();
- assertEquals(4, TestReporter8.numCorrectDelimitersForRegister);
- assertEquals(4, TestReporter8.numCorrectDelimitersForUnregister);
- }
-
- /**
- * Tests that the query actor will be stopped when the MetricRegistry is shut down.
- */
- @Test
- public void testQueryActorShutdown() throws Exception {
- final FiniteDuration timeout = new FiniteDuration(10L, TimeUnit.SECONDS);
-
- MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
-
- final ActorSystem actorSystem = AkkaUtils.createDefaultActorSystem();
-
- registry.startQueryService(actorSystem, null);
-
- ActorRef queryServiceActor = registry.getQueryService();
-
- registry.shutdown();
-
- try {
- Await.result(actorSystem.actorSelection(queryServiceActor.path()).resolveOne(timeout), timeout);
-
- fail("The query actor should be terminated resulting in a ActorNotFound exception.");
- } catch (ActorNotFound e) {
- // we expect the query actor to be shut down
- }
- }
-
- /**
- * Reporter that verifies that the configured delimiter is applied correctly when generating the metric identifier.
- */
- public static class TestReporter8 extends TestReporter {
- char expectedDelimiter;
- public static int numCorrectDelimitersForRegister = 0;
- public static int numCorrectDelimitersForUnregister = 0;
-
- @Override
- public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
- String expectedMetric = "A" + expectedDelimiter + "B" + expectedDelimiter + "C";
- assertEquals(expectedMetric, group.getMetricIdentifier(metricName, this));
- assertEquals(expectedMetric, group.getMetricIdentifier(metricName));
- numCorrectDelimitersForRegister++;
- }
-
- @Override
- public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
- String expectedMetric = "A" + expectedDelimiter + "B" + expectedDelimiter + "C";
- assertEquals(expectedMetric, group.getMetricIdentifier(metricName, this));
- assertEquals(expectedMetric, group.getMetricIdentifier(metricName));
- numCorrectDelimitersForUnregister++;
- }
- }
-
- @Test
- public void testExceptionIsolation() throws Exception {
-
- Configuration config = new Configuration();
- config.setString(MetricOptions.REPORTERS_LIST, "test1,test2");
- config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test1." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, FailingReporter.class.getName());
- config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter7.class.getName());
-
- MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
-
- Counter metric = new SimpleCounter();
- registry.register(metric, "counter", new MetricGroupTest.DummyAbstractMetricGroup(registry));
-
- assertEquals(metric, TestReporter7.addedMetric);
- assertEquals("counter", TestReporter7.addedMetricName);
-
- registry.unregister(metric, "counter", new MetricGroupTest.DummyAbstractMetricGroup(registry));
-
- assertEquals(metric, TestReporter7.removedMetric);
- assertEquals("counter", TestReporter7.removedMetricName);
-
- registry.shutdown();
- }
-
- /**
- * Reporter that throws an exception when it is notified of an added or removed metric.
- */
- protected static class FailingReporter extends TestReporter {
- @Override
- public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
- throw new RuntimeException();
- }
-
- @Override
- public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {
- throw new RuntimeException();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
new file mode 100644
index 0000000..1140e3d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/NoOpMetricRegistry.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Metric;
+import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
+import org.apache.flink.runtime.metrics.scope.ScopeFormats;
+
+/**
+ * {@link MetricRegistry} implementation for tests that has no reporters and ignores all
+ * metric registrations; delimiter and scope formats are fixed values.
+ */
+public class NoOpMetricRegistry implements MetricRegistry {
+
+ // delimiter handed out for the global and for every per-reporter lookup
+ final char delimiter = ',';
+
+ // scope formats derived from an empty configuration
+ final ScopeFormats scopeFormats = ScopeFormats.fromConfig(new Configuration());
+
+ /**
+ * Always reports zero reporters.
+ */
+ @Override
+ public int getNumberReporters() {
+ return 0;
+ }
+
+ /**
+ * Returns the fixed delimiter of this registry.
+ */
+ @Override
+ public char getDelimiter() {
+ return this.delimiter;
+ }
+
+ /**
+ * Returns the same fixed delimiter irrespective of the given reporter index.
+ */
+ @Override
+ public char getDelimiter(int index) {
+ return this.delimiter;
+ }
+
+ /**
+ * Returns the scope formats created when this registry was instantiated.
+ */
+ @Override
+ public ScopeFormats getScopeFormats() {
+ return this.scopeFormats;
+ }
+
+ @Override
+ public void register(Metric metric, String metricName, AbstractMetricGroup group) {
+ // intentionally empty: registrations are ignored
+ }
+
+ @Override
+ public void unregister(Metric metric, String metricName, AbstractMetricGroup group) {
+ // intentionally empty: there is nothing to remove
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
index cea0928..31304e5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/TaskManagerMetricsTest.java
@@ -61,6 +61,9 @@ public class TaskManagerMetricsTest extends TestLogger {
HighAvailabilityServices highAvailabilityServices = new EmbeddedHaServices(TestingUtils.defaultExecutor());
+ final MetricRegistryImpl metricRegistry = new MetricRegistryImpl(
+ MetricRegistryConfiguration.fromConfiguration(new Configuration()));
+
try {
actorSystem = AkkaUtils.createLocalActorSystem(new Configuration());
@@ -73,6 +76,7 @@ public class TaskManagerMetricsTest extends TestLogger {
TestingUtils.defaultExecutor(),
TestingUtils.defaultExecutor(),
highAvailabilityServices,
+ new NoOpMetricRegistry(),
Option.empty(),
JobManager.class,
MemoryArchivist.class)._1();
@@ -89,9 +93,9 @@ public class TaskManagerMetricsTest extends TestLogger {
TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(config);
TaskManagerServices taskManagerServices = TaskManagerServices.fromConfiguration(
- taskManagerServicesConfiguration, tmResourceID);
-
- final MetricRegistry tmRegistry = taskManagerServices.getMetricRegistry();
+ taskManagerServicesConfiguration,
+ tmResourceID,
+ metricRegistry);
// create the task manager
final Props tmProps = TaskManager.getTaskManagerProps(
@@ -103,7 +107,7 @@ public class TaskManagerMetricsTest extends TestLogger {
taskManagerServices.getIOManager(),
taskManagerServices.getNetworkEnvironment(),
highAvailabilityServices,
- tmRegistry);
+ taskManagerServices.getTaskManagerMetricGroup());
final ActorRef taskManager = actorSystem.actorOf(tmProps);
@@ -135,7 +139,7 @@ public class TaskManagerMetricsTest extends TestLogger {
}};
// verify that the registry was not shutdown due to the disconnect
- Assert.assertFalse(tmRegistry.isShutdown());
+ Assert.assertFalse(metricRegistry.isShutdown());
// shut down the actors and the actor system
actorSystem.shutdown();
@@ -148,6 +152,8 @@ public class TaskManagerMetricsTest extends TestLogger {
if (highAvailabilityServices != null) {
highAvailabilityServices.closeAndCleanupAllData();
}
+
+ metricRegistry.shutdown();
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
index 5c33ad6..55ba3a9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
@@ -25,8 +25,8 @@ import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.metrics.util.TestingHistogram;
import org.apache.flink.util.TestLogger;
@@ -82,7 +82,7 @@ public class MetricQueryServiceTest extends TestLogger {
}
};
- MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+ MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
final TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
MetricQueryService.notifyOfAddedMetric(serviceActor, c, "counter", tm);
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
index 648ee47..8d91b81 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/AbstractMetricGroupTest.java
@@ -25,8 +25,8 @@ import org.apache.flink.metrics.CharacterFilter;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
-import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.util.TestReporter;
@@ -45,7 +45,7 @@ public class AbstractMetricGroupTest {
*/
@Test
public void testGetAllVariables() {
- MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+ MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
AbstractMetricGroup group = new AbstractMetricGroup<AbstractMetricGroup<?>>(registry, new String[0], null) {
@Override
@@ -90,7 +90,7 @@ public class AbstractMetricGroupTest {
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter2.class.getName());
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test2." + ConfigConstants.METRICS_REPORTER_SCOPE_DELIMITER, "!");
- MetricRegistry testRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+ MetricRegistryImpl testRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
try {
MetricGroup tmGroup = new TaskManagerMetricGroup(testRegistry, "host", "id");
tmGroup.counter("1");
@@ -180,7 +180,7 @@ public class AbstractMetricGroupTest {
public void testScopeGenerationWithoutReporters() {
Configuration config = new Configuration();
config.setString(MetricOptions.SCOPE_NAMING_TM, "A.B.C.D");
- MetricRegistry testRegistry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+ MetricRegistryImpl testRegistry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
try {
TaskManagerMetricGroup group = new TaskManagerMetricGroup(testRegistry, "host", "id");
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
index 03341a6..05a72ac 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerGroupTest.java
@@ -22,8 +22,8 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
import org.apache.flink.util.TestLogger;
@@ -45,7 +45,7 @@ public class JobManagerGroupTest extends TestLogger {
@Test
public void addAndRemoveJobs() {
- MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+ MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
final JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost");
final JobID jid1 = new JobID();
@@ -77,7 +77,7 @@ public class JobManagerGroupTest extends TestLogger {
@Test
public void testCloseClosesAll() {
- MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+ MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
final JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost");
final JobID jid1 = new JobID();
@@ -103,7 +103,7 @@ public class JobManagerGroupTest extends TestLogger {
@Test
public void testGenerateScopeDefault() {
- MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+ MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "localhost");
assertArrayEquals(new String[]{"localhost", "jobmanager"}, group.getScopeComponents());
@@ -116,7 +116,7 @@ public class JobManagerGroupTest extends TestLogger {
public void testGenerateScopeCustom() {
Configuration cfg = new Configuration();
cfg.setString(MetricOptions.SCOPE_NAMING_JM, "constant.<host>.foo.<host>");
- MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
+ MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
JobManagerMetricGroup group = new JobManagerMetricGroup(registry, "host");
@@ -128,7 +128,7 @@ public class JobManagerGroupTest extends TestLogger {
@Test
public void testCreateQueryServiceMetricInfo() {
- MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+ MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
JobManagerMetricGroup jm = new JobManagerMetricGroup(registry, "host");
QueryScopeInfo.JobManagerQueryScopeInfo info = jm.createQueryServiceMetricInfo(new DummyCharacterFilter());
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
index d734dfd..4373f80 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/JobManagerJobGroupTest.java
@@ -21,8 +21,8 @@ package org.apache.flink.runtime.metrics.groups;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
-import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
import org.apache.flink.util.TestLogger;
@@ -39,7 +39,7 @@ public class JobManagerJobGroupTest extends TestLogger {
@Test
public void testGenerateScopeDefault() {
- MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+ MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
JobManagerMetricGroup tmGroup = new JobManagerMetricGroup(registry, "theHostName");
JobMetricGroup jmGroup = new JobManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
@@ -60,7 +60,7 @@ public class JobManagerJobGroupTest extends TestLogger {
Configuration cfg = new Configuration();
cfg.setString(MetricOptions.SCOPE_NAMING_JM, "abc");
cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "some-constant.<job_name>");
- MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
+ MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
JobID jid = new JobID();
@@ -83,7 +83,7 @@ public class JobManagerJobGroupTest extends TestLogger {
Configuration cfg = new Configuration();
cfg.setString(MetricOptions.SCOPE_NAMING_JM, "peter");
cfg.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "*.some-constant.<job_id>");
- MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
+ MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
JobID jid = new JobID();
@@ -104,7 +104,7 @@ public class JobManagerJobGroupTest extends TestLogger {
@Test
public void testCreateQueryServiceMetricInfo() {
JobID jid = new JobID();
- MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+ MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
JobManagerMetricGroup jm = new JobManagerMetricGroup(registry, "host");
JobManagerJobMetricGroup jmj = new JobManagerJobMetricGroup(registry, jm, jid, "jobname");
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
index 56ce5fa..324bb73 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupRegistrationTest.java
@@ -27,9 +27,10 @@ import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.HistogramStatistics;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricGroup;
-import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.util.TestReporter;
+import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
@@ -39,7 +40,7 @@ import static org.junit.Assert.assertEquals;
/**
* Tests for the registration of groups and metrics on a {@link MetricGroup}.
*/
-public class MetricGroupRegistrationTest {
+public class MetricGroupRegistrationTest extends TestLogger {
/**
* Verifies that group methods instantiate the correct metric with the given name.
*/
@@ -49,7 +50,7 @@ public class MetricGroupRegistrationTest {
config.setString(MetricOptions.REPORTERS_LIST, "test");
config.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, TestReporter1.class.getName());
- MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+ MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
MetricGroup root = new TaskManagerMetricGroup(registry, "host", "id");
@@ -111,7 +112,7 @@ public class MetricGroupRegistrationTest {
public void testDuplicateGroupName() {
Configuration config = new Configuration();
- MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config));
+ MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(config));
MetricGroup root = new TaskManagerMetricGroup(registry, "host", "id");
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
index 633dbed..94760e6 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/MetricGroupTest.java
@@ -26,6 +26,7 @@ import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
import org.apache.flink.util.AbstractID;
@@ -48,13 +49,13 @@ public class MetricGroupTest extends TestLogger {
private static final MetricRegistryConfiguration defaultMetricRegistryConfiguration = MetricRegistryConfiguration.defaultMetricRegistryConfiguration();
- private MetricRegistry registry;
+ private MetricRegistryImpl registry;
- private final MetricRegistry exceptionOnRegister = new ExceptionOnRegisterRegistry();
+ private final MetricRegistryImpl exceptionOnRegister = new ExceptionOnRegisterRegistry();
@Before
public void createRegistry() {
- this.registry = new MetricRegistry(defaultMetricRegistryConfiguration);
+ this.registry = new MetricRegistryImpl(defaultMetricRegistryConfiguration);
}
@After
@@ -134,7 +135,7 @@ public class MetricGroupTest extends TestLogger {
JobID jid = new JobID();
JobVertexID vid = new JobVertexID();
AbstractID eid = new AbstractID();
- MetricRegistry registry = new MetricRegistry(defaultMetricRegistryConfiguration);
+ MetricRegistryImpl registry = new MetricRegistryImpl(defaultMetricRegistryConfiguration);
TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, eid, "taskName", 4, 5);
@@ -156,7 +157,7 @@ public class MetricGroupTest extends TestLogger {
// ------------------------------------------------------------------------
- private static class ExceptionOnRegisterRegistry extends MetricRegistry {
+ private static class ExceptionOnRegisterRegistry extends MetricRegistryImpl {
public ExceptionOnRegisterRegistry() {
super(defaultMetricRegistryConfiguration);
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
index 4363a9d..820b73e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/OperatorGroupTest.java
@@ -24,8 +24,8 @@ import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.scope.ScopeFormat;
import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
@@ -47,7 +47,7 @@ public class OperatorGroupTest extends TestLogger {
@Test
public void testGenerateScopeDefault() {
- MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+ MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
@@ -70,7 +70,7 @@ public class OperatorGroupTest extends TestLogger {
public void testGenerateScopeCustom() {
Configuration cfg = new Configuration();
cfg.setString(MetricOptions.SCOPE_NAMING_OPERATOR, "<tm_id>.<job_id>.<task_id>.<operator_name>.<operator_id>");
- MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
+ MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
try {
String tmID = "test-tm-id";
JobID jid = new JobID();
@@ -97,7 +97,7 @@ public class OperatorGroupTest extends TestLogger {
@Test
public void testIOMetricGroupInstantiation() {
- MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+ MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
TaskManagerMetricGroup tmGroup = new TaskManagerMetricGroup(registry, "theHostName", "test-tm-id");
TaskManagerJobMetricGroup jmGroup = new TaskManagerJobMetricGroup(registry, tmGroup, new JobID(), "myJobName");
@@ -114,7 +114,7 @@ public class OperatorGroupTest extends TestLogger {
@Test
public void testVariables() {
- MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+ MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
JobID jid = new JobID();
JobVertexID tid = new JobVertexID();
@@ -156,7 +156,7 @@ public class OperatorGroupTest extends TestLogger {
JobVertexID vid = new JobVertexID();
AbstractID eid = new AbstractID();
OperatorID oid = new OperatorID();
- MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+ MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
TaskManagerJobMetricGroup job = new TaskManagerJobMetricGroup(registry, tm, jid, "jobname");
TaskMetricGroup task = new TaskMetricGroup(registry, job, vid, eid, "taskName", 4, 5);
http://git-wip-us.apache.org/repos/asf/flink/blob/d45b9412/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
index bd85303..3272f73 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/groups/TaskManagerGroupTest.java
@@ -23,8 +23,8 @@ import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.metrics.dump.QueryScopeInfo;
import org.apache.flink.runtime.metrics.util.DummyCharacterFilter;
import org.apache.flink.util.AbstractID;
@@ -50,7 +50,7 @@ public class TaskManagerGroupTest extends TestLogger {
@Test
public void addAndRemoveJobs() throws IOException {
- MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+ MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
final TaskManagerMetricGroup group = new TaskManagerMetricGroup(
registry, "localhost", new AbstractID().toString());
@@ -112,7 +112,7 @@ public class TaskManagerGroupTest extends TestLogger {
@Test
public void testCloseClosesAll() throws IOException {
- MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+ MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
final TaskManagerMetricGroup group = new TaskManagerMetricGroup(
registry, "localhost", new AbstractID().toString());
@@ -152,7 +152,7 @@ public class TaskManagerGroupTest extends TestLogger {
@Test
public void testGenerateScopeDefault() {
- MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+ MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "localhost", "id");
assertArrayEquals(new String[]{"localhost", "taskmanager", "id"}, group.getScopeComponents());
@@ -164,7 +164,7 @@ public class TaskManagerGroupTest extends TestLogger {
public void testGenerateScopeCustom() {
Configuration cfg = new Configuration();
cfg.setString(MetricOptions.SCOPE_NAMING_TM, "constant.<host>.foo.<host>");
- MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.fromConfiguration(cfg));
+ MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.fromConfiguration(cfg));
TaskManagerMetricGroup group = new TaskManagerMetricGroup(registry, "host", "id");
assertArrayEquals(new String[]{"constant", "host", "foo", "host"}, group.getScopeComponents());
@@ -174,7 +174,7 @@ public class TaskManagerGroupTest extends TestLogger {
@Test
public void testCreateQueryServiceMetricInfo() {
- MetricRegistry registry = new MetricRegistry(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
+ MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
QueryScopeInfo.TaskManagerQueryScopeInfo info = tm.createQueryServiceMetricInfo(new DummyCharacterFilter());