You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/07/02 00:29:18 UTC
kafka git commit: KAFKA-2168: minor follow-up patch;
reviewed by Guozhang Wang
Repository: kafka
Updated Branches:
refs/heads/trunk 9ff5b27bc -> 14e0ce0a4
KAFKA-2168: minor follow-up patch; reviewed by Guozhang Wang
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/14e0ce0a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/14e0ce0a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/14e0ce0a
Branch: refs/heads/trunk
Commit: 14e0ce0a47fb7f6ae6dab085b2ea9d5a1f644433
Parents: 9ff5b27
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Jul 1 15:28:11 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Jul 1 15:28:11 2015 -0700
----------------------------------------------------------------------
.../kafka/clients/consumer/KafkaConsumer.java | 17 ++++++++++-------
.../clients/consumer/internals/Coordinator.java | 10 +++++-----
.../integration/kafka/api/ConsumerBounceTest.scala | 14 ++++++--------
3 files changed, 21 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/14e0ce0a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 9be8fbc..1f0e515 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -42,6 +42,7 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.ConcurrentModificationException;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -332,6 +333,7 @@ import static org.apache.kafka.common.utils.Utils.min;
* }
* }
*
+ * // Shutdown hook which can be called from a separate thread
* public void shutdown() {
* closed.set(true);
* consumer.wakeup();
@@ -417,7 +419,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
// and is used to prevent multi-threaded access
private final AtomicReference<Long> currentThread = new AtomicReference<Long>();
// refcount is used to allow reentrant access by the thread who has acquired currentThread
- private int refcount = 0; // reference count for reentrant access
+ private final AtomicInteger refcount = new AtomicInteger(0);
// TODO: This timeout controls how long we should wait before retrying a request. We should be able
// to leverage the work of KAFKA-2120 to get this value from configuration.
@@ -795,7 +797,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
* rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
* should not be used.
* <p>
- * A non-blocking commit will attempt to commit offsets asychronously. No error will be thrown if the commit fails.
+ * A non-blocking commit will attempt to commit offsets asynchronously. No error will be thrown if the commit fails.
* A blocking commit will wait for a response acknowledging the commit. In the event of an error it will retry until
* the commit succeeds.
*
@@ -832,7 +834,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
public void commit(CommitType commitType) {
acquire();
try {
- commit(this.subscriptions.allConsumed(), commitType);
+ // Need defensive copy to ensure offsets are not removed before completion (e.g. in rebalance)
+ Map<TopicPartition, Long> allConsumed = new HashMap<TopicPartition, Long>(this.subscriptions.allConsumed());
+ commit(allConsumed, commitType);
} finally {
release();
}
@@ -978,10 +982,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
@Override
public void close() {
- if (closed) return;
-
acquire();
try {
+ if (closed) return;
close(false);
} finally {
release();
@@ -1355,14 +1358,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
Long threadId = Thread.currentThread().getId();
if (!threadId.equals(currentThread.get()) && !currentThread.compareAndSet(null, threadId))
throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
- refcount++;
+ refcount.incrementAndGet();
}
/**
* Release the light lock protecting the consumer from multi-threaded access.
*/
private void release() {
- if (--refcount == 0)
+ if (refcount.decrementAndGet() == 0)
currentThread.set(null);
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/14e0ce0a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index 6c26667..68b4cb1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -217,7 +217,7 @@ public final class Coordinator {
OffsetCommitRequest.DEFAULT_RETENTION_TIME,
offsetData);
- RequestCompletionHandler handler = new CommitOffsetCompletionHandler(offsets, future);
+ RequestCompletionHandler handler = new OffsetCommitCompletionHandler(offsets, future);
sendCoordinator(ApiKeys.OFFSET_COMMIT, req.toStruct(), handler, now);
}
@@ -261,14 +261,14 @@ public final class Coordinator {
RequestCompletionHandler completionHandler = new RequestCompletionHandler() {
@Override
public void onComplete(ClientResponse resp) {
- handleOffsetResponse(resp, future);
+ handleOffsetFetchResponse(resp, future);
}
};
sendCoordinator(ApiKeys.OFFSET_FETCH, request.toStruct(), completionHandler, now);
return future;
}
- private void handleOffsetResponse(ClientResponse resp, RequestFuture<Map<TopicPartition, Long>> future) {
+ private void handleOffsetFetchResponse(ClientResponse resp, RequestFuture<Map<TopicPartition, Long>> future) {
if (resp.wasDisconnected()) {
handleCoordinatorDisconnect(resp);
future.retryWithNewCoordinator();
@@ -471,12 +471,12 @@ public final class Coordinator {
}
}
- private class CommitOffsetCompletionHandler implements RequestCompletionHandler {
+ private class OffsetCommitCompletionHandler implements RequestCompletionHandler {
private final Map<TopicPartition, Long> offsets;
private final RequestFuture<Void> future;
- public CommitOffsetCompletionHandler(Map<TopicPartition, Long> offsets, RequestFuture<Void> future) {
+ public OffsetCommitCompletionHandler(Map<TopicPartition, Long> offsets, RequestFuture<Void> future) {
this.offsets = offsets;
this.future = future;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/14e0ce0a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index f56096b..b0750fa 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -14,14 +14,10 @@
package kafka.api
import kafka.server.KafkaConfig
-import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.clients.consumer.CommitType
+import kafka.utils.{Logging, ShutdownableThread, TestUtils}
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
import org.apache.kafka.common.TopicPartition
-
-import kafka.utils.{ShutdownableThread, TestUtils, Logging}
-
import org.junit.Assert._
import scala.collection.JavaConversions._
@@ -85,9 +81,11 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
assertEquals(consumed.toLong, record.offset())
consumed += 1
}
+
consumer.commit(CommitType.SYNC)
+ assertEquals(consumer.position(tp), consumer.committed(tp))
- if (consumed == numRecords) {
+ if (consumer.position(tp) == numRecords) {
consumer.seekToBeginning()
consumed = 0
}