You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/09 06:57:40 UTC

[flink] 05/05: [FLINK-12325][metrics] StatsDReporter properly handles negative values

This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 92b1a66a63aa10fc58c9f2ac4baca859437db40c
Author: Richard Deurwaarder <rd...@bol.com>
AuthorDate: Thu May 2 12:47:36 2019 +0200

    [FLINK-12325][metrics] StatsDReporter properly handles negative values
---
 .../flink/metrics/statsd/StatsDReporter.java       | 59 ++++++++++++++++------
 .../flink/metrics/statsd/StatsDReporterTest.java   | 59 ++++++++++++++++++++++
 2 files changed, 102 insertions(+), 16 deletions(-)

diff --git a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
index 527f9c1..29fbeb9 100644
--- a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
+++ b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
@@ -127,14 +127,20 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
 	// ------------------------------------------------------------------------
 
 	private void reportCounter(final String name, final Counter counter) {
-		send(name, String.valueOf(counter.getCount()));
+		send(name, counter.getCount());
 	}
 
 	private void reportGauge(final String name, final Gauge<?> gauge) {
 		Object value = gauge.getValue();
-		if (value != null) {
-			send(name, value.toString());
+		if (value == null) {
+			return;
 		}
+
+		if (value instanceof Number) {
+			send(numberIsNegative((Number) value), name, value.toString());
+		}
+
+		send(name, value.toString());
 	}
 
 	private void reportHistogram(final String name, final Histogram histogram) {
@@ -143,25 +149,25 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
 			HistogramStatistics statistics = histogram.getStatistics();
 
 			if (statistics != null) {
-				send(prefix(name, "count"), String.valueOf(histogram.getCount()));
-				send(prefix(name, "max"), String.valueOf(statistics.getMax()));
-				send(prefix(name, "min"), String.valueOf(statistics.getMin()));
-				send(prefix(name, "mean"), String.valueOf(statistics.getMean()));
-				send(prefix(name, "stddev"), String.valueOf(statistics.getStdDev()));
-				send(prefix(name, "p50"), String.valueOf(statistics.getQuantile(0.5)));
-				send(prefix(name, "p75"), String.valueOf(statistics.getQuantile(0.75)));
-				send(prefix(name, "p95"), String.valueOf(statistics.getQuantile(0.95)));
-				send(prefix(name, "p98"), String.valueOf(statistics.getQuantile(0.98)));
-				send(prefix(name, "p99"), String.valueOf(statistics.getQuantile(0.99)));
-				send(prefix(name, "p999"), String.valueOf(statistics.getQuantile(0.999)));
+				send(prefix(name, "count"), histogram.getCount());
+				send(prefix(name, "max"), statistics.getMax());
+				send(prefix(name, "min"), statistics.getMin());
+				send(prefix(name, "mean"), statistics.getMean());
+				send(prefix(name, "stddev"), statistics.getStdDev());
+				send(prefix(name, "p50"), statistics.getQuantile(0.5));
+				send(prefix(name, "p75"), statistics.getQuantile(0.75));
+				send(prefix(name, "p95"), statistics.getQuantile(0.95));
+				send(prefix(name, "p98"), statistics.getQuantile(0.98));
+				send(prefix(name, "p99"), statistics.getQuantile(0.99));
+				send(prefix(name, "p999"), statistics.getQuantile(0.999));
 			}
 		}
 	}
 
 	private void reportMeter(final String name, final Meter meter) {
 		if (meter != null) {
-			send(prefix(name, "rate"), String.valueOf(meter.getRate()));
-			send(prefix(name, "count"), String.valueOf(meter.getCount()));
+			send(prefix(name, "rate"), meter.getRate());
+			send(prefix(name, "count"), meter.getCount());
 		}
 	}
 
@@ -179,6 +185,23 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
 		}
 	}
 
+	private void send(String name, double value) {
+		send(numberIsNegative(value), name, String.valueOf(value));
+	}
+
+	private void send(String name, long value) {
+		send(value < 0, name, String.valueOf(value));
+	}
+
+	private void send(boolean resetToZero, String name, String value) {
+		if (resetToZero) {
+			// negative values are interpreted as reductions instead of absolute values
+			// reset value to 0 before applying reduction as a workaround
+			send(name, "0");
+		}
+		send(name, value);
+	}
+
 	private void send(final String name, final String value) {
 		try {
 			String formatted = String.format("%s:%s|g", name, value);
@@ -216,4 +239,8 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
 
 		return chars == null ? input : new String(chars, 0, pos);
 	}
+
+	private boolean numberIsNegative(Number input) {
+		return Double.compare(input.doubleValue(), 0) < 0;
+	}
 }
diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
index 61bf07d..101326a 100644
--- a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
+++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
@@ -152,6 +152,36 @@ public class StatsDReporterTest extends TestLogger {
 		testMetricAndAssert(new TestHistogram(), "metric", expectedLines);
 	}
 
+	@Test
+	public void testStatsDHistogramReportingOfNegativeValues() throws Exception {
+		TestHistogram histogram = new TestHistogram();
+		histogram.setCount(-101);
+		histogram.setMean(-104);
+		histogram.setMin(-107);
+		histogram.setMax(-106);
+		histogram.setStdDev(-105);
+
+		Set<String> expectedLines = new HashSet<>();
+		expectedLines.add("metric.count:0|g");
+		expectedLines.add("metric.count:-101|g");
+		expectedLines.add("metric.mean:0|g");
+		expectedLines.add("metric.mean:-104.0|g");
+		expectedLines.add("metric.min:0|g");
+		expectedLines.add("metric.min:-107|g");
+		expectedLines.add("metric.max:0|g");
+		expectedLines.add("metric.max:-106|g");
+		expectedLines.add("metric.stddev:0|g");
+		expectedLines.add("metric.stddev:-105.0|g");
+		expectedLines.add("metric.p75:0.75|g");
+		expectedLines.add("metric.p98:0.98|g");
+		expectedLines.add("metric.p99:0.99|g");
+		expectedLines.add("metric.p999:0.999|g");
+		expectedLines.add("metric.p95:0.95|g");
+		expectedLines.add("metric.p50:0.5|g");
+
+		testMetricAndAssert(histogram, "metric", expectedLines);
+	}
+
 	/**
 	 * Tests that meters are properly reported via the StatsD reporter.
 	 */
@@ -164,6 +194,17 @@ public class StatsDReporterTest extends TestLogger {
 		testMetricAndAssert(new TestMeter(), "metric", expectedLines);
 	}
 
+	@Test
+	public void testStatsDMetersReportingOfNegativeValues() throws Exception {
+		Set<String> expectedLines = new HashSet<>();
+		expectedLines.add("metric.rate:0|g");
+		expectedLines.add("metric.rate:-5.3|g");
+		expectedLines.add("metric.count:0|g");
+		expectedLines.add("metric.count:-50|g");
+
+		testMetricAndAssert(new TestMeter(-50, -5.3), "metric", expectedLines);
+	}
+
 	/**
 	 * Tests that counter are properly reported via the StatsD reporter.
 	 */
@@ -176,6 +217,15 @@ public class StatsDReporterTest extends TestLogger {
 	}
 
 	@Test
+	public void testStatsDCountersReportingOfNegativeValues() throws Exception {
+		Set<String> expectedLines = new HashSet<>();
+		expectedLines.add("metric:0|g");
+		expectedLines.add("metric:-51|g");
+
+		testMetricAndAssert(new TestCounter(-51), "metric", expectedLines);
+	}
+
+	@Test
 	public void testStatsDGaugesReporting() throws Exception {
 		Set<String> expectedLines = new HashSet<>(2);
 		expectedLines.add("metric:75|g");
@@ -183,6 +233,15 @@ public class StatsDReporterTest extends TestLogger {
 		testMetricAndAssert((Gauge) () -> 75, "metric", expectedLines);
 	}
 
+	@Test
+	public void testStatsDGaugesReportingOfNegativeValues() throws Exception {
+		Set<String> expectedLines = new HashSet<>();
+		expectedLines.add("metric:0|g");
+		expectedLines.add("metric:-12345|g");
+
+		testMetricAndAssert((Gauge) () -> -12345, "metric", expectedLines);
+	}
+
 	private void testMetricAndAssert(Metric metric, String metricName, Set<String> expectation) throws Exception {
 		StatsDReporter reporter = null;
 		DatagramSocketReceiver receiver = null;