You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2018/12/11 13:48:07 UTC

[flink] 04/04: [hotfix][metrics][tests] Refactor MetricQueryServiceTest

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f8aee29829cc59627a6b40623d28ddfb8237e35c
Author: zentol <ch...@apache.org>
AuthorDate: Thu Dec 6 13:59:23 2018 +0100

    [hotfix][metrics][tests] Refactor MetricQueryServiceTest
    
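    - use UnregisteredMetricGroups instead of a MetricRegistryImpl-backed TaskManagerMetricGroup
    - simplify metrics (lambda gauge, TestMeter in place of anonymous Meter classes)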
    - encapsulate TestActor fields
---
 .../metrics/dump/MetricQueryServiceTest.java       | 126 +++++++--------------
 1 file changed, 38 insertions(+), 88 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
index 5d538ed..673409c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/dump/MetricQueryServiceTest.java
@@ -26,10 +26,10 @@ import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.Meter;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.metrics.util.TestHistogram;
+import org.apache.flink.metrics.util.TestMeter;
 import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
-import org.apache.flink.runtime.metrics.MetricRegistryImpl;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.util.TestLogger;
 
 import akka.actor.ActorRef;
@@ -59,50 +59,22 @@ public class MetricQueryServiceTest extends TestLogger {
 			TestActor testActor = (TestActor) testActorRef.underlyingActor();
 
 			final Counter c = new SimpleCounter();
-			final Gauge<String> g = new Gauge<String>() {
-				@Override
-				public String getValue() {
-					return "Hello";
-				}
-			};
+			final Gauge<String> g = () -> "Hello";
 			final Histogram h = new TestHistogram();
-			final Meter m = new Meter() {
-
-				@Override
-				public void markEvent() {
-				}
+			final Meter m = new TestMeter();
 
-				@Override
-				public void markEvent(long n) {
-				}
-
-				@Override
-				public double getRate() {
-					return 5;
-				}
-
-				@Override
-				public long getCount() {
-					return 10;
-				}
-			};
-
-			MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
-			final TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
+			final TaskManagerMetricGroup tm = UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup();
 
 			MetricQueryService.notifyOfAddedMetric(serviceActor, c, "counter", tm);
 			MetricQueryService.notifyOfAddedMetric(serviceActor, g, "gauge", tm);
 			MetricQueryService.notifyOfAddedMetric(serviceActor, h, "histogram", tm);
 			MetricQueryService.notifyOfAddedMetric(serviceActor, m, "meter", tm);
 			serviceActor.tell(MetricQueryService.getCreateDump(), testActorRef);
-			synchronized (testActor.lock) {
-				if (testActor.message == null) {
-					testActor.lock.wait();
-				}
-			}
 
-			MetricDumpSerialization.MetricSerializationResult dump = (MetricDumpSerialization.MetricSerializationResult) testActor.message;
-			testActor.message = null;
+			testActor.waitForResult();
+
+			MetricDumpSerialization.MetricSerializationResult dump = testActor.getSerializationResult();
+
 			assertTrue(dump.serializedCounters.length > 0);
 			assertTrue(dump.serializedGauges.length > 0);
 			assertTrue(dump.serializedHistograms.length > 0);
@@ -114,14 +86,11 @@ public class MetricQueryServiceTest extends TestLogger {
 			MetricQueryService.notifyOfRemovedMetric(serviceActor, m);
 
 			serviceActor.tell(MetricQueryService.getCreateDump(), testActorRef);
-			synchronized (testActor.lock) {
-				if (testActor.message == null) {
-					testActor.lock.wait();
-				}
-			}
 
-			MetricDumpSerialization.MetricSerializationResult emptyDump = (MetricDumpSerialization.MetricSerializationResult) testActor.message;
-			testActor.message = null;
+			testActor.waitForResult();
+
+			MetricDumpSerialization.MetricSerializationResult emptyDump = testActor.getSerializationResult();
+
 			assertEquals(0, emptyDump.serializedCounters.length);
 			assertEquals(0, emptyDump.serializedGauges.length);
 			assertEquals(0, emptyDump.serializedHistograms.length);
@@ -140,31 +109,7 @@ public class MetricQueryServiceTest extends TestLogger {
 			TestActorRef testActorRef = TestActorRef.create(s, Props.create(TestActor.class));
 			TestActor testActor = (TestActor) testActorRef.underlyingActor();
 
-			final Counter c = new SimpleCounter();
-			final Histogram h = new TestHistogram();
-			final Meter m = new Meter() {
-
-				@Override
-				public void markEvent() {
-				}
-
-				@Override
-				public void markEvent(long n) {
-				}
-
-				@Override
-				public double getRate() {
-					return 5;
-				}
-
-				@Override
-				public long getCount() {
-					return 10;
-				}
-			};
-
-			MetricRegistryImpl registry = new MetricRegistryImpl(MetricRegistryConfiguration.defaultMetricRegistryConfiguration());
-			final TaskManagerMetricGroup tm = new TaskManagerMetricGroup(registry, "host", "id");
+			final TaskManagerMetricGroup tm = UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup();
 
 			final String gaugeValue = "Hello";
 			final long requiredGaugesToExceedLimit = sizeLimit / gaugeValue.length() + 1;
@@ -173,19 +118,15 @@ public class MetricQueryServiceTest extends TestLogger {
 				.collect(Collectors.toList());
 			gauges.forEach(gauge -> MetricQueryService.notifyOfAddedMetric(serviceActor, gauge.f1, gauge.f0, tm));
 
-			MetricQueryService.notifyOfAddedMetric(serviceActor, c, "counter", tm);
-			MetricQueryService.notifyOfAddedMetric(serviceActor, h, "histogram", tm);
-			MetricQueryService.notifyOfAddedMetric(serviceActor, m, "meter", tm);
+			MetricQueryService.notifyOfAddedMetric(serviceActor, new SimpleCounter(), "counter", tm);
+			MetricQueryService.notifyOfAddedMetric(serviceActor, new TestHistogram(), "histogram", tm);
+			MetricQueryService.notifyOfAddedMetric(serviceActor, new TestMeter(), "meter", tm);
 
 			serviceActor.tell(MetricQueryService.getCreateDump(), testActorRef);
-			synchronized (testActor.lock) {
-				if (testActor.message == null) {
-					testActor.lock.wait();
-				}
-			}
+			testActor.waitForResult();
+
+			MetricDumpSerialization.MetricSerializationResult dump = testActor.getSerializationResult();
 
-			MetricDumpSerialization.MetricSerializationResult dump = (MetricDumpSerialization.MetricSerializationResult) testActor.message;
-			testActor.message = null;
 			assertTrue(dump.serializedCounters.length > 0);
 			assertEquals(1, dump.numCounters);
 			assertTrue(dump.serializedMeters.length > 0);
@@ -204,14 +145,9 @@ public class MetricQueryServiceTest extends TestLogger {
 			}
 
 			serviceActor.tell(MetricQueryService.getCreateDump(), testActorRef);
-			synchronized (testActor.lock) {
-				if (testActor.message == null) {
-					testActor.lock.wait();
-				}
-			}
+			testActor.waitForResult();
 
-			MetricDumpSerialization.MetricSerializationResult recoveredDump = (MetricDumpSerialization.MetricSerializationResult) testActor.message;
-			testActor.message = null;
+			MetricDumpSerialization.MetricSerializationResult recoveredDump = testActor.getSerializationResult();
 
 			assertTrue(recoveredDump.serializedCounters.length > 0);
 			assertEquals(1, recoveredDump.numCounters);
@@ -227,8 +163,8 @@ public class MetricQueryServiceTest extends TestLogger {
 	}
 
 	private static class TestActor extends UntypedActor {
-		public Object message;
-		public Object lock = new Object();
+		private Object message;
+		private final Object lock = new Object();
 
 		@Override
 		public void onReceive(Object message) throws Exception {
@@ -237,5 +173,19 @@ public class MetricQueryServiceTest extends TestLogger {
 				lock.notifyAll();
 			}
 		}
+
+		void waitForResult() throws InterruptedException { // blocks the caller until the message field is non-null
+			synchronized (lock) {
+				while (message == null) { // loop, not if: Object.wait() may return spuriously, so re-check the condition
+					lock.wait(); // woken by lock.notifyAll() in onReceive
+				}
+			}
+		}
+
+		MetricDumpSerialization.MetricSerializationResult getSerializationResult() { // returns the stored message and resets the field
+			final MetricDumpSerialization.MetricSerializationResult result = (MetricDumpSerialization.MetricSerializationResult) message; // cast happens before the reset
+			message = null; // cleared so that a later waitForResult() sees null and waits again
+			return result;
+		}
 	}
 }