You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ew...@apache.org on 2018/11/07 16:04:57 UTC

[kafka] branch trunk updated: KAFKA-7560; PushHttpMetricsReporter should not convert metric value to double

This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new df0faee  KAFKA-7560; PushHttpMetricsReporter should not convert metric value to double
df0faee is described below

commit df0faee09787ec4d14a1a5da98fe9bf4cd7f461c
Author: Dong Lin <li...@gmail.com>
AuthorDate: Wed Nov 7 08:04:29 2018 -0800

    KAFKA-7560; PushHttpMetricsReporter should not convert metric value to double
    
    Currently PushHttpMetricsReporter converts the value from KafkaMetric.metricValue() to double. This does not work for non-numerical metrics, such as the version metric in AppInfoParser, whose value can be a string. This has caused failures in PushHttpMetricsReporter, which in turn caused the system test kafkatest.tests.client.quota_test.QuotaTest.test_quota to fail.
    
    Since we allow a metric value to be any object, PushHttpMetricsReporter should also read the metric value as an object and pass it to the HTTP server.
    
    Author: Dong Lin <li...@gmail.com>
    
    Reviewers: Manikumar Reddy O <ma...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>
    
    Closes #5886 from lindong28/KAFKA-7560
---
 .../kafka/tools/PushHttpMetricsReporter.java       |  9 ++---
 .../kafka/tools/PushHttpMetricsReporterTest.java   | 47 ++++++++++++++++------
 2 files changed, 38 insertions(+), 18 deletions(-)

diff --git a/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java b/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
index 6adebf5..b33b75c 100644
--- a/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
+++ b/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java
@@ -174,8 +174,7 @@ public class PushHttpMetricsReporter implements MetricsReporter {
                 samples = new ArrayList<>(metrics.size());
                 for (KafkaMetric metric : metrics.values()) {
                     MetricName name = metric.metricName();
-                    double value = (Double) metric.metricValue();
-                    samples.add(new MetricValue(name.name(), name.group(), name.tags(), value));
+                    samples.add(new MetricValue(name.name(), name.group(), name.tags(), metric.metricValue()));
                 }
             }
 
@@ -212,9 +211,9 @@ public class PushHttpMetricsReporter implements MetricsReporter {
                 } else {
                     log.info("Finished reporting metrics with response code {}", responseCode);
                 }
