You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/11/30 00:46:01 UTC

kafka git commit: KAFKA-5936; KafkaProducer.close should throw InterruptException

Repository: kafka
Updated Branches:
  refs/heads/trunk b512cd474 -> 38c5d7fba


KAFKA-5936; KafkaProducer.close should throw InterruptException

Author: Matthias J. Sax <ma...@confluent.io>

Reviewers: Apurva Mehta <ap...@confluent.io>, Guozhang Wang <wa...@gmail.com>, Jason Gustafson <ja...@confluent.io>

Closes #3912 from mjsax/kafka-5936-producer-close


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/38c5d7fb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/38c5d7fb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/38c5d7fb

Branch: refs/heads/trunk
Commit: 38c5d7fba7387b797a10c9c6ed71bf99c6d417bc
Parents: b512cd4
Author: Matthias J. Sax <ma...@confluent.io>
Authored: Wed Nov 29 16:22:07 2017 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed Nov 29 16:22:07 2017 -0800

----------------------------------------------------------------------
 .../org/apache/kafka/clients/ClientUtils.java   | 18 ++---
 .../kafka/clients/producer/KafkaProducer.java   | 41 +++++++----
 .../clients/consumer/KafkaConsumerTest.java     | 23 +++---
 .../clients/producer/KafkaProducerTest.java     | 76 +++++++++++++++++++-
 4 files changed, 123 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/38c5d7fb/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index ea4c4db..2a9b763 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -16,22 +16,22 @@
  */
 package org.apache.kafka.clients;
 
-import java.io.Closeable;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicReference;
-
 import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.network.ChannelBuilder;
 import org.apache.kafka.common.network.ChannelBuilders;
 import org.apache.kafka.common.security.JaasContext;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
-import org.apache.kafka.common.network.ChannelBuilder;
-import org.apache.kafka.common.config.ConfigException;
-import org.apache.kafka.common.config.SaslConfigs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.Closeable;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
 import static org.apache.kafka.common.utils.Utils.getHost;
 import static org.apache.kafka.common.utils.Utils.getPort;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/38c5d7fb/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index b3cff19..1fec744 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.producer;
 
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -265,7 +266,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      *
      */
     public KafkaProducer(Map<String, Object> configs) {
-        this(new ProducerConfig(configs), null, null);
+        this(new ProducerConfig(configs), null, null, null, null);
     }
 
     /**
@@ -281,7 +282,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      */
     public KafkaProducer(Map<String, Object> configs, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
         this(new ProducerConfig(ProducerConfig.addSerializerToConfig(configs, keySerializer, valueSerializer)),
-                keySerializer, valueSerializer);
+                keySerializer, valueSerializer, null, null);
     }
 
     /**
@@ -290,7 +291,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * @param properties   The producer configs
      */
     public KafkaProducer(Properties properties) {
-        this(new ProducerConfig(properties), null, null);
+        this(new ProducerConfig(properties), null, null, null, null);
     }
 
     /**
@@ -304,11 +305,16 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      */
     public KafkaProducer(Properties properties, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
         this(new ProducerConfig(ProducerConfig.addSerializerToConfig(properties, keySerializer, valueSerializer)),
-                keySerializer, valueSerializer);
+                keySerializer, valueSerializer, null, null);
     }
 
     @SuppressWarnings("unchecked")
-    private KafkaProducer(ProducerConfig config, Serializer<K> keySerializer, Serializer<V> valueSerializer) {
+    // package-private (not private) so tests can inject their own Metadata and KafkaClient
+    KafkaProducer(ProducerConfig config,
+                  Serializer<K> keySerializer,
+                  Serializer<V> valueSerializer,
+                  Metadata metadata,
+                  KafkaClient kafkaClient) {
         try {
             Map<String, Object> userProvidedConfigs = config.originals();
             this.producerConfig = config;
@@ -363,8 +369,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     ProducerInterceptor.class);
             this.interceptors = interceptorList.isEmpty() ? null : new ProducerInterceptors<>(interceptorList);
             ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keySerializer, valueSerializer, interceptorList, reporters);
-            this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
-                    true, true, clusterResourceListeners);
             this.maxRequestSize = config.getInt(ProducerConfig.MAX_REQUEST_SIZE_CONFIG);
             this.totalMemorySize = config.getLong(ProducerConfig.BUFFER_MEMORY_CONFIG);
             this.compressionType = CompressionType.forName(config.getString(ProducerConfig.COMPRESSION_TYPE_CONFIG));
