You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/09 06:57:36 UTC
[flink] 01/05: [FLINK-12325][metrics] Refactor StatsD tests
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7b806e7eb75e40729f1565ecc65bb632fd4eaf28
Author: Richard Deurwaarder <rd...@bol.com>
AuthorDate: Thu May 2 12:02:48 2019 +0200
[FLINK-12325][metrics] Refactor StatsD tests
---
.../flink/metrics/statsd/StatsDReporterTest.java | 124 ++++++---------------
1 file changed, 35 insertions(+), 89 deletions(-)
diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
index 68b90ce..a0b853f 100644
--- a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
+++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
@@ -25,8 +25,11 @@ import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Histogram;
import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.util.TestMeter;
import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -132,73 +135,20 @@ public class StatsDReporterTest extends TestLogger {
*/
@Test
public void testStatsDHistogramReporting() throws Exception {
- MetricRegistryImpl registry = null;
- DatagramSocketReceiver receiver = null;
- Thread receiverThread = null;
- long timeout = 5000;
- long joinTimeout = 30000;
-
- String histogramName = "histogram";
-
- try {
- receiver = new DatagramSocketReceiver();
-
- receiverThread = new Thread(receiver);
-
- receiverThread.start();
-
- int port = receiver.getPort();
-
- MetricConfig config = new MetricConfig();
- config.setProperty(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 SECONDS");
- config.setProperty("host", "localhost");
- config.setProperty("port", String.valueOf(port));
-
- registry = new MetricRegistryImpl(
- MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
- Collections.singletonList(ReporterSetup.forReporter("test", config, new StatsDReporter())));
-
- TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");
-
- TestingHistogram histogram = new TestingHistogram();
-
- metricGroup.histogram(histogramName, histogram);
-
- receiver.waitUntilNumLines(11, timeout);
-
- Set<String> lines = receiver.getLines();
-
- String prefix = metricGroup.getMetricIdentifier(histogramName);
-
- Set<String> expectedLines = new HashSet<>();
-
- expectedLines.add(prefix + ".count:1|g");
- expectedLines.add(prefix + ".mean:3.0|g");
- expectedLines.add(prefix + ".min:6|g");
- expectedLines.add(prefix + ".max:5|g");
- expectedLines.add(prefix + ".stddev:4.0|g");
- expectedLines.add(prefix + ".p75:0.75|g");
- expectedLines.add(prefix + ".p98:0.98|g");
- expectedLines.add(prefix + ".p99:0.99|g");
- expectedLines.add(prefix + ".p999:0.999|g");
- expectedLines.add(prefix + ".p95:0.95|g");
- expectedLines.add(prefix + ".p50:0.5|g");
-
- assertEquals(expectedLines, lines);
-
- } finally {
- if (registry != null) {
- registry.shutdown().get();
- }
-
- if (receiver != null) {
- receiver.stop();
- }
-
- if (receiverThread != null) {
- receiverThread.join(joinTimeout);
- }
- }
+ Set<String> expectedLines = new HashSet<>(6);
+ expectedLines.add("metric.count:1|g");
+ expectedLines.add("metric.mean:3.0|g");
+ expectedLines.add("metric.min:6|g");
+ expectedLines.add("metric.max:5|g");
+ expectedLines.add("metric.stddev:4.0|g");
+ expectedLines.add("metric.p75:0.75|g");
+ expectedLines.add("metric.p98:0.98|g");
+ expectedLines.add("metric.p99:0.99|g");
+ expectedLines.add("metric.p999:0.999|g");
+ expectedLines.add("metric.p95:0.95|g");
+ expectedLines.add("metric.p50:0.5|g");
+
+ testMetricAndAssert(new TestingHistogram(), "metric", expectedLines);
}
/**
@@ -206,14 +156,20 @@ public class StatsDReporterTest extends TestLogger {
*/
@Test
public void testStatsDMetersReporting() throws Exception {
- MetricRegistryImpl registry = null;
+ Set<String> expectedLines = new HashSet<>(4);
+ expectedLines.add("metric.rate:5.0|g");
+ expectedLines.add("metric.count:100|g");
+
+ testMetricAndAssert(new TestMeter(), "metric", expectedLines);
+ }
+
+ private void testMetricAndAssert(Metric metric, String metricName, Set<String> expectation) throws Exception {
+ StatsDReporter reporter = null;
DatagramSocketReceiver receiver = null;
Thread receiverThread = null;
long timeout = 5000;
long joinTimeout = 30000;
- String meterName = "meter";
-
try {
receiver = new DatagramSocketReceiver();
@@ -224,32 +180,22 @@ public class StatsDReporterTest extends TestLogger {
int port = receiver.getPort();
MetricConfig config = new MetricConfig();
- config.setProperty(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 SECONDS");
config.setProperty("host", "localhost");
config.setProperty("port", String.valueOf(port));
- registry = new MetricRegistryImpl(
- MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
- Collections.singletonList(ReporterSetup.forReporter("test", config, new StatsDReporter())));
- TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId");
- TestMeter meter = new TestMeter();
- metricGroup.meter(meterName, meter);
- String prefix = metricGroup.getMetricIdentifier(meterName);
-
- Set<String> expectedLines = new HashSet<>();
-
- expectedLines.add(prefix + ".rate:5.0|g");
- expectedLines.add(prefix + ".count:100|g");
-
- receiver.waitUntilNumLines(expectedLines.size(), timeout);
+ reporter = new StatsDReporter();
+ ReporterSetup.forReporter("test", config, reporter);
+ MetricGroup metricGroup = new UnregisteredMetricsGroup();
- Set<String> lines = receiver.getLines();
+ reporter.notifyOfAddedMetric(metric, metricName, metricGroup);
+ reporter.report();
- assertEquals(expectedLines, lines);
+ receiver.waitUntilNumLines(expectation.size(), timeout);
+ assertEquals(expectation, receiver.getLines());
} finally {
- if (registry != null) {
- registry.shutdown().get();
+ if (reporter != null) {
+ reporter.close();
}
if (receiver != null) {