You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/12 17:38:17 UTC
kafka git commit: MINOR: Fix consumer and producer to actually
support metrics recording level
Repository: kafka
Updated Branches:
refs/heads/trunk 495184916 -> 1c2bbaa50
MINOR: Fix consumer and producer to actually support metrics recording level
Also add tests and a few clean-ups.
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Eno Thereska <en...@gmail.com>, Jason Gustafson <ja...@confluent.io>
Closes #2937 from ijuma/metrics-recording-level-producer
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1c2bbaa5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1c2bbaa5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1c2bbaa5
Branch: refs/heads/trunk
Commit: 1c2bbaa501c2d0fd4db9c2dacacc3ff7f5236d3d
Parents: 4951849
Author: Ismael Juma <is...@juma.me.uk>
Authored: Fri May 12 10:36:44 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri May 12 10:36:44 2017 -0700
----------------------------------------------------------------------
.../kafka/clients/consumer/KafkaConsumer.java | 10 +++--
.../kafka/clients/producer/KafkaProducer.java | 8 ++--
.../kafka/clients/producer/ProducerConfig.java | 18 +++++++-
.../apache/kafka/common/network/Selector.java | 20 +++++----
.../clients/consumer/KafkaConsumerTest.java | 15 +++++++
.../clients/producer/KafkaProducerTest.java | 46 +++++++++++++-------
6 files changed, 84 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c2bbaa5/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 9df674d..aad4453 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -41,6 +41,7 @@ import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.ChannelBuilder;
import org.apache.kafka.common.network.Selector;
import org.apache.kafka.common.requests.IsolationLevel;
@@ -58,7 +59,6 @@ import java.util.Collections;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
@@ -519,6 +519,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
private static final String JMX_PREFIX = "kafka.consumer";
static final long DEFAULT_CLOSE_TIMEOUT_MS = 30 * 1000;
+ // Visible for testing
+ final Metrics metrics;
+
private final String clientId;
private final ConsumerCoordinator coordinator;
private final Deserializer<K> keyDeserializer;
@@ -528,7 +531,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
private final Time time;
private final ConsumerNetworkClient client;
- private final Metrics metrics;
private final SubscriptionState subscriptions;
private final Metadata metadata;
private final long retryBackoffMs;
@@ -622,10 +624,10 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
if (clientId.length() <= 0)
clientId = "consumer-" + CONSUMER_CLIENT_ID_SEQUENCE.getAndIncrement();
this.clientId = clientId;
- Map<String, String> metricsTags = new LinkedHashMap<>();
- metricsTags.put("client-id", clientId);
+ Map<String, String> metricsTags = Collections.singletonMap("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(ConsumerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
+ .recordLevel(Sensor.RecordingLevel.forName(config.getString(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.tags(metricsTags);
List<MetricsReporter> reporters = config.getConfiguredInstances(ConsumerConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c2bbaa5/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index f812389..b1f405a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -64,7 +64,6 @@ import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
import java.util.Collections;
-import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -149,13 +148,14 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
private static final String JMX_PREFIX = "kafka.producer";
private String clientId;
+ // Visible for testing
+ final Metrics metrics;
private final Partitioner partitioner;
private final int maxRequestSize;
private final long totalMemorySize;
private final Metadata metadata;
private final RecordAccumulator accumulator;
private final Sender sender;
- private final Metrics metrics;
private final Thread ioThread;
private final CompressionType compressionType;
private final Sensor errors;
@@ -230,10 +230,10 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
clientId = config.getString(ProducerConfig.CLIENT_ID_CONFIG);
if (clientId.length() <= 0)
clientId = "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
- Map<String, String> metricTags = new LinkedHashMap<String, String>();
- metricTags.put("client-id", clientId);
+ Map<String, String> metricTags = Collections.singletonMap("client-id", clientId);
MetricConfig metricConfig = new MetricConfig().samples(config.getInt(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG))
.timeWindow(config.getLong(ProducerConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
+ .recordLevel(Sensor.RecordingLevel.forName(config.getString(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG)))
.tags(metricTags);
List<MetricsReporter> reporters = config.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
MetricsReporter.class);
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c2bbaa5/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 12e8c64..4208a90 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serializer;
import java.util.HashMap;
@@ -145,6 +146,11 @@ public class ProducerConfig extends AbstractConfig {
/** <code>metrics.num.samples</code> */
public static final String METRICS_NUM_SAMPLES_CONFIG = CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG;
+ /**
+ * <code>metrics.recording.level</code>
+ */
+ public static final String METRICS_RECORDING_LEVEL_CONFIG = CommonClientConfigs.METRICS_RECORDING_LEVEL_CONFIG;
+
/** <code>metric.reporters</code> */
public static final String METRIC_REPORTER_CLASSES_CONFIG = CommonClientConfigs.METRIC_REPORTER_CLASSES_CONFIG;
@@ -232,7 +238,6 @@ public class ProducerConfig extends AbstractConfig {
Importance.MEDIUM,
MAX_REQUEST_SIZE_DOC)
.define(RECONNECT_BACKOFF_MS_CONFIG, Type.LONG, 50L, atLeast(0L), Importance.LOW, CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC)
- .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST, "", Importance.LOW, CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
.define(RETRY_BACKOFF_MS_CONFIG, Type.LONG, 100L, atLeast(0L), Importance.LOW, CommonClientConfigs.RETRY_BACKOFF_MS_DOC)
.define(MAX_BLOCK_MS_CONFIG,
Type.LONG,
@@ -254,6 +259,17 @@ public class ProducerConfig extends AbstractConfig {
Importance.LOW,
CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
.define(METRICS_NUM_SAMPLES_CONFIG, Type.INT, 2, atLeast(1), Importance.LOW, CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
+ .define(METRICS_RECORDING_LEVEL_CONFIG,
+ Type.STRING,
+ Sensor.RecordingLevel.INFO.toString(),
+ in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()),
+ Importance.LOW,
+ CommonClientConfigs.METRICS_RECORDING_LEVEL_DOC)
+ .define(METRIC_REPORTER_CLASSES_CONFIG,
+ Type.LIST,
+ "",
+ Importance.LOW,
+ CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
.define(MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION,
Type.INT,
5,
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c2bbaa5/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index 312e1f5..a74a584 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -26,6 +26,7 @@ import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;
import java.util.ArrayDeque;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
@@ -96,11 +97,8 @@ public class Selector implements Selectable, AutoCloseable {
private final List<String> failedSends;
private final Time time;
private final SelectorMetrics sensors;
- private final String metricGrpPrefix;
- private final Map<String, String> metricTags;
private final ChannelBuilder channelBuilder;
private final int maxReceiveSize;
- private final boolean metricsPerConnection;
private final boolean recordTimePerConnection;
private final IdleExpiryManager idleExpiryManager;
@@ -132,8 +130,6 @@ public class Selector implements Selectable, AutoCloseable {
}
this.maxReceiveSize = maxReceiveSize;
this.time = time;
- this.metricGrpPrefix = metricGrpPrefix;
- this.metricTags = metricTags;
this.channels = new HashMap<>();
this.completedSends = new ArrayList<>();
this.completedReceives = new ArrayList<>();
@@ -143,9 +139,8 @@ public class Selector implements Selectable, AutoCloseable {
this.connected = new ArrayList<>();
this.disconnected = new ArrayList<>();
this.failedSends = new ArrayList<>();
- this.sensors = new SelectorMetrics(metrics);
+ this.sensors = new SelectorMetrics(metrics, metricGrpPrefix, metricTags, metricsPerConnection);
this.channelBuilder = channelBuilder;
- this.metricsPerConnection = metricsPerConnection;
this.recordTimePerConnection = recordTimePerConnection;
this.idleExpiryManager = connectionMaxIdleMs < 0 ? null : new IdleExpiryManager(time, connectionMaxIdleMs);
}
@@ -162,7 +157,7 @@ public class Selector implements Selectable, AutoCloseable {
}
public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder) {
- this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, new HashMap<String, String>(), true, channelBuilder);
+ this(NetworkReceive.UNLIMITED, connectionMaxIdleMS, metrics, time, metricGrpPrefix, Collections.<String, String>emptyMap(), true, channelBuilder);
}
/**
@@ -679,6 +674,10 @@ public class Selector implements Selectable, AutoCloseable {
private class SelectorMetrics {
private final Metrics metrics;
+ private final String metricGrpPrefix;
+ private final Map<String, String> metricTags;
+ private final boolean metricsPerConnection;
+
public final Sensor connectionClosed;
public final Sensor connectionCreated;
public final Sensor bytesTransferred;
@@ -691,8 +690,11 @@ public class Selector implements Selectable, AutoCloseable {
private final List<MetricName> topLevelMetricNames = new ArrayList<>();
private final List<Sensor> sensors = new ArrayList<>();
- public SelectorMetrics(Metrics metrics) {
+ public SelectorMetrics(Metrics metrics, String metricGrpPrefix, Map<String, String> metricTags, boolean metricsPerConnection) {
this.metrics = metrics;
+ this.metricGrpPrefix = metricGrpPrefix;
+ this.metricTags = metricTags;
+ this.metricsPerConnection = metricsPerConnection;
String metricGrpName = metricGrpPrefix + "-metrics";
StringBuilder tagsSuffix = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c2bbaa5/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index a598b5d..5928a28 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -35,6 +35,7 @@ import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
@@ -1234,6 +1235,20 @@ public class KafkaConsumerTest {
consumer.close();
}
+ @Test
+ public void testMetricConfigRecordingLevel() {
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+ try (KafkaConsumer consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
+ assertEquals(Sensor.RecordingLevel.INFO, consumer.metrics.config().recordLevel());
+ }
+
+ props.put(ConsumerConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
+ try (KafkaConsumer consumer = new KafkaConsumer<>(props, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
+ assertEquals(Sensor.RecordingLevel.DEBUG, consumer.metrics.config().recordLevel());
+ }
+ }
+
private void consumerCloseTest(final long closeTimeoutMs,
List<? extends AbstractResponse> responses,
long waitMs,
http://git-wip-us.apache.org/repos/asf/kafka/blob/1c2bbaa5/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index ea493d2..3a6426a 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.ExtendedSerializer;
@@ -53,6 +54,7 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -87,9 +89,9 @@ public class KafkaProducerTest {
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(
props, new ByteArraySerializer(), new ByteArraySerializer());
} catch (KafkaException e) {
- Assert.assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get());
- Assert.assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get());
- Assert.assertEquals("Failed to construct kafka producer", e.getMessage());
+ assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get());
+ assertEquals(oldCloseCount + 1, MockMetricsReporter.CLOSE_COUNT.get());
+ assertEquals("Failed to construct kafka producer", e.getMessage());
return;
}
fail("should have caught an exception and returned");
@@ -107,12 +109,12 @@ public class KafkaProducerTest {
KafkaProducer<byte[], byte[]> producer = new KafkaProducer<byte[], byte[]>(
configs, new MockSerializer(), new MockSerializer());
- Assert.assertEquals(oldInitCount + 2, MockSerializer.INIT_COUNT.get());
- Assert.assertEquals(oldCloseCount, MockSerializer.CLOSE_COUNT.get());
+ assertEquals(oldInitCount + 2, MockSerializer.INIT_COUNT.get());
+ assertEquals(oldCloseCount, MockSerializer.CLOSE_COUNT.get());
producer.close();
- Assert.assertEquals(oldInitCount + 2, MockSerializer.INIT_COUNT.get());
- Assert.assertEquals(oldCloseCount + 2, MockSerializer.CLOSE_COUNT.get());
+ assertEquals(oldInitCount + 2, MockSerializer.INIT_COUNT.get());
+ assertEquals(oldCloseCount + 2, MockSerializer.CLOSE_COUNT.get());
}
@Test
@@ -126,15 +128,15 @@ public class KafkaProducerTest {
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(
props, new StringSerializer(), new StringSerializer());
- Assert.assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
- Assert.assertEquals(0, MockProducerInterceptor.CLOSE_COUNT.get());
+ assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
+ assertEquals(0, MockProducerInterceptor.CLOSE_COUNT.get());
// Cluster metadata will only be updated on calling onSend.
Assert.assertNull(MockProducerInterceptor.CLUSTER_META.get());
producer.close();
- Assert.assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
- Assert.assertEquals(1, MockProducerInterceptor.CLOSE_COUNT.get());
+ assertEquals(1, MockProducerInterceptor.INIT_COUNT.get());
+ assertEquals(1, MockProducerInterceptor.CLOSE_COUNT.get());
} finally {
// cleanup since we are using mutable static variables in MockProducerInterceptor
MockProducerInterceptor.resetCounters();
@@ -150,12 +152,12 @@ public class KafkaProducerTest {
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(
props, new StringSerializer(), new StringSerializer());
- Assert.assertEquals(1, MockPartitioner.INIT_COUNT.get());
- Assert.assertEquals(0, MockPartitioner.CLOSE_COUNT.get());
+ assertEquals(1, MockPartitioner.INIT_COUNT.get());
+ assertEquals(0, MockPartitioner.CLOSE_COUNT.get());
producer.close();
- Assert.assertEquals(1, MockPartitioner.INIT_COUNT.get());
- Assert.assertEquals(1, MockPartitioner.CLOSE_COUNT.get());
+ assertEquals(1, MockPartitioner.INIT_COUNT.get());
+ assertEquals(1, MockPartitioner.CLOSE_COUNT.get());
} finally {
// cleanup since we are using mutable static variables in MockPartitioner
MockPartitioner.resetCounters();
@@ -418,4 +420,18 @@ public class KafkaProducerTest {
producer.close();
}
+ @Test
+ public void testMetricConfigRecordingLevel() {
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+ try (KafkaProducer producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) {
+ assertEquals(Sensor.RecordingLevel.INFO, producer.metrics.config().recordLevel());
+ }
+
+ props.put(ProducerConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
+ try (KafkaProducer producer = new KafkaProducer<>(props, new ByteArraySerializer(), new ByteArraySerializer())) {
+ assertEquals(Sensor.RecordingLevel.DEBUG, producer.metrics.config().recordLevel());
+ }
+ }
+
}