@@ -388,10 +392,16 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     apiVersions,
                     transactionManager);
             List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
-            this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
+            if (metadata != null) {
+                this.metadata = metadata;
+            } else {
+                this.metadata = new Metadata(retryBackoffMs, config.getLong(ProducerConfig.METADATA_MAX_AGE_CONFIG),
+                    true, true, clusterResourceListeners);
+                this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), time.milliseconds());
+            }
             ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config);
             Sensor throttleTimeSensor = Sender.throttleTimeSensor(metricsRegistry.senderMetrics);
-            NetworkClient client = new NetworkClient(
+            KafkaClient client = kafkaClient != null ? kafkaClient : new NetworkClient(
                     new Selector(config.getLong(ProducerConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG),
                             this.metrics, time, "producer", channelBuilder, logContext),
                     this.metadata,
@@ -1051,7 +1061,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                     try {
                         this.ioThread.join(timeUnit.toMillis(timeout));
                     } catch (InterruptedException t) {
-                        firstException.compareAndSet(null, t);
+                        firstException.compareAndSet(null, new InterruptException(t));
                         log.error("Interrupted while joining ioThread", t);
                     }
                 }
@@ -1067,7 +1077,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
                 try {
                     this.ioThread.join();
                 } catch (InterruptedException e) {
-                    firstException.compareAndSet(null, e);
+                    firstException.compareAndSet(null, new InterruptException(e));
                 }
             }
         }
@@ -1079,8 +1089,13 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         ClientUtils.closeQuietly(partitioner, "producer partitioner", firstException);
         AppInfoParser.unregisterAppInfo(JMX_PREFIX, clientId, metrics);
         log.debug("Kafka producer has been closed");
-        if (firstException.get() != null && !swallowException)
-            throw new KafkaException("Failed to close kafka producer", firstException.get());
+        Throwable exception = firstException.get();
+        if (exception != null && !swallowException) {
+            if (exception instanceof InterruptException) {
+                throw (InterruptException) exception;
+            }
+            throw new KafkaException("Failed to close kafka producer", exception);
+        }
     }
 
     private ClusterResourceListeners configureClusterResourceListeners(Serializer<K> keySerializer, Serializer<V> valueSerializer, List<?>... candidateLists) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/38c5d7fb/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 12254c9..ab682d6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -68,6 +68,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.MockConsumerInterceptor;
 import org.apache.kafka.test.MockMetricsReporter;
