You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2016/04/12 01:09:34 UTC

[18/50] [abbrv] kafka git commit: KAFKA-3486: fix autocommit when partitions assigned manually

KAFKA-3486: fix autocommit when partitions assigned manually

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ewen Cheslack-Postava <ew...@confluent.io>

Closes #1169 from hachikuji/KAFKA-3486


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dd5480a4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dd5480a4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dd5480a4

Branch: refs/heads/0.10.0
Commit: dd5480a47eb0f45214c179b7f14ffaf493164222
Parents: bd5325d
Author: Jason Gustafson <ja...@confluent.io>
Authored: Sat Apr 2 23:02:19 2016 -0700
Committer: Ewen Cheslack-Postava <me...@ewencp.org>
Committed: Sat Apr 2 23:02:19 2016 -0700

----------------------------------------------------------------------
 .../consumer/internals/ConsumerCoordinator.java |  54 +++-----
 .../internals/ConsumerCoordinatorTest.java      | 122 +++++++++++++++++--
 2 files changed, 134 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dd5480a4/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index e582ce3..a364987 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -110,7 +110,13 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
         addMetadataListener();
 
-        this.autoCommitTask = autoCommitEnabled ? new AutoCommitTask(autoCommitIntervalMs) : null;
+        if (autoCommitEnabled) {
+            this.autoCommitTask = new AutoCommitTask(autoCommitIntervalMs);
+            this.autoCommitTask.reschedule();
+        } else {
+            this.autoCommitTask = null;
+        }
+
         this.sensors = new ConsumerCoordinatorMetrics(metrics, metricGrpPrefix);
         this.interceptors = interceptors;
         this.excludeInternalTopics = excludeInternalTopics;
@@ -187,9 +193,9 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
         // give the assignor a chance to update internal state based on the received assignment
         assignor.onAssignment(assignment);
 
-        // restart the autocommit task if needed
+        // reschedule the auto commit starting from now
         if (autoCommitEnabled)
-            autoCommitTask.enable();
+            autoCommitTask.reschedule();
 
         // execute the user's callback after rebalance
         ConsumerRebalanceListener listener = subscriptions.listener();
