Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/27 15:37:17 UTC

kafka git commit: MINOR: Fix race condition in KafkaConsumer close

Repository: kafka
Updated Branches:
  refs/heads/trunk f1cc8008e -> 031da889b


MINOR: Fix race condition in KafkaConsumer close

We intended to make `KafkaConsumer.close()` idempotent,
but because the `closed` flag is checked without a lock
before the close logic begins, two or more threads can
observe `closed=false` and each attempt to close.
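
The fix is to perform the check-then-act on `closed` only while
holding the consumer's single-threaded access lock. A minimal sketch
of the pattern (simplified, not the Kafka source; the real lock also
keeps a reentrancy refcount):

    import java.util.ConcurrentModificationException;
    import java.util.concurrent.atomic.AtomicLong;

    class IdempotentCloser {
        private static final long NO_CURRENT_THREAD = -1L;
        // Light lock: stores the owning thread's id, or NO_CURRENT_THREAD.
        private final AtomicLong currentThread = new AtomicLong(NO_CURRENT_THREAD);
        private boolean closed = false;  // guarded by the light lock

        private void acquire() {
            long threadId = Thread.currentThread().getId();
            if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
                throw new ConcurrentModificationException("not safe for multi-threaded access");
        }

        private void release() {
            currentThread.set(NO_CURRENT_THREAD);
        }

        public void close() {
            acquire();             // take the lock before consulting the flag
            try {
                if (!closed) {     // check-then-act now happens while holding the lock
                    closed = true;
                    // ... release resources exactly once ...
                }
            } finally {
                release();
            }
        }
    }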

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Apurva Mehta <ap...@confluent.io>, Ismael Juma <is...@juma.me.uk>

Closes #3426 from hachikuji/minor-fix-consumer-idempotent-close


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/031da889
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/031da889
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/031da889

Branch: refs/heads/trunk
Commit: 031da889bc811200da67568c5779760dcb006238
Parents: f1cc800
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Jun 27 16:36:37 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Jun 27 16:36:45 2017 +0100

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   | 104 +++++++++++--------
 .../clients/consumer/KafkaConsumerTest.java     |   1 +
 .../kafka/api/ConsumerBounceTest.scala          |   8 +-
 3 files changed, 68 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/031da889/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 155f2e0..3154061 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -804,7 +804,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @return The set of partitions currently assigned to this consumer
      */
     public Set<TopicPartition> assignment() {
-        acquire();
+        acquireAndEnsureOpen();
         try {
             return Collections.unmodifiableSet(new HashSet<>(this.subscriptions.assignedPartitions()));
         } finally {
@@ -818,7 +818,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @return The set of topics currently subscribed to
      */
     public Set<String> subscription() {
-        acquire();
+        acquireAndEnsureOpen();
         try {
             return Collections.unmodifiableSet(new HashSet<>(this.subscriptions.subscription()));
         } finally {
@@ -857,7 +857,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
-        acquire();
+        acquireAndEnsureOpen();
         try {
             if (topics == null) {
                 throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
@@ -923,7 +923,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
-        acquire();
+        acquireAndEnsureOpen();
         try {
             if (pattern == null)
                 throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null");
@@ -942,7 +942,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * also clears any partitions directly assigned through {@link #assign(Collection)}.
      */
     public void unsubscribe() {
-        acquire();
+        acquireAndEnsureOpen();
         try {
             log.debug("Unsubscribed all topics or patterns and assigned partitions");
             this.subscriptions.unsubscribe();
@@ -970,7 +970,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void assign(Collection<TopicPartition> partitions) {
-        acquire();
+        acquireAndEnsureOpen();
         try {
             if (partitions == null) {
                 throw new IllegalArgumentException("Topic partition collection to assign to cannot be null");
@@ -1028,7 +1028,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public ConsumerRecords<K, V> poll(long timeout) {
-        acquire();
+        acquireAndEnsureOpen();
         try {
             if (timeout < 0)
                 throw new IllegalArgumentException("Timeout must not be negative");
@@ -1134,7 +1134,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void commitSync() {
-        acquire();
+        acquireAndEnsureOpen();
         try {
             coordinator.commitOffsetsSync(subscriptions.allConsumed(), Long.MAX_VALUE);
         } finally {
@@ -1168,7 +1168,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
-        acquire();
+        acquireAndEnsureOpen();
         try {
             coordinator.commitOffsetsSync(new HashMap<>(offsets), Long.MAX_VALUE);
         } finally {
@@ -1199,7 +1199,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void commitAsync(OffsetCommitCallback callback) {
-        acquire();
+        acquireAndEnsureOpen();
         try {
             commitAsync(subscriptions.allConsumed(), callback);
         } finally {
@@ -1224,7 +1224,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
-        acquire();
+        acquireAndEnsureOpen();
         try {
             log.debug("Committing offsets: {} ", offsets);
             coordinator.commitOffsetsAsync(new HashMap<>(offsets), callback);
@@ -1240,11 +1240,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void seek(TopicPartition partition, long offset) {
-        if (offset < 0) {
-            throw new IllegalArgumentException("seek offset must not be a negative number");
-        }
-        acquire();
+        acquireAndEnsureOpen();
         try {
+            if (offset < 0)
+                throw new IllegalArgumentException("seek offset must not be a negative number");
+
             log.debug("Seeking to offset {} for partition {}", offset, partition);
             this.subscriptions.seek(partition, offset);
         } finally {
@@ -1258,7 +1258,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * If no partition is provided, seek to the first offset for all of the currently assigned partitions.
      */
     public void seekToBeginning(Collection<TopicPartition> partitions) {
-        acquire();
+        acquireAndEnsureOpen();
         try {
             Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
             for (TopicPartition tp : parts) {
@@ -1279,7 +1279,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * of the first message with an open transaction.
      */
     public void seekToEnd(Collection<TopicPartition> partitions) {
-        acquire();
+        acquireAndEnsureOpen();
         try {
             Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
             for (TopicPartition tp : parts) {
@@ -1307,7 +1307,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
      */
     public long position(TopicPartition partition) {
-        acquire();
+        acquireAndEnsureOpen();
         try {
             if (!this.subscriptions.isAssigned(partition))
                 throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
@@ -1341,7 +1341,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public OffsetAndMetadata committed(TopicPartition partition) {
-        acquire();
+        acquireAndEnsureOpen();
         try {
             Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(Collections.singleton(partition));
             return offsets.get(partition);
@@ -1375,7 +1375,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public List<PartitionInfo> partitionsFor(String topic) {
-        acquire();
+        acquireAndEnsureOpen();
         try {
             Cluster cluster = this.metadata.fetch();
             List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
@@ -1405,7 +1405,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public Map<String, List<PartitionInfo>> listTopics() {
-        acquire();
+        acquireAndEnsureOpen();
         try {
             return fetcher.getAllTopicMetadata(requestTimeoutMs);
         } finally {
@@ -1422,7 +1422,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void pause(Collection<TopicPartition> partitions) {
-        acquire();
+        acquireAndEnsureOpen();
         try {
             for (TopicPartition partition: partitions) {
                 log.debug("Pausing partition {}", partition);
@@ -1441,7 +1441,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public void resume(Collection<TopicPartition> partitions) {
-        acquire();
+        acquireAndEnsureOpen();
         try {
             for (TopicPartition partition: partitions) {
                 log.debug("Resuming partition {}", partition);
@@ -1459,7 +1459,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public Set<TopicPartition> paused() {
-        acquire();
+        acquireAndEnsureOpen();
         try {
             return Collections.unmodifiableSet(subscriptions.pausedPartitions());
         } finally {
@@ -1487,14 +1487,19 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
-        for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
-            // we explicitly exclude the earliest and latest offset here so the timestamp in the returned
-            // OffsetAndTimestamp is always positive.
-            if (entry.getValue() < 0)
-                throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " +
-                        entry.getValue() + ". The target time cannot be negative.");
+        acquireAndEnsureOpen();
+        try {
+            for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
+                // we explicitly exclude the earliest and latest offset here so the timestamp in the returned
+                // OffsetAndTimestamp is always positive.
+                if (entry.getValue() < 0)
+                    throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " +
+                            entry.getValue() + ". The target time cannot be negative.");
+            }
+            return fetcher.getOffsetsByTimes(timestampsToSearch, requestTimeoutMs);
+        } finally {
+            release();
         }
-        return fetcher.getOffsetsByTimes(timestampsToSearch, requestTimeoutMs);
     }
 
     /**
@@ -1510,7 +1515,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
-        return fetcher.beginningOffsets(partitions, requestTimeoutMs);
+        acquireAndEnsureOpen();
+        try {
+            return fetcher.beginningOffsets(partitions, requestTimeoutMs);
+        } finally {
+            release();
+        }
     }
 
     /**
@@ -1532,7 +1542,13 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      */
     @Override
     public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
-        return fetcher.endOffsets(partitions, requestTimeoutMs);
+        acquireAndEnsureOpen();
+        try {
+            return fetcher.endOffsets(partitions, requestTimeoutMs);
+        } finally {
+            release();
+        }
+
     }
 
     /**
@@ -1564,13 +1580,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @throws IllegalArgumentException If the <code>timeout</code> is negative.
      */
     public void close(long timeout, TimeUnit timeUnit) {
-        if (closed)
-            return;
         if (timeout < 0)
             throw new IllegalArgumentException("The timeout cannot be negative.");
         acquire();
         try {
-            close(timeUnit.toMillis(timeout), false);
+            if (!closed) {
+                closed = true;
+                close(timeUnit.toMillis(timeout), false);
+            }
         } finally {
             release();
         }
@@ -1599,7 +1616,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     private void close(long timeoutMs, boolean swallowException) {
         log.trace("Closing the Kafka consumer.");
         AtomicReference<Throwable> firstException = new AtomicReference<>();
-        this.closed = true;
         try {
             if (coordinator != null)
                 coordinator.close(Math.min(timeoutMs, requestTimeoutMs));
@@ -1651,23 +1667,25 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         }
     }
 
-    /*
-     * Check that the consumer hasn't been closed.
+    /**
+     * Acquire the light lock and ensure that the consumer hasn't been closed.
+     * @throws IllegalStateException If the consumer has been closed
      */
-    private void ensureNotClosed() {
-        if (this.closed)
+    private void acquireAndEnsureOpen() {
+        acquire();
+        if (this.closed) {
+            release();
             throw new IllegalStateException("This consumer has already been closed.");
+        }
     }
 
     /**
      * Acquire the light lock protecting this consumer from multi-threaded access. Instead of blocking
      * when the lock is not available, however, we just throw an exception (since multi-threaded usage is not
      * supported).
-     * @throws IllegalStateException if the consumer has been closed
      * @throws ConcurrentModificationException if another thread already has the lock
      */
     private void acquire() {
-        ensureNotClosed();
         long threadId = Thread.currentThread().getId();
         if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
             throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");

http://git-wip-us.apache.org/repos/asf/kafka/blob/031da889/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 45fccb7..219c3f6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1248,6 +1248,7 @@ public class KafkaConsumerTest {
         KafkaConsumer<byte[], byte[]> consumer = newConsumer();
         consumer.close();
         consumer.close();
+        consumer.close();
     }
 
     @Test

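The extra call pins down same-thread idempotency: any number of
`close()` calls is a no-op after the first, while every other public
method now fails fast. A hedged usage sketch (the bootstrap server,
group id, and class name are placeholders):

    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class RepeatedCloseDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");  // placeholder
            props.put("group.id", "demo-group");                // placeholder
            props.put("key.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                    "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
            consumer.close();
            consumer.close();  // no-op: `closed` is rechecked under the lock
            try {
                consumer.subscription();  // any other public method fails fast
            } catch (IllegalStateException e) {
                System.out.println("expected: " + e.getMessage());
            }
        }
    }
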
http://git-wip-us.apache.org/repos/asf/kafka/blob/031da889/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index dc51d67..a06cc29 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -333,7 +333,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     val rebalanceFuture = createConsumerToRebalance()
 
     // consumer1 should leave group and close immediately even though rebalance is in progress
-    submitCloseAndValidate(consumer1, Long.MaxValue, None, Some(gracefulCloseTimeMs))
+    val closeFuture1 = submitCloseAndValidate(consumer1, Long.MaxValue, None, Some(gracefulCloseTimeMs))
 
     // Rebalance should complete without waiting for consumer1 to timeout since consumer1 has left the group
     waitForRebalance(2000, rebalanceFuture, consumer2)
@@ -343,7 +343,11 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
     servers.foreach(server => killBroker(server.config.brokerId))
 
     // consumer2 should close immediately without LeaveGroup request since there are no brokers available
-    submitCloseAndValidate(consumer2, Long.MaxValue, None, Some(0))
+    val closeFuture2 = submitCloseAndValidate(consumer2, Long.MaxValue, None, Some(0))
+
+    // Ensure futures complete to avoid concurrent shutdown attempt during test cleanup
+    closeFuture1.get(2000, TimeUnit.MILLISECONDS)
+    closeFuture2.get(2000, TimeUnit.MILLISECONDS)
   }
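
Awaiting the futures matters because `close()` runs on an executor
thread here: if test cleanup called `consumer.close()` while the
submitted close still held the light lock, the cleanup call would fail
with `ConcurrentModificationException`. A hedged Java sketch of the
same await pattern (names are hypothetical):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.TimeUnit;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    class AwaitCloseBeforeCleanup {
        static void closeOnExecutor(KafkaConsumer<byte[], byte[]> consumer) throws Exception {
            ExecutorService executor = Executors.newSingleThreadExecutor();
            Future<?> closeFuture = executor.submit(() -> consumer.close());
            // Wait for the submitted close() to finish: a second close() from
            // another thread while this one holds the light lock would throw
            // ConcurrentModificationException.
            closeFuture.get(2000, TimeUnit.MILLISECONDS);
            executor.shutdown();
        }
    }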
 
   private def createConsumer(groupId: String) : KafkaConsumer[Array[Byte], Array[Byte]] = {