+import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -1365,7 +1366,7 @@ public class KafkaConsumerTest {
         // Kafka consumer is single-threaded, but the implementation allows calls on a
         // different thread as long as the calls are not executed concurrently. So this is safe.
         ExecutorService executor = Executors.newSingleThreadExecutor();
-        final AtomicReference<Exception> closeException = new AtomicReference<Exception>();
+        final AtomicReference<Exception> closeException = new AtomicReference<>();
         try {
             Future<?> future = executor.submit(new Runnable() {
                 @Override
@@ -1409,21 +1410,21 @@ public class KafkaConsumerTest {
 
             if (waitMs > 0)
                 time.sleep(waitMs);
-            if (interrupt)
+            if (interrupt) {
                 assertTrue("Close terminated prematurely", future.cancel(true));
 
-            // Make sure that close task completes and another task can be run on the single threaded executor
-            executor.submit(new Runnable() {
-                @Override
-                public void run() {
-                }
-            }).get(500, TimeUnit.MILLISECONDS);
+                TestUtils.waitForCondition(new TestCondition() {
+                    @Override
+                    public boolean conditionMet() {
+                        return closeException.get() != null;
+                    }
+                }, "InterruptException did not occur within timeout.");
 
-            if (!interrupt) {
+                assertTrue("Expected exception not thrown " + closeException, closeException.get() instanceof InterruptException);
+            } else {
                 future.get(500, TimeUnit.MILLISECONDS); // Should succeed without TimeoutException or ExecutionException
                 assertNull("Unexpected exception during close", closeException.get());
-            } else
-                assertTrue("Expected exception not thrown " + closeException, closeException.get() instanceof InterruptException);
+            }
         } finally {
             executor.shutdownNow();
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/38c5d7fb/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 26f7588..9f70fd7 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.producer;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.producer.internals.ProducerInterceptors;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
@@ -25,9 +26,10 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.network.Selectable;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -36,9 +38,11 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.test.MockMetricsReporter;
+import org.apache.kafka.test.MockPartitioner;
 import org.apache.kafka.test.MockProducerInterceptor;
 import org.apache.kafka.test.MockSerializer;
-import org.apache.kafka.test.MockPartitioner;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
 import org.easymock.EasyMock;
 import org.junit.Assert;
 import org.junit.Test;
@@ -55,6 +59,11 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -147,6 +156,7 @@ public class KafkaProducerTest {
         try {
             Properties props = new Properties();
             props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+            MockPartitioner.resetCounters();
             props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, MockPartitioner.class.getName());
 
             KafkaProducer<String, String> producer = new KafkaProducer<String, String>(
@@ -164,6 +174,68 @@ public class KafkaProducerTest {
     }
 
     @Test
+    public void shouldCloseProperlyAndThrowIfInterrupted() throws Exception {
+        Properties props = new Properties();
+        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");
+        props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, MockPartitioner.class.getName());
+        props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "1");
+
+        Time time = new MockTime();
+        Cluster cluster = TestUtils.singletonCluster("topic", 1);
+        Node node = cluster.nodes().get(0);
+
+        Metadata metadata = new Metadata(0, Long.MAX_VALUE, true);
+        metadata.update(cluster, Collections.<String>emptySet(), time.milliseconds());
+
+        MockClient client = new MockClient(time, metadata);
+        client.setNode(node);
+
+        final Producer<String, String> producer = new KafkaProducer<>(
+            new ProducerConfig(ProducerConfig.addSerializerToConfig(props, new StringSerializer(), new StringSerializer())),
+            new StringSerializer(), new StringSerializer(), metadata, client);
+
+        ExecutorService executor = Executors.newSingleThreadExecutor();
+        final AtomicReference<Exception> closeException = new AtomicReference<>();
+        try {
+            Future<?> future = executor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    producer.send(new ProducerRecord<>("topic", "key", "value"));
+                    try {
+                        producer.close();
+                        fail("Close should block and throw.");
+                    } catch (Exception e) {
+                        closeException.set(e);
+                    }
+                }
+            });
+
+            // Closing the producer should not complete until the pending send succeeds
+            try {
+                future.get(100, TimeUnit.MILLISECONDS);
+                fail("Close completed without waiting for send");
+            } catch (java.util.concurrent.TimeoutException expected) { /* ignore */ }
+
+            // Ensure the send request has reached the client before interrupting close()
+            client.waitForRequests(1, 1000);
+
+            assertTrue("Close terminated prematurely", future.cancel(true));
+
+            TestUtils.waitForCondition(new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return closeException.get() != null;
+                }
+            }, "InterruptException did not occur within timeout.");
+
+            assertTrue("Expected exception not thrown " + closeException, closeException.get() instanceof InterruptException);
+        } finally {
+            executor.shutdownNow();
+        }
+
+    }
+
+    @Test
     public void testOsDefaultSocketBufferSizes() throws Exception {
         Map<String, Object> config = new HashMap<>();
         config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9999");