You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/09 06:57:36 UTC

[flink] 01/05: [FLINK-12325][metrics] Refactor StatsD tests

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7b806e7eb75e40729f1565ecc65bb632fd4eaf28
Author: Richard Deurwaarder <rd...@bol.com>
AuthorDate: Thu May 2 12:02:48 2019 +0200

    [FLINK-12325][metrics] Refactor StatsD tests
---
 .../flink/metrics/statsd/StatsDReporterTest.java   | 124 ++++++---------------
 1 file changed, 35 insertions(+), 89 deletions(-)

diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
index 68b90ce..a0b853f 100644
--- a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
+++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
@@ -25,8 +25,11 @@ import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.util.TestMeter;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -132,73 +135,20 @@ public class StatsDReporterTest extends TestLogger {
 	 */
 	@Test
 	public void testStatsDHistogramReporting() throws Exception {
-		MetricRegistryImpl registry = null;
-		DatagramSocketReceiver receiver = null;
-		Thread receiverThread = null;
-		long timeout = 5000;
-		long joinTimeout = 30000;
-
-		String histogramName = "histogram";
-
-		try {
-			receiver = new DatagramSocketReceiver();
-
-			receiverThread = new Thread(receiver);
-
-			receiverThread.start();
-
-			int port = receiver.getPort();
-
-			MetricConfig config = new MetricConfig();
-			config.setProperty(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 SECONDS");
-			config.setProperty("host", "localhost");
-			config.setProperty("port", String.valueOf(port));
-
-			registry = new MetricRegistryImpl(
-				MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
-				Collections.singletonList(ReporterSetup.forReporter("test", config, new StatsDReporter())));
-
-			TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");
-
-			TestingHistogram histogram = new TestingHistogram();
-
-			metricGroup.histogram(histogramName, histogram);
-
-			receiver.waitUntilNumLines(11, timeout);
-
-			Set<String> lines = receiver.getLines();
-
-			String prefix = metricGroup.getMetricIdentifier(histogramName);
-
-			Set<String> expectedLines = new HashSet<>();
-
-			expectedLines.add(prefix + ".count:1|g");
-			expectedLines.add(prefix + ".mean:3.0|g");
-			expectedLines.add(prefix + ".min:6|g");
-			expectedLines.add(prefix + ".max:5|g");
-			expectedLines.add(prefix + ".stddev:4.0|g");
-			expectedLines.add(prefix + ".p75:0.75|g");
-			expectedLines.add(prefix + ".p98:0.98|g");
-			expectedLines.add(prefix + ".p99:0.99|g");
-			expectedLines.add(prefix + ".p999:0.999|g");
-			expectedLines.add(prefix + ".p95:0.95|g");
-			expectedLines.add(prefix + ".p50:0.5|g");
-
-			assertEquals(expectedLines, lines);
-
-		} finally {
-			if (registry != null) {
-				registry.shutdown().get();
-			}
-
-			if (receiver != null) {
-				receiver.stop();
-			}
-
-			if (receiverThread != null) {
-				receiverThread.join(joinTimeout);
-			}
-		}
+		Set<String> expectedLines = new HashSet<>(6);
+		expectedLines.add("metric.count:1|g");
+		expectedLines.add("metric.mean:3.0|g");
+		expectedLines.add("metric.min:6|g");
+		expectedLines.add("metric.max:5|g");
+		expectedLines.add("metric.stddev:4.0|g");
+		expectedLines.add("metric.p75:0.75|g");
+		expectedLines.add("metric.p98:0.98|g");
+		expectedLines.add("metric.p99:0.99|g");
+		expectedLines.add("metric.p999:0.999|g");
+		expectedLines.add("metric.p95:0.95|g");
+		expectedLines.add("metric.p50:0.5|g");
+
+		testMetricAndAssert(new TestingHistogram(), "metric", expectedLines);
 	}
 
 	/**
@@ -206,14 +156,20 @@ public class StatsDReporterTest extends TestLogger {
 	 */
 	@Test
 	public void testStatsDMetersReporting() throws Exception {
-		MetricRegistryImpl registry = null;
+		Set<String> expectedLines = new HashSet<>(4);
+		expectedLines.add("metric.rate:5.0|g");
+		expectedLines.add("metric.count:100|g");
+
+		testMetricAndAssert(new TestMeter(), "metric", expectedLines);
+	}
+
+	private void testMetricAndAssert(Metric metric, String metricName, Set<String> expectation) throws Exception {
+		StatsDReporter reporter = null;
 		DatagramSocketReceiver receiver = null;
 		Thread receiverThread = null;
 		long timeout = 5000;
 		long joinTimeout = 30000;
 
-		String meterName = "meter";
-
 		try {
 			receiver = new DatagramSocketReceiver();
 
@@ -224,32 +180,22 @@ public class StatsDReporterTest extends TestLogger {
 			int port = receiver.getPort();
 
 			MetricConfig config = new MetricConfig();
-			config.setProperty(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 SECONDS");
 			config.setProperty("host", "localhost");
 			config.setProperty("port", String.valueOf(port));
 
-			registry = new MetricRegistryImpl(
-				MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
-				Collections.singletonList(ReporterSetup.forReporter("test", config, new StatsDReporter())));
-			TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");
-			TestMeter meter = new TestMeter();
-			metricGroup.meter(meterName, meter);
-			String prefix = metricGroup.getMetricIdentifier(meterName);
-
-			Set<String> expectedLines = new HashSet<>();
-
-			expectedLines.add(prefix + ".rate:5.0|g");
-			expectedLines.add(prefix + ".count:100|g");
-
-			receiver.waitUntilNumLines(expectedLines.size(), timeout);
+			reporter = new StatsDReporter();
+			ReporterSetup.forReporter("test", config, reporter);
+			MetricGroup metricGroup = new UnregisteredMetricsGroup();
 
-			Set<String> lines = receiver.getLines();
+			reporter.notifyOfAddedMetric(metric, metricName, metricGroup);
+			reporter.report();
 
-			assertEquals(expectedLines, lines);
+			receiver.waitUntilNumLines(expectation.size(), timeout);
+			assertEquals(expectation, receiver.getLines());
 
 		} finally {
-			if (registry != null) {
-				registry.shutdown().get();
+			if (reporter != null) {
+				reporter.close();
 			}
 
 			if (receiver != null) {