Posted to commits@kafka.apache.org by ij...@apache.org on 2016/08/12 23:35:13 UTC

kafka git commit: KAFKA-4034; Avoid unnecessary consumer coordinator lookup

Repository: kafka
Updated Branches:
  refs/heads/trunk fc55f804e -> be36b3227


KAFKA-4034; Avoid unnecessary consumer coordinator lookup

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Guozhang Wang <wa...@gmail.com>, Ismael Juma <is...@juma.me.uk>

Closes #1720 from hachikuji/KAFKA-4034


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/be36b322
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/be36b322
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/be36b322

Branch: refs/heads/trunk
Commit: be36b322749003581474e2c84a3ec9ba2aaec53c
Parents: fc55f80
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Aug 12 23:26:41 2016 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Aug 12 23:26:41 2016 +0100

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   | 22 ++++++----
 .../RetriableCommitFailedException.java         |  4 ++
 .../consumer/internals/AbstractCoordinator.java | 25 ++++++++++-
 .../consumer/internals/ConsumerCoordinator.java | 38 +++++++++++++----
 .../clients/consumer/internals/Fetcher.java     | 13 +++++-
 .../consumer/internals/SubscriptionState.java   |  2 +-
 .../clients/consumer/KafkaConsumerTest.java     | 44 +++++++++++++++++++-
 .../kafka/api/AuthorizerIntegrationTest.scala   | 18 +++++++-
 8 files changed, 147 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
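
For context: the pattern this commit optimizes is manual partition assignment
combined with an explicit seek. Before this patch, poll() always performed a
group coordinator lookup up front; with it, a consumer that seeks all of its
assigned partitions never contacts the coordinator (and needs no group ACLs).
A minimal sketch of that pattern (the bootstrap address and topic name are
placeholders):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // placeholder address
    props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
    // note: no group.id is set; nothing below touches the group coordinator

    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    TopicPartition tp = new TopicPartition("my-topic", 0); // placeholder topic
    consumer.assign(Collections.singletonList(tp));
    consumer.seekToBeginning(Collections.singletonList(tp));

    // with this patch the seek is resolved by a ListOffsets request to the
    // partition leader; no GroupCoordinator request is sent
    ConsumerRecords<String, String> records = consumer.poll(1000);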


http://git-wip-us.apache.org/repos/asf/kafka/blob/be36b322/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index ff94dc8..522cfde 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -997,9 +997,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      * @return The fetched records (may be empty)
      */
     private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {
-        // TODO: Sub-requests should take into account the poll timeout (KAFKA-1894)
-        coordinator.ensureCoordinatorReady();
-
         // ensure we have partitions assigned if we expect to
         if (subscriptions.partitionsAutoAssigned())
             coordinator.ensurePartitionAssignment();
@@ -1429,11 +1426,22 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {
      *             defined
      */
     private void updateFetchPositions(Set<TopicPartition> partitions) {
-        // refresh commits for all assigned partitions
-        coordinator.refreshCommittedOffsetsIfNeeded();
+        // look up any positions for partitions which are awaiting reset (which may be the
+        // case if the user called seekToBeginning or seekToEnd). We do this check first to
+        // avoid an unnecessary lookup of committed offsets (which typically occurs when
+        // the user is manually assigning partitions and managing their own offsets).
+        fetcher.resetOffsetsIfNeeded(partitions);
+
+        if (!subscriptions.hasAllFetchPositions()) {
+            // if we still don't have offsets for all partitions, then we should either seek
+            // to the last committed position or reset using the auto reset policy
 
-        // then do any offset lookups in case some positions are not known
-        fetcher.updateFetchPositions(partitions);
+            // first refresh commits for all assigned partitions
+            coordinator.refreshCommittedOffsetsIfNeeded();
+
+            // then do any offset lookups in case some positions are not known
+            fetcher.updateFetchPositions(partitions);
+        }
     }
 
     /*

http://git-wip-us.apache.org/repos/asf/kafka/blob/be36b322/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java b/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java
index 459a8ac..1c1a2f5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/RetriableCommitFailedException.java
@@ -22,6 +22,10 @@ public class RetriableCommitFailedException extends RetriableException {
 
     private static final long serialVersionUID = 1L;
 
+    public RetriableCommitFailedException(Throwable t) {
+        super("Offset commit failed with a retriable exception. You should retry committing offsets.", t);
+    }
+
     public RetriableCommitFailedException(String message) {
         super(message);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/be36b322/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 6bb4406..e957856 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -98,6 +98,8 @@ public abstract class AbstractCoordinator implements Closeable {
     protected String protocol;
     protected int generation;
 
+    private RequestFuture<Void> findCoordinatorFuture = null;
+
     /**
      * Initialize the coordination manager.
      */
@@ -175,7 +177,7 @@ public abstract class AbstractCoordinator implements Closeable {
      */
     public void ensureCoordinatorReady() {
         while (coordinatorUnknown()) {
-            RequestFuture<Void> future = sendGroupCoordinatorRequest();
+            RequestFuture<Void> future = lookupCoordinator();
             client.poll(future);
 
             if (future.failed()) {
@@ -189,8 +191,25 @@ public abstract class AbstractCoordinator implements Closeable {
                 coordinatorDead();
                 time.sleep(retryBackoffMs);
             }
+        }
+    }
+
+    protected RequestFuture<Void> lookupCoordinator() {
+        if (findCoordinatorFuture == null) {
+            findCoordinatorFuture = sendGroupCoordinatorRequest();
+            findCoordinatorFuture.addListener(new RequestFutureListener<Void>() {
+                @Override
+                public void onSuccess(Void value) {
+                    findCoordinatorFuture = null;
+                }
 
+                @Override
+                public void onFailure(RuntimeException e) {
+                    findCoordinatorFuture = null;
+                }
+            });
         }
+        return findCoordinatorFuture;
     }
 
     /**
@@ -205,6 +224,10 @@ public abstract class AbstractCoordinator implements Closeable {
      * Ensure that the group is active (i.e. joined and synced)
      */
     public void ensureActiveGroup() {
+        // always ensure that the coordinator is ready, because we may have been disconnected
+        // when sending heartbeats; a disconnect does not necessarily require rejoining the group.
+        ensureCoordinatorReady();
+
         if (!needRejoin())
             return;
 

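
For context: lookupCoordinator() above memoizes the in-flight lookup future so
that callers arriving while a GroupCoordinator request is outstanding all share
the same request. A generic sketch of that idiom using CompletableFuture
(class and method names are hypothetical, and single-threaded access is
assumed, as in the consumer):

    import java.util.concurrent.CompletableFuture;

    class CoordinatorLookup {
        private CompletableFuture<String> inFlight = null; // at most one request

        CompletableFuture<String> lookup() {
            if (inFlight == null) {
                inFlight = sendLookupRequest();
                // clear on completion (success or failure) so that a later
                // call can start a fresh lookup, mirroring the listener above
                inFlight.whenComplete((host, error) -> inFlight = null);
            }
            return inFlight; // concurrent callers share this future
        }

        private CompletableFuture<String> sendLookupRequest() {
            // stand-in for the real network round trip
            return CompletableFuture.supplyAsync(() -> "coordinator-host:9092");
        }
    }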
http://git-wip-us.apache.org/repos/asf/kafka/blob/be36b322/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index b210746..81a40f1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -366,7 +366,36 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
     }
 
 
-    public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback) {
+    public void commitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
+        if (!coordinatorUnknown()) {
+            doCommitOffsetsAsync(offsets, callback);
+        } else {
+            // we don't know the current coordinator, so try to find it and then send the commit
+            // or fail (we don't want recursive retries which can cause offset commits to arrive
+            // out of order). Note that there may be multiple offset commits chained to the same
+            // coordinator lookup request. This is fine because the listeners will be invoked in
+            // the same order that they were added. Note also that AbstractCoordinator prevents
+            // multiple concurrent coordinator lookup requests.
+            lookupCoordinator().addListener(new RequestFutureListener<Void>() {
+                @Override
+                public void onSuccess(Void value) {
+                    doCommitOffsetsAsync(offsets, callback);
+                }
+
+                @Override
+                public void onFailure(RuntimeException e) {
+                    callback.onComplete(offsets, new RetriableCommitFailedException(e));
+                }
+            });
+        }
+
+        // ensure the commit has a chance to be transmitted (without blocking on its completion).
+        // Note that commits are treated as heartbeats by the coordinator, so there is no need to
+        // explicitly allow heartbeats through delayed task execution.
+        client.pollNoWakeup();
+    }
+
+    private void doCommitOffsetsAsync(final Map<TopicPartition, OffsetAndMetadata> offsets, final OffsetCommitCallback callback) {
         this.subscriptions.needRefreshCommits();
         RequestFuture<Void> future = sendOffsetCommitRequest(offsets);
         final OffsetCommitCallback cb = callback == null ? defaultOffsetCommitCallback : callback;
@@ -381,17 +410,12 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             @Override
             public void onFailure(RuntimeException e) {
                 if (e instanceof RetriableException) {
-                    cb.onComplete(offsets, new RetriableCommitFailedException("Commit offsets failed with retriable exception. You should retry committing offsets.", e));
+                    cb.onComplete(offsets, new RetriableCommitFailedException(e));
                 } else {
                     cb.onComplete(offsets, e);
                 }
             }
         });
-
-        // ensure the commit has a chance to be transmitted (without blocking on its completion).
-        // Note that commits are treated as heartbeats by the coordinator, so there is no need to
-        // explicitly allow heartbeats through delayed task execution.
-        client.pollNoWakeup();
     }
 
     /**

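
For context: with this change, a failed coordinator lookup during an async
commit completes the user's callback with RetriableCommitFailedException
instead of being retried internally (internal retries could reorder commits).
A minimal sketch of an application-side callback that distinguishes the
retriable case (consumer, offsets, and log are assumed to exist):

    consumer.commitAsync(offsets, new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> committed,
                               Exception exception) {
            if (exception == null) {
                return; // commit succeeded
            } else if (exception instanceof RetriableCommitFailedException) {
                // retriable: safe to let the next periodic commit supersede
                // this one, or retry explicitly if the application tracks order
                log.warn("Retriable async commit failure", exception);
            } else {
                log.error("Fatal async commit failure", exception);
            }
        }
    });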
http://git-wip-us.apache.org/repos/asf/kafka/blob/be36b322/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index cf2ebc3..fec9b6e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -155,6 +155,18 @@ public class Fetcher<K, V> {
     }
 
     /**
+     * Look up and set offsets for any partitions which are awaiting an explicit reset.
+     * @param partitions the partitions to reset
+     */
+    public void resetOffsetsIfNeeded(Set<TopicPartition> partitions) {
+        for (TopicPartition tp : partitions) {
+            // TODO: If there are several offsets to reset, we could submit offset requests in parallel
+            if (subscriptions.isAssigned(tp) && subscriptions.isOffsetResetNeeded(tp))
+                resetOffset(tp);
+        }
+    }
+
+    /**
      * Update the fetch positions for the provided partitions.
      * @param partitions the partitions to update positions for
      * @throws NoOffsetForPartitionException If no offset is stored for a given partition and no reset policy is available
@@ -165,7 +177,6 @@ public class Fetcher<K, V> {
             if (!subscriptions.isAssigned(tp) || subscriptions.isFetchable(tp))
                 continue;
 
-            // TODO: If there are several offsets to reset, we could submit offset requests in parallel
             if (subscriptions.isOffsetResetNeeded(tp)) {
                 resetOffset(tp);
             } else if (subscriptions.committed(tp) == null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/be36b322/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
index 38660e1..e9b2eb2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
@@ -49,7 +49,7 @@ public class SubscriptionState {
 
     private enum SubscriptionType {
         NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED
-    };
+    }
 
     /* the type of subscription */
     private SubscriptionType subscriptionType;

http://git-wip-us.apache.org/repos/asf/kafka/blob/be36b322/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
index 9affa79..8b52664 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java
@@ -43,6 +43,7 @@ import org.apache.kafka.common.requests.FetchResponse;
 import org.apache.kafka.common.requests.GroupCoordinatorResponse;
 import org.apache.kafka.common.requests.HeartbeatResponse;
 import org.apache.kafka.common.requests.JoinGroupResponse;
+import org.apache.kafka.common.requests.ListOffsetResponse;
 import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.requests.OffsetCommitResponse;
 import org.apache.kafka.common.requests.OffsetFetchResponse;
@@ -89,7 +90,7 @@ public class KafkaConsumerTest {
         final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
         final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
         try {
-            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<byte[], byte[]>(
+            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
                     props, new ByteArrayDeserializer(), new ByteArrayDeserializer());
         } catch (KafkaException e) {
             assertEquals(oldInitCount + 1, MockMetricsReporter.INIT_COUNT.get());
@@ -455,6 +456,38 @@ public class KafkaConsumerTest {
     }
 
     @Test
+    public void verifyNoCoordinatorLookupForManualAssignmentWithSeek() {
+        String topic = "topic";
+        final TopicPartition partition = new TopicPartition(topic, 0);
+        int sessionTimeoutMs = 3000;
+        int heartbeatIntervalMs = 2000;
+        int autoCommitIntervalMs = 1000;
+
+        Time time = new MockTime();
+        MockClient client = new MockClient(time);
+        Cluster cluster = TestUtils.singletonCluster(topic, 1);
+        Node node = cluster.nodes().get(0);
+        client.setNode(node);
+        Metadata metadata = new Metadata(0, Long.MAX_VALUE);
+        metadata.update(cluster, time.milliseconds());
+        PartitionAssignor assignor = new RoundRobinAssignor();
+
+        final KafkaConsumer<String, String> consumer = newConsumer(time, client, metadata, assignor,
+                sessionTimeoutMs, heartbeatIntervalMs, autoCommitIntervalMs);
+        consumer.assign(Arrays.asList(partition));
+        consumer.seekToBeginning(Arrays.asList(partition));
+
+        // there shouldn't be any need to look up the coordinator or fetch committed offsets.
+        // we just look up the starting position and send the record fetch.
+        client.prepareResponse(listOffsetsResponse(Collections.singletonMap(partition, 50L), Errors.NONE.code()));
+        client.prepareResponse(fetchResponse(partition, 50L, 5));
+
+        ConsumerRecords<String, String> records = consumer.poll(0);
+        assertEquals(5, records.count());
+        assertEquals(55L, consumer.position(partition));
+    }
+
+    @Test
     public void testCommitsFetchedDuringAssign() {
         String topic = "topic";
         final TopicPartition partition1 = new TopicPartition(topic, 0);
@@ -669,6 +702,15 @@ public class KafkaConsumerTest {
         return new OffsetFetchResponse(partitionData).toStruct();
     }
 
+    private Struct listOffsetsResponse(Map<TopicPartition, Long> offsets, short error) {
+        Map<TopicPartition, ListOffsetResponse.PartitionData> partitionData = new HashMap<>();
+        for (Map.Entry<TopicPartition, Long> partitionOffset : offsets.entrySet()) {
+            partitionData.put(partitionOffset.getKey(), new ListOffsetResponse.PartitionData(error,
+                    Collections.singletonList(partitionOffset.getValue())));
+        }
+        return new ListOffsetResponse(partitionData).toStruct();
+    }
+
     private Struct fetchResponse(TopicPartition tp, long fetchOffset, int count) {
         MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
         for (int i = 0; i < count; i++)

http://git-wip-us.apache.org/repos/asf/kafka/blob/be36b322/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 332b681..817cdf7 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -340,13 +340,15 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   @Test
-  def testConsumeWithNoGroupAccess(): Unit = {
+  def testSimpleConsumeWithOffsetLookupAndNoGroupAccess(): Unit = {
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
     sendRecords(1, tp)
     removeAllAcls()
 
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
     try {
+      // note this still depends on group access because we haven't set offsets explicitly, which means
+      // they will first be fetched from the consumer coordinator (which requires group access)
       this.consumers.head.assign(List(tp).asJava)
       consumeRecords(this.consumers.head)
       Assert.fail("should have thrown exception")
@@ -356,6 +358,20 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   @Test
+  def testSimpleConsumeWithExplicitSeekAndNoGroupAccess(): Unit = {
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
+    sendRecords(1, tp)
+    removeAllAcls()
+
+    addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Read)), topicResource)
+
+    // in this case, we do an explicit seek, so there should be no need to query the coordinator at all
+    this.consumers.head.assign(List(tp).asJava)
+    this.consumers.head.seekToBeginning(List(tp).asJava)
+    consumeRecords(this.consumers.head)
+  }
+
+  @Test
   def testConsumeWithNoTopicAccess() {
     addAndVerifyAcls(Set(new Acl(KafkaPrincipal.ANONYMOUS, Allow, Acl.WildCardHost, Write)), topicResource)
     sendRecords(1, tp)