You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/12 17:38:17 UTC

kafka git commit: MINOR: Fix consumer and producer to actually support metrics recording level

Repository: kafka
Updated Branches:
  refs/heads/trunk 495184916 -> 1c2bbaa50


MINOR: Fix consumer and producer to actually support metrics recording level

Also add tests and a few clean-ups.

Author: Ismael Juma <is...@juma.me.uk>

Reviewers: Eno Thereska <en...@gmail.com>, Jason Gustafson <ja...@confluent.io>

Closes #2937 from ijuma/metrics-recording-level-producer


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1c2bbaa5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1c2bbaa5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1c2bbaa5

Branch: refs/heads/trunk
Commit: 1c2bbaa501c2d0fd4db9c2dacacc3ff7f5236d3d
Parents: 4951849
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri May 12 10:36:44 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri May 12 10:36:44 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   | 10 +++--
 .../kafka/clients/producer/KafkaProducer.java   |  8 ++--
 .../kafka/clients/producer/ProducerConfig.java  | 18 +++++++-
 .../apache/kafka/common/network/Selector.java   | 20 +++++----
 .../clients/consumer/KafkaConsumerTest.java     | 15 +++++++
 .../clients/producer/KafkaProducerTest.java     | 46 +++++++++++++-------
 6 files changed, 84 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/1c2bbaa5/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 9df674d..aad4453 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -41,6 +41,7 @@ import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.Selector;
 import org.apache.kafka.common.requests.IsolationLevel;
@@ -58,7 +59,6 @@ import java.util.Collections;
 import java.util.ConcurrentModificationException;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Locale;
 import java.util.Map;
@@ -519,6 +519,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private static final String JMX_PREFIX = "kafka.consumer";
     static final long DEFAULT_CLOSE_TIMEOUT_MS = 30 * 1000;
 
+    // Visible for testing
+    final Metrics metrics;
+
     private final String clientId;
     private final ConsumerCoordinator coordinator;
     private final Deserializer<K> keyDeserializer;
@@ -528,7 +531,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
     private final Time time;
     private final ConsumerNetworkClient client;
-    private final Metrics metrics;
     private final SubscriptionState subscriptions;
     private final Metadata metadata;
     private final long retryBackoffMs;
@@ -622,10 +624,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
             if (clientId.length() <= 0)
                 clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
             this.clientId = clientId;
-            Map<String, String> metricsTags = new LinkedHashMap<>();
-            metricsTags.put("client-id", clientId);
+            Map<String, String> metricsTags = Collections.singletonMap("client-id", clientId);
             MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
                     .timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
+                    .recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                     .tags(metricsTags);
             List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                     MetricsReporter.class);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c2bbaa5/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index f812389..b1f405a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -64,7 +64,6 @@ import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
 import java.util.Collections;
-import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -149,13 +148,14 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     private static final String JMX_PREFIX = "kafka.producer";
 
     private String clientId;
+    // Visible for testing
+    final Metrics metrics;
     private final Partitioner partitioner;
     private final int maxRequestSize;
     private final long totalMemorySize;
     private final Metadata metadata;
     private final RecordAccumulator accumulator;
     private final Sender sender;
-    private final Metrics metrics;
     private final Thread ioThread;
     private final CompressionType compressionType;
     private final Sensor errors;
@@ -230,10 +230,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
             clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
             if (clientId.length() <= 0)
                 clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
-            Map<String, String> metricTags = new LinkedHashMap<String, String>();
-            metricTags.put("client-id", clientId);
+            Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
             MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
                     .timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
+                    .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
                     .tags(metricTags);
             List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
                     MetricsReporter.class);

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c2bbaa5/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 12e8c64..4208a90 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serializer;
 
 import java.util.HashMap;
