You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gu...@apache.org on 2015/07/02 00:29:18 UTC

kafka git commit: KAFKA-2168: minor follow-up patch; reviewed by Guozhang Wang

Repository: kafka
Updated Branches:
  refs/heads/trunk 9ff5b27bc -> 14e0ce0a4


KAFKA-2168: minor follow-up patch; reviewed by Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/14e0ce0a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/14e0ce0a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/14e0ce0a

Branch: refs/heads/trunk
Commit: 14e0ce0a47fb7f6ae6dab085b2ea9d5a1f644433
Parents: 9ff5b27
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed Jul 1 15:28:11 2015 -0700
Committer: Guozhang Wang <wa...@gmail.com>
Committed: Wed Jul 1 15:28:11 2015 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java      | 17 ++++++++++-------
 .../clients/consumer/internals/Coordinator.java    | 10 +++++-----
 .../integration/kafka/api/ConsumerBounceTest.scala | 14 ++++++--------
 3 files changed, 21 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/14e0ce0a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 9be8fbc..1f0e515 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -42,6 +42,7 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.ConcurrentModificationException;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -332,6 +333,7 @@ import static org.apache.kafka.common.utils.Utils.min;
  *         }
  *     }
  *
+ *     // Shutdown hook which can be called from a separate thread
  *     public void shutdown() {
  *         closed.set(true);
  *         consumer.wakeup();
@@ -417,7 +419,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     // and is used to prevent multi-threaded access
     private final AtomicReference<Long> currentThread = new AtomicReference<Long>();
     // refcount is used to allow reentrant access by the thread who has acquired currentThread
-    private int refcount = 0; // reference count for reentrant access
+    private final AtomicInteger refcount = new AtomicInteger(0);
 
     // TODO: This timeout controls how long we should wait before retrying a request. We should be able
     //       to leverage the work of KAFKA-2120 to get this value from configuration.
@@ -795,7 +797,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * rebalance and also on startup. As such, if you need to store offsets in anything other than Kafka, this API
      * should not be used.
      * <p>
-     * A non-blocking commit will attempt to commit offsets asychronously. No error will be thrown if the commit fails.
+     * A non-blocking commit will attempt to commit offsets asynchronously. No error will be thrown if the commit fails.
      * A blocking commit will wait for a response acknowledging the commit. In the event of an error it will retry until
      * the commit succeeds.
      * 
@@ -832,7 +834,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
     public void commit(CommitType commitType) {
         acquire();
         try {
-            commit(this.subscriptions.allConsumed(), commitType);
+            // Need defensive copy to ensure offsets are not removed before completion (e.g. in rebalance)
+            Map<TopicPartition, Long> allConsumed = new HashMap<TopicPartition, Long>(this.subscriptions.allConsumed());
+            commit(allConsumed, commitType);
         } finally {
             release();
         }
@@ -978,10 +982,9 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
 
     @Override
     public void close() {
-        if (closed) return;
-
         acquire();
         try {
+            if (closed) return;
             close(false);
         } finally {
             release();
@@ -1355,14 +1358,14 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
         Long threadId = Thread.currentThread().getId();
         if (!threadId.equals(currentThread.get()) && !currentThread.compareAndSet(null, threadId))
             throw new ConcurrentModificationException("KafkaConsumer is not safe for multi-threaded access");
-        refcount++;
+        refcount.incrementAndGet();
     }
 
     /**
      * Release the light lock protecting the consumer from multi-threaded access.
      */
     private void release() {
-        if (--refcount == 0)
+        if (refcount.decrementAndGet() == 0)
             currentThread.set(null);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/14e0ce0a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
index 6c26667..68b4cb1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
@@ -217,7 +217,7 @@ public final class Coordinator {
                 OffsetCommitRequest.DEFAULT_RETENTION_TIME,
                 offsetData);
 
-            RequestCompletionHandler handler = new CommitOffsetCompletionHandler(offsets, future);
+            RequestCompletionHandler handler = new OffsetCommitCompletionHandler(offsets, future);
             sendCoordinator(ApiKeys.OFFSET_COMMIT, req.toStruct(), handler, now);
         }
 
@@ -261,14 +261,14 @@ public final class Coordinator {
         RequestCompletionHandler completionHandler = new RequestCompletionHandler() {
             @Override
             public void onComplete(ClientResponse resp) {
-                handleOffsetResponse(resp, future);
+                handleOffsetFetchResponse(resp, future);
             }
         };
         sendCoordinator(ApiKeys.OFFSET_FETCH, request.toStruct(), completionHandler, now);
         return future;
     }
 
-    private void handleOffsetResponse(ClientResponse resp, RequestFuture<Map<TopicPartition, Long>> future) {
+    private void handleOffsetFetchResponse(ClientResponse resp, RequestFuture<Map<TopicPartition, Long>> future) {
         if (resp.wasDisconnected()) {
             handleCoordinatorDisconnect(resp);
             future.retryWithNewCoordinator();
@@ -471,12 +471,12 @@ public final class Coordinator {
         }
     }
 
-    private class CommitOffsetCompletionHandler implements RequestCompletionHandler {
+    private class OffsetCommitCompletionHandler implements RequestCompletionHandler {
 
         private final Map<TopicPartition, Long> offsets;
         private final RequestFuture<Void> future;
 
-        public CommitOffsetCompletionHandler(Map<TopicPartition, Long> offsets, RequestFuture<Void> future) {
+        public OffsetCommitCompletionHandler(Map<TopicPartition, Long> offsets, RequestFuture<Void> future) {
             this.offsets = offsets;
             this.future = future;
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/14e0ce0a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
index f56096b..b0750fa 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala
@@ -14,14 +14,10 @@
 package kafka.api
 
 import kafka.server.KafkaConfig
-import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.clients.consumer.CommitType
+import kafka.utils.{Logging, ShutdownableThread, TestUtils}
+import org.apache.kafka.clients.consumer._
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
-
-import kafka.utils.{ShutdownableThread, TestUtils, Logging}
-
 import org.junit.Assert._
 
 import scala.collection.JavaConversions._
@@ -85,9 +81,11 @@ class ConsumerBounceTest extends IntegrationTestHarness with Logging {
         assertEquals(consumed.toLong, record.offset())
         consumed += 1
       }
+
       consumer.commit(CommitType.SYNC)
+      assertEquals(consumer.position(tp), consumer.committed(tp))
 
-      if (consumed == numRecords) {
+      if (consumer.position(tp) == numRecords) {
         consumer.seekToBeginning()
         consumed = 0
       }