-            } catch (Exception e) {
-                log.error("Error reporting metrics", e);
-                throw new KafkaException("Failed to report current metrics", e);
+            } catch (Throwable t) {
+                log.error("Error reporting metrics", t);
+                throw new KafkaException("Failed to report current metrics", t);
             } finally {
                 if (connection != null) {
                     connection.disconnect();
diff --git a/tools/src/test/java/org/apache/kafka/tools/PushHttpMetricsReporterTest.java b/tools/src/test/java/org/apache/kafka/tools/PushHttpMetricsReporterTest.java
index 1cd3799..3a8458c 100644
--- a/tools/src/test/java/org/apache/kafka/tools/PushHttpMetricsReporterTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/PushHttpMetricsReporterTest.java
@@ -18,10 +18,11 @@ package org.apache.kafka.tools;
 
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import java.util.List;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.metrics.Gauge;
 import org.apache.kafka.common.metrics.KafkaMetric;
-import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
@@ -184,32 +185,40 @@ public class PushHttpMetricsReporterTest {
         KafkaMetric metric1 = new KafkaMetric(
                 new Object(),
                 new MetricName("name1", "group1", "desc1", Collections.singletonMap("key1", "value1")),
-                new ImmutableValue(1.0),
+                new ImmutableValue<>(1.0),
                 null,
                 time
         );
         KafkaMetric newMetric1 = new KafkaMetric(
                 new Object(),
                 new MetricName("name1", "group1", "desc1", Collections.singletonMap("key1", "value1")),
-                new ImmutableValue(-1.0),
+                new ImmutableValue<>(-1.0),
                 null,
                 time
         );
         KafkaMetric metric2 = new KafkaMetric(
                 new Object(),
                 new MetricName("name2", "group2", "desc2", Collections.singletonMap("key2", "value2")),
-                new ImmutableValue(2.0),
+                new ImmutableValue<>(2.0),
                 null,
                 time
         );
         KafkaMetric metric3 = new KafkaMetric(
                 new Object(),
                 new MetricName("name3", "group3", "desc3", Collections.singletonMap("key3", "value3")),
-                new ImmutableValue(3.0),
+                new ImmutableValue<>(3.0),
                 null,
                 time
         );
-        reporter.init(Arrays.asList(metric1, metric2));
+        KafkaMetric metric4 = new KafkaMetric(
+            new Object(),
+            new MetricName("name4", "group4", "desc4", Collections.singletonMap("key4", "value4")),
+            new ImmutableValue<>("value4"),
+            null,
+            time
+        );
+
+        reporter.init(Arrays.asList(metric1, metric2, metric4));
         reporter.metricChange(newMetric1); // added in init, modified
         reporter.metricChange(metric3); // added by change
         reporter.metricRemoval(metric2); // added in init, deleted by removal
@@ -222,9 +231,12 @@ public class PushHttpMetricsReporterTest {
         // We should be left with the modified version of metric1 and metric3
         JsonNode metrics = payload.get("metrics");
         assertTrue(metrics.isArray());
-        assertEquals(2, metrics.size());
+        assertEquals(3, metrics.size());
+        List<JsonNode> metricsList = Arrays.asList(metrics.get(0), metrics.get(1), metrics.get(2));
+        // Sort metrics based on name so that we can verify the value for each metric below
+        metricsList.sort((m1, m2) -> m1.get("name").textValue().compareTo(m2.get("name").textValue()));
 
-        JsonNode m1 = metrics.get(0);
+        JsonNode m1 = metricsList.get(0);
         assertEquals("name1", m1.get("name").textValue());
         assertEquals("group1", m1.get("group").textValue());
         JsonNode m1Tags = m1.get("tags");
@@ -233,7 +245,7 @@ public class PushHttpMetricsReporterTest {
         assertEquals("value1", m1Tags.get("key1").textValue());
         assertEquals(-1.0, m1.get("value").doubleValue(), 0.0);
 
-        JsonNode m3 = metrics.get(1);
+        JsonNode m3 = metricsList.get(1);
         assertEquals("name3", m3.get("name").textValue());
         assertEquals("group3", m3.get("group").textValue());
         JsonNode m3Tags = m3.get("tags");
@@ -242,6 +254,15 @@ public class PushHttpMetricsReporterTest {
         assertEquals("value3", m3Tags.get("key3").textValue());
         assertEquals(3.0, m3.get("value").doubleValue(), 0.0);
 
+        JsonNode m4 = metricsList.get(2);
+        assertEquals("name4", m4.get("name").textValue());
+        assertEquals("group4", m4.get("group").textValue());
+        JsonNode m4Tags = m4.get("tags");
+        assertTrue(m4Tags.isObject());
+        assertEquals(1, m4Tags.size());
+        assertEquals("value4", m4Tags.get("key4").textValue());
+        assertEquals("value4", m4.get("value").textValue());
+
         reporter.close();
 
         verifyAll();
@@ -318,15 +339,15 @@ public class PushHttpMetricsReporterTest {
         EasyMock.expect(executor.awaitTermination(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class))).andReturn(true);
     }
 
-    private static class ImmutableValue implements Measurable {
-        private final double value;
+    static class ImmutableValue<T> implements Gauge<T> {
+        private final T value;
 
-        public ImmutableValue(double value) {
+        public ImmutableValue(T value) {
             this.value = value;
         }
 
         @Override
-        public double measure(MetricConfig config, long now) {
+        public T value(MetricConfig config, long now) {
             return value;
         }
     }