@@ -384,52 +390,36 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
     private class AutoCommitTask implements DelayedTask {
         private final long interval;
-        private boolean enabled = false;
-        private boolean requestInFlight = false;
 
         public AutoCommitTask(long interval) {
             this.interval = interval;
         }
 
-        public void enable() {
-            if (!enabled) {
-                // there shouldn't be any instances scheduled, but call unschedule anyway to ensure
-                // that this task is only ever scheduled once
-                client.unschedule(this);
-                this.enabled = true;
-
-                if (!requestInFlight) {
-                    long now = time.milliseconds();
-                    client.schedule(this, interval + now);
-                }
-            }
-        }
-
-        public void disable() {
-            this.enabled = false;
-            client.unschedule(this);
+        private void reschedule() {
+            client.schedule(this, time.milliseconds() + interval);
         }
 
         private void reschedule(long at) {
-            if (enabled)
-                client.schedule(this, at);
+            client.schedule(this, at);
         }
 
         public void run(final long now) {
-            if (!enabled)
-                return;
-
             if (coordinatorUnknown()) {
                 log.debug("Cannot auto-commit offsets for group {} since the coordinator is unknown", groupId);
-                client.schedule(this, now + retryBackoffMs);
+                reschedule(now + retryBackoffMs);
+                return;
+            }
+
+            if (needRejoin()) {
+                // skip the commit when we're rejoining since we'll commit offsets synchronously
+                // before the revocation callback is invoked
+                reschedule(now + interval);
                 return;
             }
 
-            requestInFlight = true;
             commitOffsetsAsync(subscriptions.allConsumed(), new OffsetCommitCallback() {
                 @Override
                 public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
-                    requestInFlight = false;
                     if (exception == null) {
                         reschedule(now + interval);
                     } else if (exception instanceof SendFailedException) {
@@ -446,10 +436,6 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
     private void maybeAutoCommitOffsetsSync() {
         if (autoCommitEnabled) {
-            // disable periodic commits prior to committing synchronously. note that they will
-            // be re-enabled after a rebalance completes
-            autoCommitTask.disable();
-
             try {
                 commitOffsetsSync(subscriptions.allConsumed());
             } catch (WakeupException e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/dd5480a4/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 8844adc..623e5ef 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -77,11 +77,11 @@ public class ConsumerCoordinatorTest {
     private String topicName = "test";
     private String groupId = "test-group";
     private TopicPartition tp = new TopicPartition(topicName, 0);
-    private int sessionTimeoutMs = 10;
-    private int heartbeatIntervalMs = 2;
+    private int sessionTimeoutMs = 10000;
+    private int heartbeatIntervalMs = 5000;
     private long retryBackoffMs = 100;
     private boolean autoCommitEnabled = false;
-    private long autoCommitIntervalMs = 5000;
+    private long autoCommitIntervalMs = 2000;
     private MockPartitionAssignor partitionAssignor = new MockPartitionAssignor();
     private List<PartitionAssignor> assignors = Arrays.<PartitionAssignor>asList(partitionAssignor);
     private MockTime time;
@@ -110,7 +110,7 @@ public class ConsumerCoordinatorTest {
         this.partitionAssignor.clear();
 
         client.setNode(node);
-        this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT);
+        this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, autoCommitEnabled);
     }
 
     @After
@@ -546,7 +546,7 @@ public class ConsumerCoordinatorTest {
 
     @Test
     public void testIncludeInternalTopicsConfigOption() {
-        coordinator = buildCoordinator(new Metrics(), assignors, false);
+        coordinator = buildCoordinator(new Metrics(), assignors, false, false);
         subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener);
 
         metadata.update(TestUtils.singletonCluster(TopicConstants.GROUP_METADATA_TOPIC_NAME, 2), time.milliseconds());
@@ -633,6 +633,107 @@ public class ConsumerCoordinatorTest {
     }
 
     @Test
+    public void testAutoCommitDynamicAssignment() {
+        final String consumerId = "consumer";
+
+        ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
+                ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, true);
+
+        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+        subscriptions.needReassignment();
+
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+        coordinator.ensurePartitionAssignment();
+
+        subscriptions.seek(tp, 100);
+
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+        time.sleep(autoCommitIntervalMs);
+        consumerClient.poll(0);
+
+        assertEquals(100L, subscriptions.committed(tp).offset());
+    }
+
+    @Test
+    public void testAutoCommitDynamicAssignmentRebalance() {
+        final String consumerId = "consumer";
+
+        ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
+                ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, true);
+
+        subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener);
+        subscriptions.needReassignment();
+
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // haven't joined, so should not cause a commit
+        time.sleep(autoCommitIntervalMs);
+        consumerClient.poll(0);
+
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code()));
+        client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code()));
+        coordinator.ensurePartitionAssignment();
+
+        subscriptions.seek(tp, 100);
+
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+        time.sleep(autoCommitIntervalMs);
+        consumerClient.poll(0);
+
+        assertEquals(100L, subscriptions.committed(tp).offset());
+    }
+
+    @Test
+    public void testAutoCommitManualAssignment() {
+        ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
+                ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, true);
+
+        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.seek(tp, 100);
+
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+        time.sleep(autoCommitIntervalMs);
+        consumerClient.poll(0);
+
+        assertEquals(100L, subscriptions.committed(tp).offset());
+    }
+
+    @Test
+    public void testAutoCommitManualAssignmentCoordinatorUnknown() {
+        ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
+                ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, true);
+
+        subscriptions.assignFromUser(Arrays.asList(tp));
+        subscriptions.seek(tp, 100);
+
+        // no commit initially since coordinator is unknown
+        consumerClient.poll(0);
+        time.sleep(autoCommitIntervalMs);
+        consumerClient.poll(0);
+
+        assertNull(subscriptions.committed(tp));
+
+        // now find the coordinator
+        client.prepareResponse(consumerMetadataResponse(node, Errors.NONE.code()));
+        coordinator.ensureCoordinatorKnown();
+
+        // sleep only for the retry backoff
+        time.sleep(retryBackoffMs);
+        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(tp, Errors.NONE.code())));
+        consumerClient.poll(0);
+
+        assertEquals(100L, subscriptions.committed(tp).offset());
+    }
+
+    @Test
     public void testCommitOffsetMetadata() {
         subscriptions.assignFromUser(Arrays.asList(tp));
 
@@ -896,7 +997,8 @@ public class ConsumerCoordinatorTest {
         RangeAssignor range = new RangeAssignor();
 
         try (Metrics metrics = new Metrics(time)) {
-            ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(roundRobin, range), ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT);
+            ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(roundRobin, range),
+                    ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, false);
             List<ProtocolMetadata> metadata = coordinator.metadata();
             assertEquals(2, metadata.size());
             assertEquals(roundRobin.name(), metadata.get(0).name());
@@ -904,7 +1006,8 @@ public class ConsumerCoordinatorTest {
         }
 
         try (Metrics metrics = new Metrics(time)) {
-            ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(range, roundRobin), ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT);
+            ConsumerCoordinator coordinator = buildCoordinator(metrics, Arrays.<PartitionAssignor>asList(range, roundRobin),
+                    ConsumerConfig.EXCLUDE_INTERNAL_TOPICS_DEFAULT, false);
             List<ProtocolMetadata> metadata = coordinator.metadata();
             assertEquals(2, metadata.size());
             assertEquals(range.name(), metadata.get(0).name());
@@ -912,7 +1015,10 @@ public class ConsumerCoordinatorTest {
         }
     }
 
-    private ConsumerCoordinator buildCoordinator(Metrics metrics, List<PartitionAssignor> assignors, boolean excludeInternalTopics) {
+    private ConsumerCoordinator buildCoordinator(Metrics metrics,
+                                                 List<PartitionAssignor> assignors,
+                                                 boolean excludeInternalTopics,
+                                                 boolean autoCommitEnabled) {
         return new ConsumerCoordinator(
                 consumerClient,
                 groupId,