@@ -145,6 +146,11 @@ public class ProducerConfig extends AbstractConfig {
     /** <code>metrics.num.samples</code> */
     public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
 
+    /**
+     * <code>metrics.recording.level</code>
+     */
+    public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG;
+
     /** <code>metric.reporters</code> */
     public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
 
@@ -232,7 +238,6 @@ public class ProducerConfig extends AbstractConfig {
                                         Importance.MEDIUM,
                                         MAX_REQUEST_SIZE_DOC)
                                 .define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 50L, atLeast(0L), Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
-                                .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
                                 .define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
                                 .define(MAX_BLOCK_MS_CONFIG,
                                         Type.LONG,
@@ -254,6 +259,17 @@ public class ProducerConfig extends AbstractConfig {
                                         Importance.LOW,
                                         CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
                                 .define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
+                                .define(METRICS_RECORDING_LEVEL_CONFIG,
+                                        Type.STRING,
+                                        Sensor.RecordingLevel.INFO.toString(),
+                                        in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()),
+                                        Importance.LOW,
+                                        CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
+                                .define(METRIC_REPORTER_CLASSES_CONFIG,
+                                        Type.LIST,
+                                        "",
+                                        Importance.LOW,
+                                        CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
                                 .define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
                                         Type.INT,
                                         5,

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c2bbaa5/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 312e1f5..a74a584 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -26,6 +26,7 @@ import java.nio.channels.SocketChannel;
 import java.nio.channels.UnresolvedAddressException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -96,11 +97,8 @@ public class Selector implements Selectable, AutoCloseable {
     private final List<String> failedSends;
     private final Time time;
     private final SelectorMetrics sensors;
-    private final String metricGrpPrefix;
-    private final Map<String, String> metricTags;
     private final ChannelBuilder channelBuilder;
     private final int maxReceiveSize;
-    private final boolean metricsPerConnection;
     private final boolean recordTimePerConnection;
     private final IdleExpiryManager idleExpiryManager;
 
@@ -132,8 +130,6 @@ public class Selector implements Selectable, AutoCloseable {
         }
         this.maxReceiveSize = maxReceiveSize;
         this.time = time;
-        this.metricGrpPrefix = metricGrpPrefix;
-        this.metricTags = metricTags;
         this.channels = new HashMap<>();
         this.completedSends = new ArrayList<>();
         this.completedReceives = new ArrayList<>();
@@ -143,9 +139,8 @@ public class Selector implements Selectable, AutoCloseable {
         this.connected = new ArrayList<>();
         this.disconnected = new ArrayList<>();
         this.failedSends = new ArrayList<>();
-        this.sensors = new SelectorMetrics(metrics);
+        this.sensors = new SelectorMetrics(metrics, metricGrpPrefix, metricTags, metricsPerConnection);
         this.channelBuilder = channelBuilder;
-        this.metricsPerConnection = metricsPerConnection;
         this.recordTimePerConnection = recordTimePerConnection;
         this.idleExpiryManager = connectionMaxIdleMs < 0 ? null : new IdleExpiryManager(time, connectionMaxIdleMs);
     }
@@ -162,7 +157,7 @@ public class Selector implements Selectable, AutoCloseable {
     }
 
     public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder) {
-        this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, new HashMap<String, String>(), true, channelBuilder);
+        this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, Collections.<String, String>emptyMap(), true, channelBuilder);
     }
 
     /**
@@ -679,6 +674,10 @@ public class Selector implements Selectable, AutoCloseable {
 
     private class SelectorMetrics {
         private final Metrics metrics;
+        private final String metricGrpPrefix;
+        private final Map<String, String> metricTags;
+        private final boolean metricsPerConnection;
+
         public final Sensor connectionClosed;
         public final Sensor connectionCreated;
         public final Sensor bytesTransferred;
@@ -691,8 +690,11 @@ public class Selector implements Selectable, AutoCloseable {
         private final List<MetricName> topLevelMetricNames = new ArrayList<>();
         private final List<Sensor> sensors = new ArrayList<>();
 
-        public SelectorMetrics(Metrics metrics) {
+        public SelectorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection) {
             this.metrics = metrics;
+            this.metricGrpPrefix = metricGrpPrefix;
+            this.metricTags = metricTags;
+            this.metricsPerConnection = metricsPerConnection;
             String metricGrpName = metricGrpPrefix + "-metrics";
             StringBuilder tagsSuffix = new StringBuilder();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c2bbaa5/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index a598b5d..5928a28 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.Selectable;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
@@ -1234,6 +1235,20 @@ public class KafkaConsumerTest {
         consumer.close();
     }
 
+    @Test
+    public void testMetricConfigRecordingLevel() {
+        Properties props = new Properties();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        try (KafkaConsumer consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
+            assertEquals(Sensor.RecordingLevel.INFO, consumer.metrics.config().recordLevel());
+        }
+
+        props.put(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
+        try (KafkaConsumer consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
+            assertEquals(Sensor.RecordingLevel.DEBUG, consumer.metrics.config().recordLevel());
+        }
+    }
+
     private void consumerCloseTest(final long closeTimeoutMs,
             List<? extends AbstractResponse> responses,
             long waitMs,

http://git-wip-us.apache.org/repos/asf/kafka/blob/1c2bbaa5/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index ea493d2..3a6426a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.Selectable;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.ExtendedSerializer;
@@ -53,6 +54,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -87,9 +89,9 @@ public class KafkaProducerTest {
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(
                     props, new ByteArraySerializer(), new ByteArraySerializer());
         } catch (KafkaException e) {
-            Assert.assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get());
-            Assert.assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get());
-            Assert.assertEquals("Failed to construct kafka producer", e.getMessage());
+            assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get());
+            assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get());
+            assertEquals("Failed to construct kafka producer", e.getMessage());
             return;
         }
         fail("should have caught an exception and returned");
@@ -107,12 +109,12 @@ public class KafkaProducerTest {
 
         KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(
                 configs, new MockSerializer(), new MockSerializer());
-        Assert.assertEquals(oldInitCount + 2, MockSerializer.INIT_COUNT.get());
-        Assert.assertEquals(oldCloseCount, MockSerializer.CLOSE_COUNT.get());
+        assertEquals(oldInitCount + 2, MockSerializer.INIT_COUNT.get());
+        assertEquals(oldCloseCount, MockSerializer.CLOSE_COUNT.get());
 
         producer.close();
-        Assert.assertEquals(oldInitCount + 2, MockSerializer.INIT_COUNT.get());
-        Assert.assertEquals(oldCloseCount + 2, MockSerializer.CLOSE_COUNT.get());
+        assertEquals(oldInitCount + 2, MockSerializer.INIT_COUNT.get());
+        assertEquals(oldCloseCount + 2, MockSerializer.CLOSE_COUNT.get());
     }
 
     @Test
@@ -126,15 +128,15 @@ public class KafkaProducerTest {
 
             KafkaProducer<String, String> producer = new KafkaProducer<String, String>(
                     props, new StringSerializer(), new StringSerializer());
-            Assert.assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
-            Assert.assertEquals(0, MockProducerInterceptor.CLOSE_COUNT.get());
+            assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
+            assertEquals(0, MockProducerInterceptor.CLOSE_COUNT.get());
 
             // Cluster metadata will only be updated on calling onSend.
             Assert.assertNull(MockProducerInterceptor.CLUSTER_META.get());
 
             producer.close();
-            Assert.assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
-            Assert.assertEquals(1, MockProducerInterceptor.CLOSE_COUNT.get());
+            assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
+            assertEquals(1, MockProducerInterceptor.CLOSE_COUNT.get());
         } finally {
             // cleanup since we are using mutable static variables in MockProducerInterceptor
             MockProducerInterceptor.resetCounters();
@@ -150,12 +152,12 @@ public class KafkaProducerTest {
 
             KafkaProducer<String, String> producer = new KafkaProducer<String, String>(
                     props, new StringSerializer(), new StringSerializer());
-            Assert.assertEquals(1, MockPartitioner.INIT_COUNT.get());
-            Assert.assertEquals(0, MockPartitioner.CLOSE_COUNT.get());
+            assertEquals(1, MockPartitioner.INIT_COUNT.get());
+            assertEquals(0, MockPartitioner.CLOSE_COUNT.get());
 
             producer.close();
-            Assert.assertEquals(1, MockPartitioner.INIT_COUNT.get());
-            Assert.assertEquals(1, MockPartitioner.CLOSE_COUNT.get());
+            assertEquals(1, MockPartitioner.INIT_COUNT.get());
+            assertEquals(1, MockPartitioner.CLOSE_COUNT.get());
         } finally {
             // cleanup since we are using mutable static variables in MockPartitioner
             MockPartitioner.resetCounters();
@@ -418,4 +420,18 @@ public class KafkaProducerTest {
         producer.close();
     }
 
+    @Test
+    public void testMetricConfigRecordingLevel() {
+        Properties props = new Properties();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        try (KafkaProducer producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) {
+            assertEquals(Sensor.RecordingLevel.INFO, producer.metrics.config().recordLevel());
+        }
+
+        props.put(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
+        try (KafkaProducer producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) {
+            assertEquals(Sensor.RecordingLevel.DEBUG, producer.metrics.config().recordLevel());
+        }
+    }
+
 }