You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2019/05/09 06:57:40 UTC
[flink] 05/05: [FLINK-12325][metrics] StatsDReporter properly
handles negative values
This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 92b1a66a63aa10fc58c9f2ac4baca859437db40c
Author: Richard Deurwaarder <rd...@bol.com>
AuthorDate: Thu May 2 12:47:36 2019 +0200
[FLINK-12325][metrics] StatsDReporter properly handles negative values
---
.../flink/metrics/statsd/StatsDReporter.java | 59 ++++++++++++++++------
.../flink/metrics/statsd/StatsDReporterTest.java | 59 ++++++++++++++++++++++
2 files changed, 102 insertions(+), 16 deletions(-)
diff --git a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
index 527f9c1..29fbeb9 100644
--- a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
+++ b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
@@ -127,14 +127,20 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
// ------------------------------------------------------------------------
private void reportCounter(final String name, final Counter counter) {
- send(name, String.valueOf(counter.getCount()));
+ send(name, counter.getCount());
}
private void reportGauge(final String name, final Gauge<?> gauge) {
Object value = gauge.getValue();
- if (value != null) {
- send(name, value.toString());
+ if (value == null) {
+ return;
}
+
+ if (value instanceof Number) {
+ send(numberIsNegative((Number) value), name, value.toString());
+ }
+
+ send(name, value.toString());
}
private void reportHistogram(final String name, final Histogram histogram) {
@@ -143,25 +149,25 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
HistogramStatistics statistics = histogram.getStatistics();
if (statistics != null) {
- send(prefix(name, "count"), String.valueOf(histogram.getCount()));
- send(prefix(name, "max"), String.valueOf(statistics.getMax()));
- send(prefix(name, "min"), String.valueOf(statistics.getMin()));
- send(prefix(name, "mean"), String.valueOf(statistics.getMean()));
- send(prefix(name, "stddev"), String.valueOf(statistics.getStdDev()));
- send(prefix(name, "p50"), String.valueOf(statistics.getQuantile(0.5)));
- send(prefix(name, "p75"), String.valueOf(statistics.getQuantile(0.75)));
- send(prefix(name, "p95"), String.valueOf(statistics.getQuantile(0.95)));
- send(prefix(name, "p98"), String.valueOf(statistics.getQuantile(0.98)));
- send(prefix(name, "p99"), String.valueOf(statistics.getQuantile(0.99)));
- send(prefix(name, "p999"), String.valueOf(statistics.getQuantile(0.999)));
+ send(prefix(name, "count"), histogram.getCount());
+ send(prefix(name, "max"), statistics.getMax());
+ send(prefix(name, "min"), statistics.getMin());
+ send(prefix(name, "mean"), statistics.getMean());
+ send(prefix(name, "stddev"), statistics.getStdDev());
+ send(prefix(name, "p50"), statistics.getQuantile(0.5));
+ send(prefix(name, "p75"), statistics.getQuantile(0.75));
+ send(prefix(name, "p95"), statistics.getQuantile(0.95));
+ send(prefix(name, "p98"), statistics.getQuantile(0.98));
+ send(prefix(name, "p99"), statistics.getQuantile(0.99));
+ send(prefix(name, "p999"), statistics.getQuantile(0.999));
}
}
}
private void reportMeter(final String name, final Meter meter) {
if (meter != null) {
- send(prefix(name, "rate"), String.valueOf(meter.getRate()));
- send(prefix(name, "count"), String.valueOf(meter.getCount()));
+ send(prefix(name, "rate"), meter.getRate());
+ send(prefix(name, "count"), meter.getCount());
}
}
@@ -179,6 +185,23 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
}
}
+ private void send(String name, double value) {
+ send(numberIsNegative(value), name, String.valueOf(value));
+ }
+
+ private void send(String name, long value) {
+ send(value < 0, name, String.valueOf(value));
+ }
+
+ private void send(boolean resetToZero, String name, String value) {
+ if (resetToZero) {
+ // negative values are interpreted as reductions instead of absolute values
+ // reset value to 0 before applying reduction as a workaround
+ send(name, "0");
+ }
+ send(name, value);
+ }
+
private void send(final String name, final String value) {
try {
String formatted = String.format("%s:%s|g", name, value);
@@ -216,4 +239,8 @@ public class StatsDReporter extends AbstractReporter implements Scheduled {
return chars == null ? input : new String(chars, 0, pos);
}
+
+ private boolean numberIsNegative(Number input) {
+ return Double.compare(input.doubleValue(), 0) < 0;
+ }
}
diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
index 61bf07d..101326a 100644
--- a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
+++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
@@ -152,6 +152,36 @@ public class StatsDReporterTest extends TestLogger {
testMetricAndAssert(new TestHistogram(), "metric", expectedLines);
}
+ @Test
+ public void testStatsDHistogramReportingOfNegativeValues() throws Exception {
+ TestHistogram histogram = new TestHistogram();
+ histogram.setCount(-101);
+ histogram.setMean(-104);
+ histogram.setMin(-107);
+ histogram.setMax(-106);
+ histogram.setStdDev(-105);
+
+ Set<String> expectedLines = new HashSet<>();
+ expectedLines.add("metric.count:0|g");
+ expectedLines.add("metric.count:-101|g");
+ expectedLines.add("metric.mean:0|g");
+ expectedLines.add("metric.mean:-104.0|g");
+ expectedLines.add("metric.min:0|g");
+ expectedLines.add("metric.min:-107|g");
+ expectedLines.add("metric.max:0|g");
+ expectedLines.add("metric.max:-106|g");
+ expectedLines.add("metric.stddev:0|g");
+ expectedLines.add("metric.stddev:-105.0|g");
+ expectedLines.add("metric.p75:0.75|g");
+ expectedLines.add("metric.p98:0.98|g");
+ expectedLines.add("metric.p99:0.99|g");
+ expectedLines.add("metric.p999:0.999|g");
+ expectedLines.add("metric.p95:0.95|g");
+ expectedLines.add("metric.p50:0.5|g");
+
+ testMetricAndAssert(histogram, "metric", expectedLines);
+ }
+
/**
* Tests that meters are properly reported via the StatsD reporter.
*/
@@ -164,6 +194,17 @@ public class StatsDReporterTest extends TestLogger {
testMetricAndAssert(new TestMeter(), "metric", expectedLines);
}
+ @Test
+ public void testStatsDMetersReportingOfNegativeValues() throws Exception {
+ Set<String> expectedLines = new HashSet<>();
+ expectedLines.add("metric.rate:0|g");
+ expectedLines.add("metric.rate:-5.3|g");
+ expectedLines.add("metric.count:0|g");
+ expectedLines.add("metric.count:-50|g");
+
+ testMetricAndAssert(new TestMeter(-50, -5.3), "metric", expectedLines);
+ }
+
/**
* Tests that counter are properly reported via the StatsD reporter.
*/
@@ -176,6 +217,15 @@ public class StatsDReporterTest extends TestLogger {
}
@Test
+ public void testStatsDCountersReportingOfNegativeValues() throws Exception {
+ Set<String> expectedLines = new HashSet<>();
+ expectedLines.add("metric:0|g");
+ expectedLines.add("metric:-51|g");
+
+ testMetricAndAssert(new TestCounter(-51), "metric", expectedLines);
+ }
+
+ @Test
public void testStatsDGaugesReporting() throws Exception {
Set<String> expectedLines = new HashSet<>(2);
expectedLines.add("metric:75|g");
@@ -183,6 +233,15 @@ public class StatsDReporterTest extends TestLogger {
testMetricAndAssert((Gauge) () -> 75, "metric", expectedLines);
}
+ @Test
+ public void testStatsDGaugesReportingOfNegativeValues() throws Exception {
+ Set<String> expectedLines = new HashSet<>();
+ expectedLines.add("metric:0|g");
+ expectedLines.add("metric:-12345|g");
+
+ testMetricAndAssert((Gauge) () -> -12345, "metric", expectedLines);
+ }
+
private void testMetricAndAssert(Metric metric, String metricName, Set<String> expectation) throws Exception {
StatsDReporter reporter = null;
DatagramSocketReceiver receiver = null;