You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2017/12/22 20:26:00 UTC

[jira] [Commented] (KAFKA-3923) MetricReporter interface depends on final class KafkaMetric instead of Metric interface

    [ https://issues.apache.org/jira/browse/KAFKA-3923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16301918#comment-16301918 ] 

ASF GitHub Bot commented on KAFKA-3923:
---------------------------------------

guozhangwang closed pull request #1579: KAFKA-3923: Make KafkaMetric not final, update JmxReporter and unit tests
URL: https://github.com/apache/kafka/pull/1579
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/clients/src/main/java/org/apache/kafka/common/Metric.java b/clients/src/main/java/org/apache/kafka/common/Metric.java
index d4ef77e36bb..611c9310f80 100644
--- a/clients/src/main/java/org/apache/kafka/common/Metric.java
+++ b/clients/src/main/java/org/apache/kafka/common/Metric.java
@@ -17,18 +17,17 @@
 package org.apache.kafka.common;
 
 /**
- * A numerical metric tracked for monitoring purposes
+ * A numerical metric tracked for monitoring purposes.
  */
 public interface Metric {
 
     /**
-     * A name for this metric
+     * A name for this metric.
      */
-    public MetricName metricName();
+    MetricName metricName();
 
     /**
-     * The value of the metric
+     * The value of the metric.
      */
-    public double value();
-
+    double value();
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
index 6872049dae7..962fe7e30f2 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
@@ -16,6 +16,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 
 import javax.management.Attribute;
 import javax.management.AttributeList;
@@ -37,14 +38,14 @@
 import org.slf4j.LoggerFactory;
 
 /**
- * Register metrics in JMX as dynamic mbeans based on the metric names
+ * Register metrics in JMX as dynamic mbeans based on the metric names.
  */
 public class JmxReporter implements MetricsReporter {
 
     private static final Logger log = LoggerFactory.getLogger(JmxReporter.class);
     private static final Object LOCK = new Object();
-    private String prefix;
-    private final Map<String, KafkaMbean> mbeans = new HashMap<String, KafkaMbean>();
+    private final String prefix;
+    private final Map<ObjectName, KafkaMbean> mbeans = new HashMap<>();
 
     public JmxReporter() {
         this("");
@@ -63,17 +64,25 @@ public void configure(Map<String, ?> configs) {}
     @Override
     public void init(List<KafkaMetric> metrics) {
         synchronized (LOCK) {
-            for (KafkaMetric metric : metrics)
-                addAttribute(metric);
-            for (KafkaMbean mbean : mbeans.values())
+            for (KafkaMetric metric : metrics) {
+                final KafkaMbean mbean = addAttribute(metric);
+                if (!this.mbeans.containsKey(mbean.name())) {
+                    this.mbeans.put(mbean.name(), mbean);
+                }
+            }
+            for (KafkaMbean mbean : this.mbeans.values()) {
                 reregister(mbean);
+            }
         }
     }
 
     @Override
     public void metricChange(KafkaMetric metric) {
         synchronized (LOCK) {
-            KafkaMbean mbean = addAttribute(metric);
+            final KafkaMbean mbean = addAttribute(metric);
+            if (!this.mbeans.containsKey(mbean.name())) {
+                this.mbeans.put(mbean.name(), mbean);
+            }
             reregister(mbean);
         }
     }
@@ -81,73 +90,64 @@ public void metricChange(KafkaMetric metric) {
     @Override
     public void metricRemoval(KafkaMetric metric) {
         synchronized (LOCK) {
-            KafkaMbean mbean = removeAttribute(metric);
-            if (mbean != null) {
-                if (mbean.metrics.isEmpty())
-                    unregister(mbean);
-                else
-                    reregister(mbean);
+            final KafkaMbean mbean = removeAttribute(metric);
+            if (mbean.isEmpty()) {
+                unregister(mbean);
+                this.mbeans.remove(mbean.name());
+            } else {
+                reregister(mbean);
             }
         }
     }
 
     private KafkaMbean removeAttribute(KafkaMetric metric) {
-        MetricName metricName = metric.metricName();
-        String mBeanName = getMBeanName(metricName);
-        KafkaMbean mbean = this.mbeans.get(mBeanName);
-        if (mbean != null)
+        final MetricName metricName = metric.metricName();
+        final KafkaMbean mbean = getMBean(metricName);
+        if (!mbean.isEmpty()) {
             mbean.removeAttribute(metricName.name());
+        }
         return mbean;
     }
 
     private KafkaMbean addAttribute(KafkaMetric metric) {
-        try {
-            MetricName metricName = metric.metricName();
-            String mBeanName = getMBeanName(metricName);
-            if (!this.mbeans.containsKey(mBeanName))
-                mbeans.put(mBeanName, new KafkaMbean(mBeanName));
-            KafkaMbean mbean = this.mbeans.get(mBeanName);
-            mbean.setAttribute(metricName.name(), metric);
-            return mbean;
-        } catch (JMException e) {
-            throw new KafkaException("Error creating mbean attribute for metricName :" + metric.metricName(), e);
-        }
+        final MetricName metricName = metric.metricName();
+        final KafkaMbean mbean = getMBean(metricName);
+        mbean.setAttribute(metricName.name(), metric);
+        return mbean;
     }
 
     /**
-     * @param metricName
-     * @return standard JMX MBean name in the following format domainName:type=metricType,key1=val1,key2=val2
+     * Get {@link KafkaMbean} instance if metric with the given {@link MetricName} is already registered or create a new instance otherwise.
+     *
+     * @param metricName identifier for {@link KafkaMetric}
+     * @return standard JMX MBean for the given {@link MetricName}
      */
-    private String getMBeanName(MetricName metricName) {
-        StringBuilder mBeanName = new StringBuilder();
-        mBeanName.append(prefix);
-        mBeanName.append(":type=");
-        mBeanName.append(metricName.group());
-        for (Map.Entry<String, String> entry : metricName.tags().entrySet()) {
-            if (entry.getKey().length() <= 0 || entry.getValue().length() <= 0)
-                continue;
-            mBeanName.append(",");
-            mBeanName.append(entry.getKey());
-            mBeanName.append("=");
-            mBeanName.append(entry.getValue());
-        }
-        return mBeanName.toString();
+    protected KafkaMbean getMBean(MetricName metricName) {
+        final ObjectName name = KafkaMbean.getObjectName(metricName, this.prefix);
+        KafkaMbean mbean = this.mbeans.get(name);
+        if (mbean == null) {
+            return KafkaMbean.of(name);
+        }
+        return mbean;
     }
 
     public void close() {
         synchronized (LOCK) {
-            for (KafkaMbean mbean : this.mbeans.values())
+            for (KafkaMbean mbean : this.mbeans.values()) {
                 unregister(mbean);
+            }
+            this.mbeans.clear();
         }
     }
 
     private void unregister(KafkaMbean mbean) {
-        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
+        final MBeanServer server = ManagementFactory.getPlatformMBeanServer();
         try {
-            if (server.isRegistered(mbean.name()))
+            if (server.isRegistered(mbean.name())) {
                 server.unregisterMBean(mbean.name());
+            }
         } catch (JMException e) {
-            throw new KafkaException("Error unregistering mbean", e);
+            throw new KafkaException("Error de-registering mbean " + mbean.name(), e);
         }
     }
 
@@ -160,17 +160,30 @@ private void reregister(KafkaMbean mbean) {
         }
     }
 
-    private static class KafkaMbean implements DynamicMBean {
+    public int mbeansQty() {
+        return this.mbeans.size();
+    }
+
+    protected static class KafkaMbean implements DynamicMBean {
+
         private final ObjectName objectName;
         private final Map<String, KafkaMetric> metrics;
 
-        public KafkaMbean(String mbeanName) throws MalformedObjectNameException {
-            this.metrics = new HashMap<String, KafkaMetric>();
-            this.objectName = new ObjectName(mbeanName);
+        public static KafkaMbean of(ObjectName objectName) {
+            try {
+                return new KafkaMbean(objectName);
+            } catch (MalformedObjectNameException e) {
+                throw new KafkaException("Error creating mbean with " + objectName, e);
+            }
+        }
+
+        private KafkaMbean(ObjectName objectName) throws MalformedObjectNameException {
+            this.metrics = new HashMap<>();
+            this.objectName = objectName;
         }
 
         public ObjectName name() {
-            return objectName;
+            return this.objectName;
         }
 
         public void setAttribute(String name, KafkaMetric metric) {
@@ -179,18 +192,20 @@ public void setAttribute(String name, KafkaMetric metric) {
 
         @Override
         public Object getAttribute(String name) throws AttributeNotFoundException, MBeanException, ReflectionException {
-            if (this.metrics.containsKey(name))
+            if (this.metrics.containsKey(name)) {
                 return this.metrics.get(name).value();
-            else
+            } else {
                 throw new AttributeNotFoundException("Could not find attribute " + name);
+            }
         }
 
         @Override
         public AttributeList getAttributes(String[] names) {
             try {
                 AttributeList list = new AttributeList();
-                for (String name : names)
+                for (String name : names) {
                     list.add(new Attribute(name, getAttribute(name)));
+                }
                 return list;
             } catch (Exception e) {
                 log.error("Error getting JMX attribute: ", e);
@@ -204,18 +219,10 @@ public KafkaMetric removeAttribute(String name) {
 
         @Override
         public MBeanInfo getMBeanInfo() {
-            MBeanAttributeInfo[] attrs = new MBeanAttributeInfo[metrics.size()];
+            MBeanAttributeInfo[] attrs = new MBeanAttributeInfo[this.metrics.size()];
             int i = 0;
             for (Map.Entry<String, KafkaMetric> entry : this.metrics.entrySet()) {
-                String attribute = entry.getKey();
-                KafkaMetric metric = entry.getValue();
-                attrs[i] = new MBeanAttributeInfo(attribute,
-                                                  double.class.getName(),
-                                                  metric.metricName().description(),
-                                                  true,
-                                                  false,
-                                                  false);
-                i += 1;
+                attrs[i++] = new MBeanAttributeInfo(entry.getKey(), double.class.getName(), entry.getValue().metricName().description(), true, false, false);
             }
             return new MBeanInfo(this.getClass().getName(), "", attrs, null, null, null);
         }
@@ -238,6 +245,40 @@ public AttributeList setAttributes(AttributeList list) {
             throw new UnsupportedOperationException("Set not allowed.");
         }
 
-    }
+        public boolean isEmpty() {
+            return this.metrics.isEmpty();
+        }
+
+        public int attributesQty() {
+            return this.metrics.size();
+        }
 
+        /**
+         * Calculate name of {@link KafkaMbean} instance, using the given {@link MetricName}.
+         *
+         * @param metricName identifier for {@link KafkaMetric}
+         * @return standard JMX MBean name in the following format domainName:type=metricType,key1=val1,key2=val2
+         */
+        protected static ObjectName getObjectName(MetricName metricName, String prefix) {
+            try {
+                Objects.requireNonNull(metricName, "Parameter 'metricName' is mandatory");
+                StringBuilder mBeanName = new StringBuilder();
+                mBeanName.append(prefix);
+                mBeanName.append(":type=");
+                mBeanName.append(metricName.group());
+                for (Map.Entry<String, String> entry : metricName.tags().entrySet()) {
+                    if (entry.getKey().isEmpty() || entry.getValue().isEmpty()) {
+                        continue;
+                    }
+                    mBeanName.append(",");
+                    mBeanName.append(entry.getKey());
+                    mBeanName.append("=");
+                    mBeanName.append(entry.getValue());
+                }
+                return ObjectName.getInstance(mBeanName.toString());
+            } catch (MalformedObjectNameException e) {
+                throw new KafkaException("Error creating mbean for " + metricName, e);
+            }
+        }
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
index 86014e53bb8..09abfaada3c 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/KafkaMetric.java
@@ -20,7 +20,39 @@
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.utils.Time;
 
-public final class KafkaMetric implements Metric {
+/**
+ * Main implementation for {@link Metric} interface, intended for monitoring purposes.
+ * <p>
+ * As no public constructor is available, instances of this class should be created with {@link Metrics} registry.
+ * <p>
+ * Creating KafkaMetric manually can be useful for implementing unit tests, e.g.:
+ *
+ * <pre>
+ * // set up metrics:
+ * Metrics metrics = new Metrics(); // this is the global repository of metrics and sensors
+ * MetricName metricName = new MetricName(&quot;Dummy Name&quot;, &quot;Dummy Group&quot;, &quot;Dummy Description&quot;, Collections.emptyMap());
+ * metrics.add(metricName, new Avg());
+ *
+ * Collection&lt;KafkaMetric&gt; metricObjects = metrics.metrics().values(); // constructed KafkaMetric objects
+ * </pre>
+ *
+ * Also this class can be mocked or extended whenever is necessary, e.g.:
+ *
+ * <pre>
+ * private static class MockKafkaMetric extends KafkaMetric {
+ *
+ *     private MockKafkaMetric(Object lock, MetricName metricName, Measurable measurable, MetricConfig config, Time time) {
+ *         super(lock, metricName, measurable, config, time);
+ *     }
+ *
+ *     private static MockKafkaMetric of(String name, String group, Measurable measurable) {
+ *         final MetricName metricName = new MetricName(name, group, &quot;&quot;, Collections.&lt;String, String&gt;emptyMap());
+ *         return new MockKafkaMetric(new Object(), metricName, measurable, null, Time.SYSTEM);
+ *     }
+ * }
+ * </pre>
+ */
+public class KafkaMetric implements Metric {
 
     private MetricName metricName;
     private final Object lock;
@@ -49,7 +81,7 @@ public MetricName metricName() {
     @Override
     public double value() {
         synchronized (this.lock) {
-            return value(time.milliseconds());
+            return value(this.time.milliseconds());
         }
     }
 
@@ -58,11 +90,11 @@ public Measurable measurable() {
     }
 
     double value(long timeMs) {
-        return this.measurable.measure(config, timeMs);
+        return this.measurable.measure(this.config, timeMs);
     }
 
     public void config(MetricConfig config) {
-        synchronized (lock) {
+        synchronized (this.lock) {
             this.config = config;
         }
     }
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
index ab75813b2fa..8eda4901834 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/MetricsReporter.java
@@ -27,23 +27,22 @@
      * This is called when the reporter is first registered to initially register all existing metrics
      * @param metrics All currently existing metrics
      */
-    public void init(List<KafkaMetric> metrics);
+    void init(List<KafkaMetric> metrics);
 
     /**
      * This is called whenever a metric is updated or added
      * @param metric
      */
-    public void metricChange(KafkaMetric metric);
+    void metricChange(KafkaMetric metric);
 
     /**
      * This is called whenever a metric is removed
      * @param metric
      */
-    public void metricRemoval(KafkaMetric metric);
+    void metricRemoval(KafkaMetric metric);
 
     /**
      * Called when the metrics repository is closed.
      */
-    public void close();
-
+    void close();
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
index d483ef0591d..fc31f9458b8 100644
--- a/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/config/AbstractConfigTest.java
@@ -15,9 +15,9 @@
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
-import org.apache.kafka.common.metrics.FakeMetricsReporter;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.test.MockMetricsReporter;
 import org.junit.Test;
 
 import java.util.Arrays;
@@ -36,12 +36,12 @@
     @Test
     public void testConfiguredInstances() {
         testValidInputs("");
-        testValidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter");
-        testValidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter, org.apache.kafka.common.metrics.FakeMetricsReporter");
+        testValidInputs("org.apache.kafka.test.MockMetricsReporter");
+        testValidInputs("org.apache.kafka.test.MockMetricsReporter, org.apache.kafka.test.MockMetricsReporter");
         testInvalidInputs(",");
         testInvalidInputs("org.apache.kafka.clients.producer.unknown-metrics-reporter");
         testInvalidInputs("test1,test2");
-        testInvalidInputs("org.apache.kafka.common.metrics.FakeMetricsReporter,");
+        testInvalidInputs("org.apache.kafka.test.MockMetricsReporter,");
     }
 
     @Test
@@ -64,13 +64,14 @@ public void testOriginalsWithPrefix() {
     @Test
     public void testUnused() {
         Properties props = new Properties();
-        String configValue = "org.apache.kafka.common.config.AbstractConfigTest$ConfiguredFakeMetricsReporter";
+        String configValue = ConfiguredMockMetricsReporter.class.getName();
+        assertEquals("org.apache.kafka.common.config.AbstractConfigTest$ConfiguredMockMetricsReporter", configValue);
         props.put(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, configValue);
-        props.put(FakeMetricsReporterConfig.EXTRA_CONFIG, "my_value");
+        props.put(MockMetricsReporterConfig.EXTRA_CONFIG, "my_value");
         TestConfig config = new TestConfig(props);
 
         assertTrue("metric.extra_config should be marked unused before getConfiguredInstances is called",
-            config.unused().contains(FakeMetricsReporterConfig.EXTRA_CONFIG));
+            config.unused().contains(MockMetricsReporterConfig.EXTRA_CONFIG));
 
         config.getConfiguredInstances(TestConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class);
         assertTrue("All defined configurations should be marked as used", config.unused().isEmpty());
@@ -167,9 +168,9 @@ public RestrictedClassLoader() {
     }
 
     private static class ClassTestConfig extends AbstractConfig {
-        static final Class<?> DEFAULT_CLASS = FakeMetricsReporter.class;
+        static final Class<?> DEFAULT_CLASS = MockMetricsReporter.class;
         static final Class<?> VISIBLE_CLASS = JmxReporter.class;
-        static final Class<?> RESTRICTED_CLASS = ConfiguredFakeMetricsReporter.class;
+        static final Class<?> RESTRICTED_CLASS = ConfiguredMockMetricsReporter.class;
 
         private static final ConfigDef CONFIG;
         static {
@@ -233,17 +234,17 @@ public TestConfig(Map<?, ?> props) {
         }
     }
 
-    public static class ConfiguredFakeMetricsReporter extends FakeMetricsReporter {
+    public static class ConfiguredMockMetricsReporter extends MockMetricsReporter {
         @Override
         public void configure(Map<String, ?> configs) {
-            FakeMetricsReporterConfig config = new FakeMetricsReporterConfig(configs);
+            MockMetricsReporterConfig config = new MockMetricsReporterConfig(configs);
 
             // Calling getString() should have the side effect of marking that config as used.
-            config.getString(FakeMetricsReporterConfig.EXTRA_CONFIG);
+            config.getString(MockMetricsReporterConfig.EXTRA_CONFIG);
         }
     }
 
-    public static class FakeMetricsReporterConfig extends AbstractConfig {
+    public static class MockMetricsReporterConfig extends AbstractConfig {
 
         public static final String EXTRA_CONFIG = "metric.extra_config";
         private static final String EXTRA_CONFIG_DOC = "An extraneous configuration string.";
@@ -252,7 +253,7 @@ public void configure(Map<String, ?> configs) {
                 ConfigDef.Importance.LOW, EXTRA_CONFIG_DOC);
 
 
-        public FakeMetricsReporterConfig(Map<?, ?> props) {
+        public MockMetricsReporterConfig(Map<?, ?> props) {
             super(CONFIG, props);
         }
     }
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/FakeMetricsReporter.java b/clients/src/test/java/org/apache/kafka/common/metrics/FakeMetricsReporter.java
deleted file mode 100644
index d5dd9b8b563..00000000000
--- a/clients/src/test/java/org/apache/kafka/common/metrics/FakeMetricsReporter.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
- * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
- * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
- * License. You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- */
-package org.apache.kafka.common.metrics;
-
-import java.util.List;
-import java.util.Map;
-
-public class FakeMetricsReporter implements MetricsReporter {
-
-    @Override
-    public void configure(Map<String, ?> configs) {}
-
-    @Override
-    public void init(List<KafkaMetric> metrics) {}
-
-    @Override
-    public void metricChange(KafkaMetric metric) {}
-
-    @Override
-    public void metricRemoval(KafkaMetric metric) {}
-
-    @Override
-    public void close() {}
-
-}
diff --git a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
index e07e646334e..c1c893738b6 100644
--- a/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/metrics/JmxReporterTest.java
@@ -16,25 +16,59 @@
  */
 package org.apache.kafka.common.metrics;
 
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Total;
+import org.hamcrest.CoreMatchers;
 import org.junit.Test;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
 public class JmxReporterTest {
 
     @Test
     public void testJmxRegistration() throws Exception {
-        Metrics metrics = new Metrics();
-        try {
-            metrics.addReporter(new JmxReporter());
+        final JmxReporter reporter = new JmxReporter();
+        MetricName name;
+        try (Metrics metrics = new Metrics()) {
+            assertEquals(0, reporter.mbeansQty());
+            metrics.addReporter(reporter);
+            assertEquals(1, reporter.mbeansQty());
             Sensor sensor = metrics.sensor("kafka.requests");
-            sensor.add(metrics.metricName("pack.bean1.avg", "grp1"), new Avg());
-            sensor.add(metrics.metricName("pack.bean2.total", "grp2"), new Total());
+            name = metrics.metricName("pack.bean1.avg", "grp1");
+            sensor.add(name, new Avg());
+            assertEquals(2, reporter.mbeansQty());
+            assertEquals(1, reporter.getMBean(name).attributesQty());
+            name = metrics.metricName("pack.bean2.total", "grp2");
+            sensor.add(name, new Total());
+            assertEquals(3, reporter.mbeansQty());
+            assertEquals(1, reporter.getMBean(name).attributesQty());
             Sensor sensor2 = metrics.sensor("kafka.blah");
-            sensor2.add(metrics.metricName("pack.bean1.some", "grp1"), new Total());
-            sensor2.add(metrics.metricName("pack.bean2.some", "grp1"), new Total());
-        } finally {
-            metrics.close();
+            name = metrics.metricName("pack.bean1.some", "grp1");
+            sensor2.add(name, new Total());
+            assertEquals(3, reporter.mbeansQty());
+            assertEquals(2, reporter.getMBean(name).attributesQty());
+            name = metrics.metricName("pack.bean2.some", "grp1");
+            sensor2.add(name, new Total());
+            assertEquals(3, reporter.mbeansQty());
+            assertEquals(3, reporter.getMBean(name).attributesQty());
+            metrics.removeSensor(sensor2.name());
+            assertEquals(3, reporter.mbeansQty());
+            assertEquals(1, reporter.getMBean(name).attributesQty());
+        }
+        assertEquals(0, reporter.mbeansQty());
+        assertEquals(0, reporter.getMBean(name).attributesQty());
+    }
+
+    @Test
+    public void testGetMBeanWithNull() {
+        final JmxReporter reporter = new JmxReporter();
+        try {
+            reporter.getMBean(null);
+        } catch (RuntimeException e) {
+            assertThat(e, CoreMatchers.instanceOf(NullPointerException.class));
+            assertThat(e.getMessage(), CoreMatchers.is("Parameter 'metricName' is mandatory"));
         }
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 6dc17820f48..201ad61a90c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -57,6 +57,8 @@
 import org.junit.runners.Parameterized;
 import org.junit.runners.Parameterized.Parameter;
 import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -77,6 +79,8 @@
 
 @RunWith(Parameterized.class)
 public class QueryableStateIntegrationTest {
+    private static final Logger log = LoggerFactory.getLogger(QueryableStateIntegrationTest.class);
+
     private static final int NUM_BROKERS = 1;
     @ClassRule
     public static final EmbeddedKafkaCluster CLUSTER =
@@ -93,6 +97,7 @@
     private static final long WINDOW_SIZE = TimeUnit.MILLISECONDS.convert(2, TimeUnit.DAYS);
     private static final int NUM_PARTITIONS = 2;
     private static final int NUM_REPLICAS = NUM_BROKERS;
+    private static final int MAX_WAIT_MS = Integer.getInteger("kafka.test.max.wait.ms", 60000);
     private Properties streamsConfiguration;
     private List<String> inputValues;
     private Set<String> inputValuesKeys;
@@ -281,16 +286,16 @@ public boolean conditionMet() {
                         final ReadOnlyKeyValueStore<String, Long> store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
                         return store != null && store.get(key) != null;
                     } catch (final IllegalStateException e) {
-                        // Kafka Streams instance may have closed but rebalance hasn't happened
+                        log.debug("Kafka Streams instance may have been closed but re-balance hasn't happened", e);
                         return false;
                     } catch (final InvalidStateStoreException e) {
-                        // there must have been at least one rebalance state
+                        log.debug("The state store may have been closed already due to consecutive re-balances", e);
                         assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1);
                         return false;
                     }
 
                 }
-            }, 30000, "waiting for metadata, store and value to be non null");
+            }, MAX_WAIT_MS, "waiting for metadata, store and value to be non null");
         }
     }
 
@@ -313,16 +318,16 @@ public boolean conditionMet() {
                         final ReadOnlyWindowStore<String, Long> store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
                         return store != null && store.fetch(key, from, to) != null;
                     } catch (final IllegalStateException e) {
-                        // Kafka Streams instance may have closed but rebalance hasn't happened
+                        log.debug("Kafka Streams instance may have been closed but re-balance hasn't happened", e);
                         return false;
                     } catch (InvalidStateStoreException e) {
-                        // there must have been at least one rebalance state
+                        log.debug("The state store may have been closed already due to consecutive re-balances", e);
                         assertTrue(stateListenerStub.mapStates.get(KafkaStreams.State.REBALANCING) >= 1);
                         return false;
                     }
 
                 }
-            }, 30000, "waiting for metadata, store and value to be non null");
+            }, MAX_WAIT_MS, "waiting for metadata, store and value to be non null");
         }
     }
 
@@ -474,7 +479,6 @@ public void shouldBeAbleToQueryState() throws Exception {
             myCount);
 
         verifyRangeAndAll(expectedCount, myCount);
-
     }
 
     @Test
@@ -498,7 +502,6 @@ public void shouldNotMakeStoreAvailableUntilAllStoresAvailable() throws Exceptio
                         new Properties()),
                 mockTime);
 
-        final int maxWaitMs = 30000;
         TestUtils.waitForCondition(new TestCondition() {
             @Override
             public boolean conditionMet() {
@@ -509,7 +512,7 @@ public boolean conditionMet() {
                     return false;
                 }
             }
-        }, maxWaitMs, "waiting for store " + storeName);
+        }, MAX_WAIT_MS, "waiting for store " + storeName);
 
         final ReadOnlyKeyValueStore<String, Long> store = kafkaStreams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
 
@@ -518,7 +521,7 @@ public boolean conditionMet() {
             public boolean conditionMet() {
                 return new Long(8).equals(store.get("hello"));
             }
-        }, maxWaitMs, "wait for count to be 8");
+        }, MAX_WAIT_MS, "wait for count to be 8");
 
         // close stream
         kafkaStreams.close();
@@ -538,8 +541,7 @@ public boolean conditionMet() {
                     return false;
                 }
             }
-        }, maxWaitMs, "waiting for store " + storeName);
-
+        }, MAX_WAIT_MS, "waiting for store " + storeName);
     }
 
     private void verifyRangeAndAll(final Set<KeyValue<String, Long>> expectedCount,
@@ -582,7 +584,7 @@ private void verifyCanGetByKey(final String[] keys,
         final Set<KeyValue<String, Long>> windowState = new TreeSet<>(stringLongComparator);
         final Set<KeyValue<String, Long>> countState = new TreeSet<>(stringLongComparator);
 
-        final long timeout = System.currentTimeMillis() + 30000;
+        final long timeout = System.currentTimeMillis() + MAX_WAIT_MS;
         while ((windowState.size() < keys.length ||
             countState.size() < keys.length) &&
             System.currentTimeMillis() < timeout) {
@@ -670,7 +672,7 @@ private void waitUntilAtLeastNumRecordProcessed(final String topic, final int nu
             config,
             topic,
             numRecs,
-            60 * 1000);
+            MAX_WAIT_MS);
     }
 
     private Set<KeyValue<String, Long>> fetch(final ReadOnlyWindowStore<String, Long> store,
@@ -695,7 +697,6 @@ private void waitUntilAtLeastNumRecordProcessed(final String topic, final int nu
         return Collections.emptyMap();
     }
 
-
     /**
      * A class that periodically produces records in a separate thread
      */
@@ -745,6 +746,4 @@ public void run() {
             }
         }
     }
-
-
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> MetricReporter interface depends on final class KafkaMetric instead of Metric interface
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-3923
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3923
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>            Reporter: Igor Stepanov
>
> Hello,
> I'm working on exposing Kafka's consumer/producer metrics to Spring Actuator.
> To achieve this, I've implemented Kafka's MetricReporter interface to allow injecting it into the appropriate consumer/producer. No issues with implementation itself, fine for me.
> But now I've moved to writing unit tests for this implementation and decided to use mocked KafkaMetric instances for this. But mocking of KafkaMetric itself is not so plain - the class is final. The logical step is to use Metric interface for mocking, but MetricReporter accepts only KafkaMetric.
> I know that technically I can use PowerMock and most probably it will work fine in this case, but talking about Kafka itself, is it a good approach to depend on the exact implementation when interface is available?



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)