You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/27 15:37:17 UTC
kafka git commit: MINOR: Fix race condition in KafkaConsumer close
Repository: kafka
Updated Branches:
refs/heads/trunk f1cc8008e -> 031da889b
MINOR: Fix race condition in KafkaConsumer close
We intended to make `KafkaConsumer.close()` idempotent,
but because the `closed` variable is
checked without a lock before the close logic begins,
it is possible for two or more threads to see
`closed=false` and each attempt to close concurrently.
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Apurva Mehta <ap...@confluent.io>, Ismael Juma <is...@juma.me.uk>
Closes #3426 from hachikuji/minor-fix-consumer-idempotent-close
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/031da889
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/031da889
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/031da889
Branch: refs/heads/trunk
Commit: 031da889bc811200da67568c5779760dcb006238
Parents: f1cc800
Author: Jason Gustafson <ja...@confluent.io>
Authored: Tue Jun 27 16:36:37 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Tue Jun 27 16:36:45 2017 +0100
----------------------------------------------------------------------
.../kafka/clients/consumer/KafkaConsumer.java | 104 +++++++++++--------
.../clients/consumer/KafkaConsumerTest.java | 1 +
.../kafka/api/ConsumerBounceTest.scala | 8 +-
3 files changed, 68 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/031da889/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 155f2e0..3154061 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -804,7 +804,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @return The set of partitions currently assigned to this consumer
*/
public Set<TopicPartition> assignment() {
- acquire();
+ acquireAndEnsureOpen();
try {
return Collections.unmodifiableSet(new HashSet<>(this.subscriptions.assignedPartitions()));
} finally {
@@ -818,7 +818,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @return The set of topics currently subscribed to
*/
public Set<String> subscription() {
- acquire();
+ acquireAndEnsureOpen();
try {
return Collections.unmodifiableSet(new HashSet<>(this.subscriptions.subscription()));
} finally {
@@ -857,7 +857,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void subscribe(Collection<String> topics, ConsumerRebalanceListener listener) {
- acquire();
+ acquireAndEnsureOpen();
try {
if (topics == null) {
throw new IllegalArgumentException("Topic collection to subscribe to cannot be null");
@@ -923,7 +923,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) {
- acquire();
+ acquireAndEnsureOpen();
try {
if (pattern == null)
throw new IllegalArgumentException("Topic pattern to subscribe to cannot be null");
@@ -942,7 +942,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* also clears any partitions directly assigned through {@link #assign(Collection)}.
*/
public void unsubscribe() {
- acquire();
+ acquireAndEnsureOpen();
try {
log.debug("Unsubscribed all topics or patterns and assigned partitions");
this.subscriptions.unsubscribe();
@@ -970,7 +970,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void assign(Collection<TopicPartition> partitions) {
- acquire();
+ acquireAndEnsureOpen();
try {
if (partitions == null) {
throw new IllegalArgumentException("Topic partition collection to assign to cannot be null");
@@ -1028,7 +1028,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public ConsumerRecords<K, V> poll(long timeout) {
- acquire();
+ acquireAndEnsureOpen();
try {
if (timeout < 0)
throw new IllegalArgumentException("Timeout must not be negative");
@@ -1134,7 +1134,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void commitSync() {
- acquire();
+ acquireAndEnsureOpen();
try {
coordinator.commitOffsetsSync(subscriptions.allConsumed(), Long.MAX_VALUE);
} finally {
@@ -1168,7 +1168,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets) {
- acquire();
+ acquireAndEnsureOpen();
try {
coordinator.commitOffsetsSync(new HashMap<>(offsets), Long.MAX_VALUE);
} finally {
@@ -1199,7 +1199,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void commitAsync(OffsetCommitCallback callback) {
- acquire();
+ acquireAndEnsureOpen();
try {
commitAsync(subscriptions.allConsumed(), callback);
} finally {
@@ -1224,7 +1224,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void commitAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
- acquire();
+ acquireAndEnsureOpen();
try {
log.debug("Committing offsets: {} ", offsets);
coordinator.commitOffsetsAsync(new HashMap<>(offsets), callback);
@@ -1240,11 +1240,11 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void seek(TopicPartition partition, long offset) {
- if (offset < 0) {
- throw new IllegalArgumentException("seek offset must not be a negative number");
- }
- acquire();
+ acquireAndEnsureOpen();
try {
+ if (offset < 0)
+ throw new IllegalArgumentException("seek offset must not be a negative number");
+
log.debug("Seeking to offset {} for partition {}", offset, partition);
this.subscriptions.seek(partition, offset);
} finally {
@@ -1258,7 +1258,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* If no partition is provided, seek to the first offset for all of the currently assigned partitions.
*/
public void seekToBeginning(Collection<TopicPartition> partitions) {
- acquire();
+ acquireAndEnsureOpen();
try {
Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
for (TopicPartition tp : parts) {
@@ -1279,7 +1279,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* of the first message with an open transaction.
*/
public void seekToEnd(Collection<TopicPartition> partitions) {
- acquire();
+ acquireAndEnsureOpen();
try {
Collection<TopicPartition> parts = partitions.size() == 0 ? this.subscriptions.assignedPartitions() : partitions;
for (TopicPartition tp : parts) {
@@ -1307,7 +1307,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws org.apache.kafka.common.KafkaException for any other unrecoverable errors
*/
public long position(TopicPartition partition) {
- acquire();
+ acquireAndEnsureOpen();
try {
if (!this.subscriptions.isAssigned(partition))
throw new IllegalArgumentException("You can only check the position for partitions assigned to this consumer.");
@@ -1341,7 +1341,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public OffsetAndMetadata committed(TopicPartition partition) {
- acquire();
+ acquireAndEnsureOpen();
try {
Map<TopicPartition, OffsetAndMetadata> offsets = coordinator.fetchCommittedOffsets(Collections.singleton(partition));
return offsets.get(partition);
@@ -1375,7 +1375,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public List<PartitionInfo> partitionsFor(String topic) {
- acquire();
+ acquireAndEnsureOpen();
try {
Cluster cluster = this.metadata.fetch();
List<PartitionInfo> parts = cluster.partitionsForTopic(topic);
@@ -1405,7 +1405,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public Map<String, List<PartitionInfo>> listTopics() {
- acquire();
+ acquireAndEnsureOpen();
try {
return fetcher.getAllTopicMetadata(requestTimeoutMs);
} finally {
@@ -1422,7 +1422,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void pause(Collection<TopicPartition> partitions) {
- acquire();
+ acquireAndEnsureOpen();
try {
for (TopicPartition partition: partitions) {
log.debug("Pausing partition {}", partition);
@@ -1441,7 +1441,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public void resume(Collection<TopicPartition> partitions) {
- acquire();
+ acquireAndEnsureOpen();
try {
for (TopicPartition partition: partitions) {
log.debug("Resuming partition {}", partition);
@@ -1459,7 +1459,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public Set<TopicPartition> paused() {
- acquire();
+ acquireAndEnsureOpen();
try {
return Collections.unmodifiableSet(subscriptions.pausedPartitions());
} finally {
@@ -1487,14 +1487,19 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch) {
- for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
- // we explicitly exclude the earliest and latest offset here so the timestamp in the returned
- // OffsetAndTimestamp is always positive.
- if (entry.getValue() < 0)
- throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " +
- entry.getValue() + ". The target time cannot be negative.");
+ acquireAndEnsureOpen();
+ try {
+ for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
+ // we explicitly exclude the earliest and latest offset here so the timestamp in the returned
+ // OffsetAndTimestamp is always positive.
+ if (entry.getValue() < 0)
+ throw new IllegalArgumentException("The target time for partition " + entry.getKey() + " is " +
+ entry.getValue() + ". The target time cannot be negative.");
+ }
+ return fetcher.getOffsetsByTimes(timestampsToSearch, requestTimeoutMs);
+ } finally {
+ release();
}
- return fetcher.getOffsetsByTimes(timestampsToSearch, requestTimeoutMs);
}
/**
@@ -1510,7 +1515,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public Map<TopicPartition, Long> beginningOffsets(Collection<TopicPartition> partitions) {
- return fetcher.beginningOffsets(partitions, requestTimeoutMs);
+ acquireAndEnsureOpen();
+ try {
+ return fetcher.beginningOffsets(partitions, requestTimeoutMs);
+ } finally {
+ release();
+ }
}
/**
@@ -1532,7 +1542,13 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
*/
@Override
public Map<TopicPartition, Long> endOffsets(Collection<TopicPartition> partitions) {
- return fetcher.endOffsets(partitions, requestTimeoutMs);
+ acquireAndEnsureOpen();
+ try {
+ return fetcher.endOffsets(partitions, requestTimeoutMs);
+ } finally {
+ release();
+ }
+
}
/**
@@ -1564,13 +1580,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* @throws IllegalArgumentException If the <code>timeout</code> is negative.
*/
public void close(long timeout, TimeUnit timeUnit) {
- if (closed)
- return;
if (timeout < 0)
throw new IllegalArgumentException("The timeout cannot be negative.");
acquire();
try {
- close(timeUnit.toMillis(timeout), false);
+ if (!closed) {
+ closed = true;
+ close(timeUnit.toMillis(timeout), false);
+ }
} finally {
release();
}
@@ -1599,7 +1616,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
private void close(long timeoutMs, boolean swallowException) {
log.trace("Closing the Kafka consumer.");
AtomicReference<Throwable> firstException = new AtomicReference<>();
- this.closed = true;
try {
if (coordinator != null)
coordinator.close(Math.min(timeoutMs, requestTimeoutMs));
@@ -1651,23 +1667,25 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
}
}
- /*
- * Check that the consumer hasn't been closed.
+ /**
+ * Acquire the light lock and ensure that the consumer hasn't been closed.
+ * @throws IllegalStateException If the consumer has been closed
*/
- private void ensureNotClosed() {
- if (this.closed)
+ private void acquireAndEnsureOpen() {
+ acquire();
+ if (this.closed) {
+ release();
throw new IllegalStateException("This consumer has already been closed.");
+ }
}
/**
* Acquire the light lock protecting this consumer from multi-threaded access. Instead of blocking
* when the lock is not available, however, we just throw an exception (since multi-threaded usage is not
* supported).
- * @throws IllegalStateException if the consumer has been closed
* @throws ConcurrentModificationException if another thread already has the lock
*/
private void acquire() {
- ensureNotClosed();
long threadId = Thread.currentThread().getId();
if (threadId != currentThread.get() && !currentThread.compareAndSet(NO_CURRENT_THREAD, threadId))
throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
http://git-wip-us.apache.org/repos/asf/kafka/blob/031da889/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 45fccb7..219c3f6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -1248,6 +1248,7 @@ public class KafkaConsumerTest {
KafkaConsumer<byte[], byte[]> consumer = newConsumer();
consumer.close();
consumer.close();
+ consumer.close();
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/031da889/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index dc51d67..a06cc29 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -333,7 +333,7 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
val rebalanceFuture = createConsumerToRebalance()
// consumer1 should leave group and close immediately even though rebalance is in progress
- submitCloseAndValidate(consumer1, Long.MaxValue, None, Some(gracefulCloseTimeMs))
+ val closeFuture1 = submitCloseAndValidate(consumer1, Long.MaxValue, None, Some(gracefulCloseTimeMs))
// Rebalance should complete without waiting for consumer1 to timeout since consumer1 has left the group
waitForRebalance(2000, rebalanceFuture, consumer2)
@@ -343,7 +343,11 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
servers.foreach(server => killBroker(server.config.brokerId))
// consumer2 should close immediately without LeaveGroup request since there are no brokers available
- submitCloseAndValidate(consumer2, Long.MaxValue, None, Some(0))
+ val closeFuture2 = submitCloseAndValidate(consumer2, Long.MaxValue, None, Some(0))
+
+ // Ensure futures complete to avoid concurrent shutdown attempt during test cleanup
+ closeFuture1.get(2000, TimeUnit.MILLISECONDS)
+ closeFuture2.get(2000, TimeUnit.MILLISECONDS)
}
private def createConsumer(groupId: String) : KafkaConsumer[Array[Byte], Array[Byte]] = {