Posted to commits@kafka.apache.org by jg...@apache.org on 2018/03/03 21:27:42 UTC

[kafka] branch trunk updated: KAFKA-6606; Ensure consumer awaits auto-commit interval after sending… (#4641)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 604b93c  KAFKA-6606; Ensure consumer awaits auto-commit interval after sending… (#4641)
604b93c is described below

commit 604b93cfdee7a8c5c879ef2217d50be88e39ac89
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Sat Mar 3 13:27:29 2018 -0800

    KAFKA-6606; Ensure consumer awaits auto-commit interval after sending… (#4641)
    
    We need to reset the auto-commit deadline after sending the offset commit request so that we do not resend it while the request is still inflight.
    
    Added unit tests ensuring this behavior and proper backoff in the case of a failure.
    
    Reviewers: Guozhang Wang <wa...@gmail.com>
---
 .../consumer/internals/ConsumerCoordinator.java    |  12 +-
 .../internals/ConsumerCoordinatorTest.java         | 307 ++++++++++++++-------
 2 files changed, 210 insertions(+), 109 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 2afa1ff..3c99c96 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -621,6 +621,7 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
 
     public void maybeAutoCommitOffsetsAsync(long now) {
         if (autoCommitEnabled && now >= nextAutoCommitDeadline) {
+            this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
             doAutoCommitOffsetsAsync();
         }
     }
@@ -633,14 +634,15 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
             @Override
             public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                 if (exception != null) {
-                    log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage());
-                    if (exception instanceof RetriableException)
+                    if (exception instanceof RetriableException) {
+                        log.debug("Asynchronous auto-commit of offsets {} failed due to retriable error: {}", offsets,
+                                exception);
                         nextAutoCommitDeadline = Math.min(time.milliseconds() + retryBackoffMs, nextAutoCommitDeadline);
-                    else
-                        nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs;
+                    } else {
+                        log.warn("Asynchronous auto-commit of offsets {} failed: {}", offsets, exception.getMessage());
+                    }
                 } else {
                     log.debug("Completed asynchronous auto-commit of offsets {}", offsets);
-                    nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs;
                 }
             }
         });
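
For readers skimming the hunk above, the following is a minimal, hypothetical sketch (plain Java, not the actual ConsumerCoordinator source) of the scheduling behavior the change produces: the deadline is advanced before the commit request is sent, and a retriable failure pulls it forward by the retry backoff at most. The class name, constructor, and sendAutoCommit placeholder are invented for illustration; only the field names (nextAutoCommitDeadline, autoCommitIntervalMs, retryBackoffMs) mirror the diff.

    import org.apache.kafka.common.errors.RetriableException;

    // Hypothetical stand-in for the auto-commit scheduling logic; not Kafka source.
    class AutoCommitScheduler {
        private final long autoCommitIntervalMs;
        private final long retryBackoffMs;
        private long nextAutoCommitDeadline;

        AutoCommitScheduler(long now, long autoCommitIntervalMs, long retryBackoffMs) {
            this.autoCommitIntervalMs = autoCommitIntervalMs;
            this.retryBackoffMs = retryBackoffMs;
            this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
        }

        void maybeAutoCommitOffsetsAsync(long now) {
            if (now >= nextAutoCommitDeadline) {
                // Advance the deadline before sending, so the commit is not
                // resent on the next poll while the request is still in flight.
                nextAutoCommitDeadline = now + autoCommitIntervalMs;
                sendAutoCommit();
            }
        }

        // Mirrors the onComplete callback in the hunk above.
        void onCommitComplete(long now, Exception exception) {
            if (exception instanceof RetriableException) {
                // Retry after the backoff, but never later than the deadline
                // already scheduled when the request was sent.
                nextAutoCommitDeadline = Math.min(now + retryBackoffMs, nextAutoCommitDeadline);
            }
            // Success or a non-retriable failure keeps the deadline set at send time.
        }

        private void sendAutoCommit() {
            // Placeholder for the asynchronous offset commit request.
        }
    }
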
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index ac39205..3e3c423 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -99,7 +99,6 @@ public class ConsumerCoordinatorTest {
     private int sessionTimeoutMs = 10000;
     private int heartbeatIntervalMs = 5000;
     private long retryBackoffMs = 100;
-    private boolean autoCommitEnabled = false;
     private int autoCommitIntervalMs = 2000;
     private MockPartitionAssignor partitionAssignor = new MockPartitionAssignor();
     private List<PartitionAssignor> assignors = Collections.<PartitionAssignor>singletonList(partitionAssignor);
@@ -134,7 +133,7 @@ public class ConsumerCoordinatorTest {
         this.partitionAssignor.clear();
 
         client.setNode(node);
-        this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, autoCommitEnabled, true);
+        this.coordinator = buildCoordinator(metrics, assignors, ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, false, true);
     }
 
     @After
@@ -209,7 +208,7 @@ public class ConsumerCoordinatorTest {
         final AtomicInteger responses = new AtomicInteger(0);
 
         for (int i = 0; i < numRequests; i++) {
-            Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(tp, new OffsetAndMetadata(i));
+            Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(tp, new OffsetAndMetadata(i));
             coordinator.commitOffsetsAsync(offsets, new OffsetCommitCallback() {
                 @Override
                 public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
@@ -237,7 +236,7 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady();
 
         final AtomicBoolean asyncCallbackInvoked = new AtomicBoolean(false);
-        Map<TopicPartition, OffsetCommitRequest.PartitionData> offsets = Collections.singletonMap(
+        Map<TopicPartition, OffsetCommitRequest.PartitionData> offsets = singletonMap(
                 new TopicPartition("foo", 0), new OffsetCommitRequest.PartitionData(13L, ""));
         consumerClient.send(coordinator.checkAndGetCoordinator(), new OffsetCommitRequest.Builder(groupId, offsets))
                 .compose(new RequestFutureAdapter<ClientResponse, Object>() {
@@ -379,8 +378,8 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady();
 
         // normal join group
-        Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, singletonList(topic1));
-        partitionAssignor.prepare(Collections.singletonMap(consumerId, singletonList(t1p)));
+        Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
+        partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p)));
 
         client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
         client.prepareResponse(new MockClient.RequestMatcher() {
@@ -418,8 +417,8 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady();
 
         // normal join group
-        Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, singletonList(topic1));
-        partitionAssignor.prepare(Collections.singletonMap(consumerId, Arrays.asList(t1p, t2p)));
+        Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
+        partitionAssignor.prepare(singletonMap(consumerId, Arrays.asList(t1p, t2p)));
 
         client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
         client.prepareResponse(new MockClient.RequestMatcher() {
@@ -521,8 +520,8 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
-        Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, singletonList(topic1));
-        partitionAssignor.prepare(Collections.singletonMap(consumerId, singletonList(t1p)));
+        Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
+        partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p)));
 
         // prepare only the first half of the join and then trigger the wakeup
         client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
@@ -621,13 +620,7 @@ public class ConsumerCoordinatorTest {
         final String consumerId = "consumer";
 
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
-
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
-
-        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
-        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
-        coordinator.joinGroupIfNeeded();
+        joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
 
         final AtomicBoolean received = new AtomicBoolean(false);
         client.prepareResponse(new MockClient.RequestMatcher() {
@@ -648,13 +641,7 @@ public class ConsumerCoordinatorTest {
         final String consumerId = "consumer";
 
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
-
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
-
-        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
-        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
-        coordinator.joinGroupIfNeeded();
+        joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
 
         final AtomicBoolean received = new AtomicBoolean(false);
         client.prepareResponse(new MockClient.RequestMatcher() {
@@ -782,8 +769,8 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
-        Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, singletonList(topic1));
-        partitionAssignor.prepare(Collections.singletonMap(consumerId, singletonList(t1p)));
+        Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
+        partitionAssignor.prepare(singletonMap(consumerId, singletonList(t1p)));
 
         // the leader is responsible for picking up metadata changes and forcing a group rebalance
         client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
@@ -820,8 +807,8 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady();
 
         // prepare initial rebalance
-        Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, topics);
-        partitionAssignor.prepare(Collections.singletonMap(consumerId, Collections.singletonList(tp1)));
+        Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, topics);
+        partitionAssignor.prepare(singletonMap(consumerId, Collections.singletonList(tp1)));
 
         client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
         client.prepareResponse(new MockClient.RequestMatcher() {
@@ -888,7 +875,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
-        Map<String, List<String>> memberSubscriptions = Collections.singletonMap(consumerId, singletonList(topic1));
+        Map<String, List<String>> memberSubscriptions = singletonMap(consumerId, singletonList(topic1));
         partitionAssignor.prepare(Collections.<String, List<TopicPartition>>emptyMap());
 
         client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE));
@@ -938,13 +925,8 @@ public class ConsumerCoordinatorTest {
 
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
-
         // join the group once
-        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
-        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
-        coordinator.joinGroupIfNeeded();
+        joinAsFollowerAndReceiveAssignment("consumer", coordinator, singletonList(t1p));
 
         assertEquals(1, rebalanceListener.revokedCount);
         assertTrue(rebalanceListener.revoked.isEmpty());
@@ -1003,10 +985,10 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
+        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
 
         AtomicBoolean success = new AtomicBoolean(false);
-        coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), callback(success));
+        coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), callback(success));
         coordinator.invokeCompletedOffsetCommitCallbacks();
         assertTrue(success.get());
     }
@@ -1030,10 +1012,10 @@ public class ConsumerCoordinatorTest {
 
         MockCommitCallback firstCommitCallback = new MockCommitCallback();
         MockCommitCallback secondCommitCallback = new MockCommitCallback();
-        coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), firstCommitCallback);
-        coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), secondCommitCallback);
+        coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), firstCommitCallback);
+        coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), secondCommitCallback);
 
-        client.respond(offsetCommitResponse(Collections.singletonMap(t1p, error)));
+        respondToOffsetCommitRequest(singletonMap(t1p, 100L), error);
         consumerClient.pollNoWakeup();
         consumerClient.pollNoWakeup(); // second poll since coordinator disconnect is async
         coordinator.invokeCompletedOffsetCommitCallbacks();
@@ -1051,20 +1033,85 @@ public class ConsumerCoordinatorTest {
                 ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true);
 
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
+        subscriptions.seek(t1p, 100);
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
+        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
+        time.sleep(autoCommitIntervalMs);
+        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+        assertFalse(client.hasPendingResponses());
+    }
 
-        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
-        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
-        coordinator.joinGroupIfNeeded();
+    @Test
+    public void testAutoCommitRetryBackoff() {
+        final String consumerId = "consumer";
+        ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
+                ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true);
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
 
         subscriptions.seek(t1p, 100);
+        time.sleep(autoCommitIntervalMs);
+
+        // Send an offset commit, but let it fail with a retriable error
+        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NOT_COORDINATOR);
+        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+        assertTrue(coordinator.coordinatorUnknown());
+
+        // After the disconnect, we should rediscover the coordinator
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+
+        subscriptions.seek(t1p, 200);
+
+        // Until the retry backoff has expired, we should not retry the offset commit
+        time.sleep(retryBackoffMs / 2);
+        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+        assertEquals(0, client.inFlightRequestCount());
+
+        // Once the backoff expires, we should retry
+        time.sleep(retryBackoffMs / 2);
+        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+        assertEquals(1, client.inFlightRequestCount());
+        respondToOffsetCommitRequest(singletonMap(t1p, 200L), Errors.NONE);
+    }
+
+    @Test
+    public void testAutoCommitAwaitsInterval() {
+        final String consumerId = "consumer";
+        ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors,
+                ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true, true);
+        subscriptions.subscribe(singleton(topic1), rebalanceListener);
+        joinAsFollowerAndReceiveAssignment(consumerId, coordinator, singletonList(t1p));
 
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
+        subscriptions.seek(t1p, 100);
         time.sleep(autoCommitIntervalMs);
+
+        // Send the offset commit request, but do not respond
         coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
-        assertFalse(client.hasPendingResponses());
+        assertEquals(1, client.inFlightRequestCount());
+
+        time.sleep(autoCommitIntervalMs / 2);
+
+        // Ensure that no additional offset commit is sent
+        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+        assertEquals(1, client.inFlightRequestCount());
+
+        respondToOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
+        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+        assertEquals(0, client.inFlightRequestCount());
+
+        subscriptions.seek(t1p, 200);
+
+        // If we poll again before the auto-commit interval, there should be no new sends
+        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+        assertEquals(0, client.inFlightRequestCount());
+
+        // After the remainder of the interval passes, we send a new offset commit
+        time.sleep(autoCommitIntervalMs / 2);
+        coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
+        assertEquals(1, client.inFlightRequestCount());
+        respondToOffsetCommitRequest(singletonMap(t1p, 200L), Errors.NONE);
     }
 
     @Test
@@ -1089,7 +1136,7 @@ public class ConsumerCoordinatorTest {
 
         subscriptions.seek(t1p, 100);
 
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
+        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
         time.sleep(autoCommitIntervalMs);
         coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
         assertFalse(client.hasPendingResponses());
@@ -1106,7 +1153,7 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
+        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
         time.sleep(autoCommitIntervalMs);
         coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
         assertFalse(client.hasPendingResponses());
@@ -1131,7 +1178,7 @@ public class ConsumerCoordinatorTest {
 
         // sleep only for the retry backoff
         time.sleep(retryBackoffMs);
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
+        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
         coordinator.poll(time.milliseconds(), Long.MAX_VALUE);
         assertFalse(client.hasPendingResponses());
     }
@@ -1143,11 +1190,11 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
+        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
 
         AtomicBoolean success = new AtomicBoolean(false);
 
-        Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "hello"));
+        Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p, new OffsetAndMetadata(100L, "hello"));
         coordinator.commitOffsetsAsync(offsets, callback(offsets, success));
         coordinator.invokeCompletedOffsetCommitCallbacks();
         assertTrue(success.get());
@@ -1158,8 +1205,8 @@ public class ConsumerCoordinatorTest {
         int invokedBeforeTest = mockOffsetCommitCallback.invoked;
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
-        coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), mockOffsetCommitCallback);
+        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
+        coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), mockOffsetCommitCallback);
         coordinator.invokeCompletedOffsetCommitCallbacks();
         assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked);
         assertNull(mockOffsetCommitCallback.exception);
@@ -1170,15 +1217,7 @@ public class ConsumerCoordinatorTest {
         // enable auto-assignment
         subscriptions.subscribe(singleton(topic1), rebalanceListener);
 
-        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.ensureCoordinatorReady();
-
-        client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE));
-        client.prepareResponse(syncGroupResponse(singletonList(t1p), Errors.NONE));
-
-        client.prepareMetadataUpdate(cluster, Collections.<String>emptySet());
-
-        coordinator.joinGroupIfNeeded();
+        joinAsFollowerAndReceiveAssignment("consumer", coordinator, singletonList(t1p));
 
         // now switch to manual assignment
         client.prepareResponse(new LeaveGroupResponse(Errors.NONE));
@@ -1194,10 +1233,10 @@ public class ConsumerCoordinatorTest {
                 return commitRequest.memberId().equals(OffsetCommitRequest.DEFAULT_MEMBER_ID) &&
                         commitRequest.generationId() == OffsetCommitRequest.DEFAULT_GENERATION_ID;
             }
-        }, offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
+        }, offsetCommitResponse(singletonMap(t1p, Errors.NONE)));
 
         AtomicBoolean success = new AtomicBoolean(false);
-        coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), callback(success));
+        coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), callback(success));
         coordinator.invokeCompletedOffsetCommitCallbacks();
         assertTrue(success.get());
     }
@@ -1207,8 +1246,8 @@ public class ConsumerCoordinatorTest {
         int invokedBeforeTest = mockOffsetCommitCallback.invoked;
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.COORDINATOR_NOT_AVAILABLE)));
-        coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), mockOffsetCommitCallback);
+        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE);
+        coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), mockOffsetCommitCallback);
         coordinator.invokeCompletedOffsetCommitCallbacks();
         assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked);
         assertTrue(mockOffsetCommitCallback.exception instanceof RetriableCommitFailedException);
@@ -1221,8 +1260,8 @@ public class ConsumerCoordinatorTest {
 
         // async commit with coordinator not available
         MockCommitCallback cb = new MockCommitCallback();
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.COORDINATOR_NOT_AVAILABLE)));
-        coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), cb);
+        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE);
+        coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), cb);
         coordinator.invokeCompletedOffsetCommitCallbacks();
 
         assertTrue(coordinator.coordinatorUnknown());
@@ -1237,8 +1276,8 @@ public class ConsumerCoordinatorTest {
 
         // async commit with not coordinator
         MockCommitCallback cb = new MockCommitCallback();
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NOT_COORDINATOR)));
-        coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), cb);
+        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE);
+        coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), cb);
         coordinator.invokeCompletedOffsetCommitCallbacks();
 
         assertTrue(coordinator.coordinatorUnknown());
@@ -1253,8 +1292,8 @@ public class ConsumerCoordinatorTest {
 
         // async commit with coordinator disconnected
         MockCommitCallback cb = new MockCommitCallback();
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)), true);
-        coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), cb);
+        prepareOffsetCommitRequestDisconnect(singletonMap(t1p, 100L));
+        coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(100L)), cb);
         coordinator.invokeCompletedOffsetCommitCallbacks();
 
         assertTrue(coordinator.coordinatorUnknown());
@@ -1268,10 +1307,10 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady();
 
         // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request)
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NOT_COORDINATOR)));
+        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NOT_COORDINATOR);
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
-        coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE);
+        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
+        coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE);
     }
 
     @Test
@@ -1280,10 +1319,10 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady();
 
         // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request)
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.COORDINATOR_NOT_AVAILABLE)));
+        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.COORDINATOR_NOT_AVAILABLE);
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
-        coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE);
+        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
+        coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE);
     }
 
     @Test
@@ -1292,10 +1331,10 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady();
 
         // sync commit with coordinator disconnected (should connect, get metadata, and then submit the commit request)
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)), true);
+        prepareOffsetCommitRequestDisconnect(singletonMap(t1p, 100L));
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
-        coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE);
+        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
+        coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE);
     }
 
     @Test
@@ -1307,7 +1346,7 @@ public class ConsumerCoordinatorTest {
         final OffsetAndMetadata firstOffset = new OffsetAndMetadata(0L);
         final OffsetAndMetadata secondOffset = new OffsetAndMetadata(1L);
 
-        coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, firstOffset), new OffsetCommitCallback() {
+        coordinator.commitOffsetsAsync(singletonMap(t1p, firstOffset), new OffsetCommitCallback() {
             @Override
             public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) {
                 committedOffsets.add(firstOffset);
@@ -1318,7 +1357,7 @@ public class ConsumerCoordinatorTest {
         Thread thread = new Thread() {
             @Override
             public void run() {
-                coordinator.commitOffsetsSync(Collections.singletonMap(t1p, secondOffset), 10000);
+                coordinator.commitOffsetsSync(singletonMap(t1p, secondOffset), 10000);
                 committedOffsets.add(secondOffset);
             }
         };
@@ -1326,8 +1365,8 @@ public class ConsumerCoordinatorTest {
         thread.start();
 
         client.waitForRequests(2, 5000);
-        client.respond(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
-        client.respond(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
+        respondToOffsetCommitRequest(singletonMap(t1p, firstOffset.offset()), Errors.NONE);
+        respondToOffsetCommitRequest(singletonMap(t1p, secondOffset.offset()), Errors.NONE);
 
         thread.join();
 
@@ -1339,8 +1378,8 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.UNKNOWN_TOPIC_OR_PARTITION)));
-        coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
+        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.UNKNOWN_TOPIC_OR_PARTITION);
+        coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
     }
 
     @Test(expected = OffsetMetadataTooLarge.class)
@@ -1349,8 +1388,8 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.OFFSET_METADATA_TOO_LARGE)));
-        coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
+        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.OFFSET_METADATA_TOO_LARGE);
+        coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
     }
 
     @Test(expected = CommitFailedException.class)
@@ -1359,8 +1398,8 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.ILLEGAL_GENERATION)));
-        coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
+        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.ILLEGAL_GENERATION);
+        coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
     }
 
     @Test(expected = CommitFailedException.class)
@@ -1369,8 +1408,8 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.UNKNOWN_MEMBER_ID)));
-        coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
+        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.UNKNOWN_MEMBER_ID);
+        coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
     }
 
     @Test(expected = CommitFailedException.class)
@@ -1379,8 +1418,8 @@ public class ConsumerCoordinatorTest {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
 
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.REBALANCE_IN_PROGRESS)));
-        coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
+        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.REBALANCE_IN_PROGRESS);
+        coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L, "metadata")), Long.MAX_VALUE);
     }
 
     @Test(expected = KafkaException.class)
@@ -1389,21 +1428,21 @@ public class ConsumerCoordinatorTest {
         coordinator.ensureCoordinatorReady();
 
         // sync commit with invalid partitions should throw if we have no callback
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.UNKNOWN_SERVER_ERROR)), false);
-        coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE);
+        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.UNKNOWN_SERVER_ERROR);
+        coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L)), Long.MAX_VALUE);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void testCommitSyncNegativeOffset() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(-1L)), Long.MAX_VALUE);
+        coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(-1L)), Long.MAX_VALUE);
     }
 
     @Test
     public void testCommitAsyncNegativeOffset() {
         int invokedBeforeTest = mockOffsetCommitCallback.invoked;
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        coordinator.commitOffsetsAsync(Collections.singletonMap(t1p, new OffsetAndMetadata(-1L)), mockOffsetCommitCallback);
+        coordinator.commitOffsetsAsync(singletonMap(t1p, new OffsetAndMetadata(-1L)), mockOffsetCommitCallback);
         coordinator.invokeCompletedOffsetCommitCallbacks();
         assertEquals(invokedBeforeTest + 1, mockOffsetCommitCallback.invoked);
         assertTrue(mockOffsetCommitCallback.exception instanceof IllegalArgumentException);
@@ -1413,7 +1452,7 @@ public class ConsumerCoordinatorTest {
     public void testCommitOffsetSyncWithoutFutureGetsCompleted() {
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
         coordinator.ensureCoordinatorReady();
-        assertFalse(coordinator.commitOffsetsSync(Collections.singletonMap(t1p, new OffsetAndMetadata(100L)), 0));
+        assertFalse(coordinator.commitOffsetsSync(singletonMap(t1p, new OffsetAndMetadata(100L)), 0));
     }
 
     @Test
@@ -1684,7 +1723,7 @@ public class ConsumerCoordinatorTest {
         coordinator.markCoordinatorUnknown();
         assertTrue(coordinator.coordinatorUnknown());
         client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
-        client.prepareResponse(offsetCommitResponse(Collections.singletonMap(t1p, Errors.NONE)));
+        prepareOffsetCommitRequest(singletonMap(t1p, 100L), Errors.NONE);
 
         // async commit offset should find coordinator
         time.sleep(autoCommitIntervalMs); // sleep for a while to ensure auto commit does happen
@@ -1852,7 +1891,7 @@ public class ConsumerCoordinatorTest {
 
     private OffsetFetchResponse offsetFetchResponse(TopicPartition tp, Errors partitionLevelError, String metadata, long offset) {
         OffsetFetchResponse.PartitionData data = new OffsetFetchResponse.PartitionData(offset, metadata, partitionLevelError);
-        return new OffsetFetchResponse(Errors.NONE, Collections.singletonMap(tp, data));
+        return new OffsetFetchResponse(Errors.NONE, singletonMap(tp, data));
     }
 
     private OffsetCommitCallback callback(final AtomicBoolean success) {
@@ -1865,6 +1904,67 @@ public class ConsumerCoordinatorTest {
         };
     }
 
+    private void joinAsFollowerAndReceiveAssignment(String consumerId,
+                                                    ConsumerCoordinator coordinator,
+                                                    List<TopicPartition> assignment) {
+        client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE));
+        coordinator.ensureCoordinatorReady();
+        client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE));
+        client.prepareResponse(syncGroupResponse(assignment, Errors.NONE));
+        coordinator.joinGroupIfNeeded();
+    }
+
+    private void prepareOffsetCommitRequest(Map<TopicPartition, Long> expectedOffsets, Errors error) {
+        prepareOffsetCommitRequest(expectedOffsets, error, false);
+    }
+
+    private void prepareOffsetCommitRequestDisconnect(Map<TopicPartition, Long> expectedOffsets) {
+        prepareOffsetCommitRequest(expectedOffsets, Errors.NONE, true);
+    }
+
+    private void prepareOffsetCommitRequest(final Map<TopicPartition, Long> expectedOffsets,
+                                            Errors error,
+                                            boolean disconnected) {
+        Map<TopicPartition, Errors> errors = partitionErrors(expectedOffsets.keySet(), error);
+        client.prepareResponse(offsetCommitRequestMatcher(expectedOffsets), offsetCommitResponse(errors), disconnected);
+    }
+
+    private Map<TopicPartition, Errors> partitionErrors(Collection<TopicPartition> partitions, Errors error) {
+        final Map<TopicPartition, Errors> errors = new HashMap<>();
+        for (TopicPartition partition : partitions) {
+            errors.put(partition, error);
+        }
+        return errors;
+    }
+
+    private void respondToOffsetCommitRequest(final Map<TopicPartition, Long> expectedOffsets, Errors error) {
+        Map<TopicPartition, Errors> errors = partitionErrors(expectedOffsets.keySet(), error);
+        client.respond(offsetCommitRequestMatcher(expectedOffsets), offsetCommitResponse(errors));
+    }
+
+    private MockClient.RequestMatcher offsetCommitRequestMatcher(final Map<TopicPartition, Long> expectedOffsets) {
+        return new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                OffsetCommitRequest req = (OffsetCommitRequest) body;
+                Map<TopicPartition, OffsetCommitRequest.PartitionData> offsets = req.offsetData();
+                if (offsets.size() != expectedOffsets.size())
+                    return false;
+
+                for (Map.Entry<TopicPartition, Long> expectedOffset : expectedOffsets.entrySet()) {
+                    if (!offsets.containsKey(expectedOffset.getKey()))
+                        return false;
+
+                    OffsetCommitRequest.PartitionData offsetCommitData = offsets.get(expectedOffset.getKey());
+                    if (offsetCommitData.offset != expectedOffset.getValue())
+                        return false;
+                }
+
+                return true;
+            }
+        };
+    }
+
     private OffsetCommitCallback callback(final Map<TopicPartition, OffsetAndMetadata> expectedOffsets,
                                           final AtomicBoolean success) {
         return new OffsetCommitCallback() {
@@ -1876,7 +1976,6 @@ public class ConsumerCoordinatorTest {
         };
     }
 
-
     private static class MockCommitCallback implements OffsetCommitCallback {
         public int invoked = 0;
         public Exception exception = null;

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.