You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/06/10 07:15:00 UTC

[kafka] 01/02: KAFKA-9848: Avoid triggering scheduled rebalance delay when task assignment fails but Connect workers remain in the group (#8805)

This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 1c6f117b4a7a9a0e648139a44a5ab73371338aa6
Author: Konstantine Karantasis <ko...@confluent.io>
AuthorDate: Tue Jun 9 09:41:11 2020 -0700

    KAFKA-9848: Avoid triggering scheduled rebalance delay when task assignment fails but Connect workers remain in the group (#8805)
    
    In the first version of the incremental cooperative protocol, in the presence of a failed sync request by the leader, the assignor was designed to treat the unapplied assignments as lost and trigger a rebalance delay.
    
    This commit applies optimizations in these cases to avoid the unnecessary activation of the rebalancing delay. First, if the worker that loses the sync group request or response is the leader, then it detects this failure by checking the what is the expected generation when it performs task assignments. If it's not the expected one, it resets its view of the previous assignment because it wasn't successfully applied and it doesn't represent a correct state. Furthermore, if the worker  [...]
    
    Existing unit tests and integration tests are adapted to test the proposed optimizations.
    
    Reviewers: Randall Hauch <rh...@gmail.com>
---
 checkstyle/suppressions.xml                        |   6 +-
 .../consumer/internals/AbstractCoordinator.java    |   4 +-
 .../runtime/distributed/DistributedConfig.java     |   2 +-
 .../IncrementalCooperativeAssignor.java            |  38 +++-
 .../runtime/distributed/WorkerCoordinator.java     |  17 +-
 .../integration/ConnectIntegrationTestUtils.java   |  42 ++++
 .../integration/ConnectWorkerIntegrationTest.java  |   5 +
 .../integration/ExampleConnectIntegrationTest.java |   5 +
 .../RebalanceSourceConnectorsIntegrationTest.java  |  13 +-
 .../IncrementalCooperativeAssignorTest.java        | 227 ++++++++++++++++-----
 10 files changed, 283 insertions(+), 76 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index e064723..63af379 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -84,12 +84,10 @@
     <suppress checks="ClassFanOutComplexity"
               files="Worker.java"/>
     <suppress checks="MethodLength"
-              files="(KafkaConfigBackingStore|RequestResponseTest|WorkerSinkTaskTest).java"/>
+              files="(KafkaConfigBackingStore|IncrementalCooperativeAssignor|RequestResponseTest|WorkerSinkTaskTest).java"/>
 
     <suppress checks="ParameterNumber"
-              files="(WorkerSinkTask|WorkerSourceTask).java"/>
-    <suppress checks="ParameterNumber"
-              files="WorkerCoordinator.java"/>
+              files="Worker(SinkTask|SourceTask|Coordinator).java"/>
     <suppress checks="ParameterNumber"
               files="ConfigKeyInfo.java"/>
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 987c740..8aff856 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -423,8 +423,8 @@ public abstract class AbstractCoordinator implements Closeable {
 
                 // Generation data maybe concurrently cleared by Heartbeat thread.
                 // Can't use synchronized for {@code onJoinComplete}, because it can be long enough
-                // and  shouldn't block hearbeat thread.
-                // See {@link PlaintextConsumerTest#testMaxPollIntervalMsDelayInAssignment
+                // and shouldn't block heartbeat thread.
+                // See {@link PlaintextConsumerTest#testMaxPollIntervalMsDelayInAssignment}
                 synchronized (this) {
                     generationSnapshot = this.generation;
                 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index c7ff6d8..1f0b237 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -144,7 +144,7 @@ public class DistributedConfig extends WorkerConfig {
     public static final String CONNECT_PROTOCOL_DEFAULT = ConnectProtocolCompatibility.COMPATIBLE.toString();
 
     /**
-     * <code>connect.protocol</code>
+     * <code>scheduled.rebalance.max.delay.ms</code>
      */
     public static final String SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG = "scheduled.rebalance.max.delay.ms";
     public static final String SCHEDULED_REBALANCE_MAX_DELAY_MS_DOC = "Compatibility mode for Kafka Connect Protocol";
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
index 39a7068..a32ffdc 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
@@ -65,6 +65,8 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
     protected final Set<String> candidateWorkersForReassignment;
     protected long scheduledRebalance;
     protected int delay;
+    protected int previousGenerationId;
+    protected Set<String> previousMembers;
 
     public IncrementalCooperativeAssignor(LogContext logContext, Time time, int maxDelay) {
         this.log = logContext.logger(IncrementalCooperativeAssignor.class);
@@ -76,6 +78,8 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
         this.scheduledRebalance = 0;
         this.candidateWorkersForReassignment = new LinkedHashSet<>();
         this.delay = 0;
+        this.previousGenerationId = -1;
+        this.previousMembers = Collections.emptySet();
     }
 
     @Override
@@ -152,6 +156,17 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
         // Base set: The previous assignment of connectors-and-tasks is a standalone snapshot that
         // can be used to calculate derived sets
         log.debug("Previous assignments: {}", previousAssignment);
+        int lastCompletedGenerationId = coordinator.lastCompletedGenerationId();
+        if (previousGenerationId != lastCompletedGenerationId) {
+            log.debug("Clearing the view of previous assignments due to generation mismatch between "
+                    + "previous generation ID {} and last completed generation ID {}. This can "
+                    + "happen if the leader fails to sync the assignment within a rebalancing round. "
+                    + "The following view of previous assignments might be outdated and will be "
+                    + "ignored by the leader in the current computation of new assignments. "
+                    + "Possibly outdated previous assignments: {}",
+                    previousGenerationId, lastCompletedGenerationId, previousAssignment);
+            this.previousAssignment = ConnectorsAndTasks.EMPTY;
+        }
 
         ClusterConfigState snapshot = coordinator.configSnapshot();
         Set<String> configuredConnectors = new TreeSet<>(snapshot.connectors());
@@ -229,7 +244,7 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
         taskAssignments =
                 completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks));
 
-        handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment);
+        handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment, memberConfigs);
 
         // Do not revoke resources for re-assignment while a delayed rebalance is active
         // Also we do not revoke in two consecutive rebalances by the same leader
@@ -260,19 +275,15 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
 
         assignConnectors(completeWorkerAssignment, newSubmissions.connectors());
         assignTasks(completeWorkerAssignment, newSubmissions.tasks());
-
         log.debug("Current complete assignments: {}", currentWorkerAssignment);
         log.debug("New complete assignments: {}", completeWorkerAssignment);
 
         Map<String, Collection<String>> currentConnectorAssignments =
                 currentWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors));
-
         Map<String, Collection<ConnectorTaskId>> currentTaskAssignments =
                 currentWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks));
-
         Map<String, Collection<String>> incrementalConnectorAssignments =
                 diff(connectorAssignments, currentConnectorAssignments);
-
         Map<String, Collection<ConnectorTaskId>> incrementalTaskAssignments =
                 diff(taskAssignments, currentTaskAssignments);
 
@@ -285,9 +296,9 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
                 fillAssignments(memberConfigs.keySet(), Assignment.NO_ERROR, leaderId,
                                 memberConfigs.get(leaderId).url(), maxOffset, incrementalConnectorAssignments,
                                 incrementalTaskAssignments, toRevoke, delay);
-
         previousAssignment = computePreviousAssignment(toRevoke, connectorAssignments, taskAssignments, lostAssignments);
-
+        previousGenerationId = coordinator.generationId();
+        previousMembers = memberConfigs.keySet();
         log.debug("Actual assignments: {}", assignments);
         return serializeAssignments(assignments);
     }
@@ -344,7 +355,8 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
     // visible for testing
     protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
                                          ConnectorsAndTasks newSubmissions,
-                                         List<WorkerLoad> completeWorkerAssignment) {
+                                         List<WorkerLoad> completeWorkerAssignment,
+                                         Map<String, ExtendedWorkerState> memberConfigs) {
         if (lostAssignments.isEmpty()) {
             resetDelay();
             return;
@@ -354,6 +366,16 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
         log.debug("Found the following connectors and tasks missing from previous assignments: "
                 + lostAssignments);
 
+        if (scheduledRebalance <= 0 && memberConfigs.keySet().containsAll(previousMembers)) {
+            log.debug("No worker seems to have departed the group during the rebalance. The "
+                    + "missing assignments that the leader is detecting are probably due to some "
+                    + "workers failing to receive the new assignments in the previous rebalance. "
+                    + "Will reassign missing tasks as new tasks");
+            newSubmissions.connectors().addAll(lostAssignments.connectors());
+            newSubmissions.tasks().addAll(lostAssignments.tasks());
+            return;
+        }
+
         if (scheduledRebalance > 0 && now >= scheduledRebalance) {
             // delayed rebalance expired and it's time to assign resources
             log.debug("Delayed rebalance expired. Reassigning lost tasks");
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index f413a6e..644f591 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.requests.JoinGroupRequest;
-import org.apache.kafka.common.requests.OffsetCommitRequest;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
@@ -65,6 +64,7 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
 
     private boolean rejoinRequested;
     private volatile ConnectProtocolCompatibility currentConnectProtocol;
+    private volatile int lastCompletedGenerationId;
     private final ConnectAssignor eagerAssignor;
     private final ConnectAssignor incrementalAssignor;
     private final int coordinatorDiscoveryTimeoutMs;
@@ -111,6 +111,7 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
         this.eagerAssignor = new EagerAssignor(logContext);
         this.currentConnectProtocol = protocolCompatibility;
         this.coordinatorDiscoveryTimeoutMs = heartbeatIntervalMs;
+        this.lastCompletedGenerationId = Generation.NO_GENERATION.generationId;
     }
 
     @Override
@@ -216,6 +217,7 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
             log.debug("Augmented new assignment: {}", newAssignment);
         }
         assignmentSnapshot = newAssignment;
+        lastCompletedGenerationId = generation;
         listener.onAssigned(newAssignment, generation);
     }
 
@@ -261,8 +263,17 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
      * @return the generation ID or -1 if no generation is defined
      */
     public int generationId() {
-        Generation generation = super.generationIfStable();
-        return generation == null ? OffsetCommitRequest.DEFAULT_GENERATION_ID : generation.generationId;
+        return super.generation().generationId;
+    }
+
+    /**
+     * Return id that corresponds to the group generation that was active when the last join was successful
+     *
+     * @return the generation ID of the last group that was joined successfully by this member or -1 if no generation
+     * was stable at that point
+     */
+    public int lastCompletedGenerationId() {
+        return lastCompletedGenerationId;
     }
 
     private boolean isLeader() {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectIntegrationTestUtils.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectIntegrationTestUtils.java
new file mode 100644
index 0000000..1aeaa07
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectIntegrationTestUtils.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.junit.rules.TestRule;
+import org.junit.rules.TestWatcher;
+import org.junit.runner.Description;
+import org.slf4j.Logger;
+
+/**
+ */
+public class ConnectIntegrationTestUtils {
+    public static TestRule newTestWatcher(Logger log) {
+        return new TestWatcher() {
+            @Override
+            protected void starting(Description description) {
+                super.starting(description);
+                log.info("Starting test {}", description.getMethodName());
+            }
+
+            @Override
+            protected void finished(Description description) {
+                super.finished(description);
+                log.info("Finished test {}", description.getMethodName());
+            }
+        };
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index 6b20ccd..7c466bc 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -23,8 +23,10 @@ import org.apache.kafka.connect.util.clusters.WorkerHandle;
 import org.apache.kafka.test.IntegrationTest;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,6 +66,9 @@ public class ConnectWorkerIntegrationTest {
     Map<String, String> workerProps = new HashMap<>();
     Properties brokerProps = new Properties();
 
+    @Rule
+    public TestRule watcher = ConnectIntegrationTestUtils.newTestWatcher(log);
+
     @Before
     public void setup() {
         // setup Connect worker properties
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
index 14808c7..33f65fb 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
@@ -22,8 +22,10 @@ import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.apache.kafka.test.IntegrationTest;
 import org.junit.After;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,6 +66,9 @@ public class ExampleConnectIntegrationTest {
     private EmbeddedConnectCluster connect;
     private ConnectorHandle connectorHandle;
 
+    @Rule
+    public TestRule watcher = ConnectIntegrationTestUtils.newTestWatcher(log);
+
     @Before
     public void setup() {
         // setup Connect worker properties
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
index 19e7863..be8ce61 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
@@ -22,9 +22,10 @@ import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.apache.kafka.test.IntegrationTest;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.Ignore;
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
+import org.junit.rules.TestRule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,6 +46,7 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_C
 import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
 import static org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility.COMPATIBLE;
 import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONNECT_PROTOCOL_CONFIG;
+import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
@@ -68,12 +70,16 @@ public class RebalanceSourceConnectorsIntegrationTest {
 
     private EmbeddedConnectCluster connect;
 
+    @Rule
+    public TestRule watcher = ConnectIntegrationTestUtils.newTestWatcher(log);
+
     @Before
     public void setup() {
         // setup Connect worker properties
         Map<String, String> workerProps = new HashMap<>();
         workerProps.put(CONNECT_PROTOCOL_CONFIG, COMPATIBLE.toString());
-        workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, "30000");
+        workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(TimeUnit.SECONDS.toMillis(30)));
+        workerProps.put(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, String.valueOf(TimeUnit.SECONDS.toMillis(30)));
 
         // setup Kafka broker properties
         Properties brokerProps = new Properties();
@@ -98,7 +104,6 @@ public class RebalanceSourceConnectorsIntegrationTest {
         connect.stop();
     }
 
-    @Ignore("Flaky and disruptive. See KAFKA-8391, KAFKA-8661 for details.")
     @Test
     public void testStartTwoConnectors() throws Exception {
         // create test topic
@@ -133,7 +138,6 @@ public class RebalanceSourceConnectorsIntegrationTest {
                 "Connector tasks did not start in time.");
     }
 
-    @Ignore("Flaky and disruptive. See KAFKA-8391, KAFKA-8661 for details.")
     @Test
     public void testReconfigConnector() throws Exception {
         ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
@@ -192,7 +196,6 @@ public class RebalanceSourceConnectorsIntegrationTest {
                 recordNum >= numRecordsProduced);
     }
 
-    @Ignore("Flaky and disruptive. See KAFKA-8391, KAFKA-8661 for details.")
     @Test
     public void testDeleteConnector() throws Exception {
         // create test topic
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
index b95665f..dfa5183 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
@@ -107,6 +107,7 @@ public class IncrementalCooperativeAssignorTest {
                 new LogContext(),
                 time,
                 rebalanceDelay));
+        assignor.previousGenerationId = 1000;
     }
 
     @Test
@@ -115,6 +116,7 @@ public class IncrementalCooperativeAssignorTest {
         doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
 
         // First assignment with 1 worker and 2 connectors configured but not yet assigned
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -127,6 +129,7 @@ public class IncrementalCooperativeAssignorTest {
         applyAssignments(returnedAssignments);
         memberConfigs = memberConfigs(leader, offset, assignments);
         memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -138,6 +141,7 @@ public class IncrementalCooperativeAssignorTest {
         // Third assignment after revocations
         applyAssignments(returnedAssignments);
         memberConfigs = memberConfigs(leader, offset, assignments);
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -149,6 +153,7 @@ public class IncrementalCooperativeAssignorTest {
         // A fourth rebalance should not change assignments
         applyAssignments(returnedAssignments);
         memberConfigs = memberConfigs(leader, offset, assignments);
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -159,8 +164,9 @@ public class IncrementalCooperativeAssignorTest {
 
         verify(coordinator, times(rebalanceNum)).configSnapshot();
         verify(coordinator, times(rebalanceNum)).leaderState(any());
-        verify(coordinator, times(rebalanceNum)).generationId();
+        verify(coordinator, times(2 * rebalanceNum)).generationId();
         verify(coordinator, times(rebalanceNum)).memberId();
+        verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
     }
 
     @Test
@@ -174,6 +180,7 @@ public class IncrementalCooperativeAssignorTest {
 
         // First assignment with 2 workers and 2 connectors configured but not yet assigned
         memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -188,6 +195,7 @@ public class IncrementalCooperativeAssignorTest {
         applyAssignments(returnedAssignments);
         assignments.remove("worker2");
         memberConfigs = memberConfigs(leader, offset, assignments);
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -202,6 +210,7 @@ public class IncrementalCooperativeAssignorTest {
         // been reached yet
         applyAssignments(returnedAssignments);
         memberConfigs = memberConfigs(leader, offset, assignments);
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -215,6 +224,7 @@ public class IncrementalCooperativeAssignorTest {
         // Fourth assignment after delay expired
         applyAssignments(returnedAssignments);
         memberConfigs = memberConfigs(leader, offset, assignments);
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -225,8 +235,9 @@ public class IncrementalCooperativeAssignorTest {
 
         verify(coordinator, times(rebalanceNum)).configSnapshot();
         verify(coordinator, times(rebalanceNum)).leaderState(any());
-        verify(coordinator, times(rebalanceNum)).generationId();
+        verify(coordinator, times(2 * rebalanceNum)).generationId();
         verify(coordinator, times(rebalanceNum)).memberId();
+        verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
     }
 
     @Test
@@ -240,6 +251,7 @@ public class IncrementalCooperativeAssignorTest {
 
         // First assignment with 2 workers and 2 connectors configured but not yet assigned
         memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -254,6 +266,7 @@ public class IncrementalCooperativeAssignorTest {
         applyAssignments(returnedAssignments);
         assignments.remove("worker2");
         memberConfigs = memberConfigs(leader, offset, assignments);
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -268,6 +281,7 @@ public class IncrementalCooperativeAssignorTest {
         // been reached yet
         applyAssignments(returnedAssignments);
         memberConfigs = memberConfigs(leader, offset, assignments);
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -283,6 +297,7 @@ public class IncrementalCooperativeAssignorTest {
         applyAssignments(returnedAssignments);
         memberConfigs = memberConfigs(leader, offset, assignments);
         memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -297,6 +312,7 @@ public class IncrementalCooperativeAssignorTest {
         // assignments ought to be assigned to the worker that has appeared as returned.
         applyAssignments(returnedAssignments);
         memberConfigs = memberConfigs(leader, offset, assignments);
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -307,8 +323,9 @@ public class IncrementalCooperativeAssignorTest {
 
         verify(coordinator, times(rebalanceNum)).configSnapshot();
         verify(coordinator, times(rebalanceNum)).leaderState(any());
-        verify(coordinator, times(rebalanceNum)).generationId();
+        verify(coordinator, times(2 * rebalanceNum)).generationId();
         verify(coordinator, times(rebalanceNum)).memberId();
+        verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
     }
 
     @Test
@@ -323,6 +340,7 @@ public class IncrementalCooperativeAssignorTest {
         // First assignment with 3 workers and 2 connectors configured but not yet assigned
         memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
         memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, offset, null));
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -344,6 +362,7 @@ public class IncrementalCooperativeAssignorTest {
 
         // Capture needs to be reset to point to the new assignor
         doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -355,6 +374,7 @@ public class IncrementalCooperativeAssignorTest {
         // Third (incidental) assignment with still only one worker in the group.
         applyAssignments(returnedAssignments);
         memberConfigs = memberConfigs(leader, offset, assignments);
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -364,8 +384,9 @@ public class IncrementalCooperativeAssignorTest {
 
         verify(coordinator, times(rebalanceNum)).configSnapshot();
         verify(coordinator, times(rebalanceNum)).leaderState(any());
-        verify(coordinator, times(rebalanceNum)).generationId();
+        verify(coordinator, times(2 * rebalanceNum)).generationId();
         verify(coordinator, times(rebalanceNum)).memberId();
+        verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
     }
 
     @Test
@@ -380,6 +401,7 @@ public class IncrementalCooperativeAssignorTest {
         // First assignment with 3 workers and 2 connectors configured but not yet assigned
         memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
         memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, offset, null));
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -401,6 +423,7 @@ public class IncrementalCooperativeAssignorTest {
 
         // Capture needs to be reset to point to the new assignor
         doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -415,6 +438,7 @@ public class IncrementalCooperativeAssignorTest {
         applyAssignments(returnedAssignments);
         memberConfigs = memberConfigs(leader, offset, assignments);
         memberConfigs.put("worker1", new ExtendedWorkerState(leaderUrl, offset, null));
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -425,6 +449,7 @@ public class IncrementalCooperativeAssignorTest {
         // Fourth assignment after revocations
         applyAssignments(returnedAssignments);
         memberConfigs = memberConfigs(leader, offset, assignments);
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -435,8 +460,9 @@ public class IncrementalCooperativeAssignorTest {
 
         verify(coordinator, times(rebalanceNum)).configSnapshot();
         verify(coordinator, times(rebalanceNum)).leaderState(any());
-        verify(coordinator, times(rebalanceNum)).generationId();
+        verify(coordinator, times(2 * rebalanceNum)).generationId();
         verify(coordinator, times(rebalanceNum)).memberId();
+        verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
     }
 
     @Test
@@ -452,6 +478,7 @@ public class IncrementalCooperativeAssignorTest {
         // First assignment with 2 workers and 2 connectors configured but not yet assigned
         memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
         try {
+            expectGeneration();
             assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
         } catch (RuntimeException e) {
             RequestFuture.failure(e);
@@ -459,68 +486,106 @@ public class IncrementalCooperativeAssignorTest {
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
-        // This was the assignment that should have been sent, but didn't make it after all the way
+        // This was the assignment that should have been sent, but didn't make it all the way
         assertDelay(0, returnedAssignments);
         assertNoReassignments(memberConfigs, expectedMemberConfigs);
         assertAssignment(2, 8, 0, 0, "worker1", "worker2");
 
         // Second assignment happens with members returning the same assignments (memberConfigs)
-        // as the first time. The assignor can not tell whether it was the assignment that failed
-        // or the workers that were bounced. Therefore it goes into assignment freeze for
-        // the new assignments for a rebalance delay period
+        // as the first time. The assignor detects that the number of members did not change and
+        // avoids the rebalance delay, treating the lost assignments as new assignments.
         doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
-        assertDelay(rebalanceDelay, returnedAssignments);
         assertNoReassignments(memberConfigs, expectedMemberConfigs);
-        assertAssignment(0, 0, 0, 0, "worker1", "worker2");
+        assertAssignment(2, 8, 0, 0, "worker1", "worker2");
 
-        time.sleep(rebalanceDelay / 2);
+        verify(coordinator, times(rebalanceNum)).configSnapshot();
+        verify(coordinator, times(rebalanceNum)).leaderState(any());
+        verify(coordinator, times(2 * rebalanceNum)).generationId();
+        verify(coordinator, times(rebalanceNum)).memberId();
+        verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
+    }
 
-        // Third (incidental) assignment with still only one worker in the group. Max delay has not
-        // been reached yet
-        applyAssignments(returnedAssignments);
-        memberConfigs = memberConfigs(leader, offset, assignments);
+    @Test
+    public void testTaskAssignmentWhenSubsequentAssignmentAttemptFails() {
+        // Customize assignor for this test case
+        time = new MockTime();
+        initAssignor();
+
+        when(coordinator.configSnapshot()).thenReturn(configState);
+        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+
+        // First assignment with 2 workers and 2 connectors configured but not yet assigned
+        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
-        assertDelay(rebalanceDelay / 2, returnedAssignments);
+        assertDelay(0, returnedAssignments);
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
         assertNoReassignments(memberConfigs, expectedMemberConfigs);
-        assertAssignment(0, 0, 0, 0, "worker1", "worker2");
+        assertAssignment(2, 8, 0, 0, "worker1", "worker2");
 
-        time.sleep(rebalanceDelay / 2 + 1);
+        when(coordinator.configSnapshot()).thenReturn(configState);
+        doThrow(new RuntimeException("Unable to send computed assignment with SyncGroupRequest"))
+                .when(assignor).serializeAssignments(assignmentsCapture.capture());
 
-        // Fourth assignment after delay expired. Finally all the new assignments are assigned
+        // Second assignment triggered by a third worker joining. The computed assignment should
+        // revoke tasks from the existing group. But the assignment won't be correctly delivered.
         applyAssignments(returnedAssignments);
         memberConfigs = memberConfigs(leader, offset, assignments);
-        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, offset, null));
+        try {
+            expectGeneration();
+            assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        } catch (RuntimeException e) {
+            RequestFuture.failure(e);
+        }
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        // This was the assignment that should have been sent, but didn't make it all the way
         assertDelay(0, returnedAssignments);
+        assertNoReassignments(memberConfigs, expectedMemberConfigs);
+        assertAssignment(0, 0, 0, 2, "worker1", "worker2", "worker3");
+
+        // Third assignment happens with members returning the same assignments (memberConfigs)
+        // as the first time.
+        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+        expectGeneration();
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertDelay(0, returnedAssignments);
         assertNoReassignments(memberConfigs, expectedMemberConfigs);
-        assertAssignment(2, 8, 0, 0, "worker1", "worker2");
+        assertAssignment(0, 0, 0, 2, "worker1", "worker2", "worker3");
 
         verify(coordinator, times(rebalanceNum)).configSnapshot();
         verify(coordinator, times(rebalanceNum)).leaderState(any());
-        verify(coordinator, times(rebalanceNum)).generationId();
+        verify(coordinator, times(2 * rebalanceNum)).generationId();
         verify(coordinator, times(rebalanceNum)).memberId();
+        verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
     }
 
     @Test
-    public void testTaskAssignmentWhenSubsequentAssignmentAttemptFails() {
+    public void testTaskAssignmentWhenSubsequentAssignmentAttemptFailsOutsideTheAssignor() {
         // Customize assignor for this test case
         time = new MockTime();
         initAssignor();
 
+        expectGeneration();
         when(coordinator.configSnapshot()).thenReturn(configState);
         doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
 
         // First assignment with 2 workers and 2 connectors configured but not yet assigned
         memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -529,31 +594,30 @@ public class IncrementalCooperativeAssignorTest {
         assertNoReassignments(memberConfigs, expectedMemberConfigs);
         assertAssignment(2, 8, 0, 0, "worker1", "worker2");
 
-        when(coordinator.configSnapshot()).thenReturn(configState);
-        doThrow(new RuntimeException("Unable to send computed assignment with SyncGroupRequest"))
-                .when(assignor).serializeAssignments(assignmentsCapture.capture());
-
         // Second assignment triggered by a third worker joining. The computed assignment should
-        // revoke tasks from the existing group. But the assignment won't be correctly delivered.
+        // revoke tasks from the existing group. But the assignment won't be correctly delivered
+        // and sync group with fail on the leader worker.
         applyAssignments(returnedAssignments);
         memberConfigs = memberConfigs(leader, offset, assignments);
         memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, offset, null));
-        try {
-            assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
-        } catch (RuntimeException e) {
-            RequestFuture.failure(e);
-        }
+        when(coordinator.generationId())
+                .thenReturn(assignor.previousGenerationId + 1)
+                .thenReturn(assignor.previousGenerationId + 1);
+        when(coordinator.lastCompletedGenerationId()).thenReturn(assignor.previousGenerationId - 1);
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
         expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
-        // This was the assignment that should have been sent, but didn't make it after all the way
+        // This was the assignment that should have been sent, but didn't make it all the way
         assertDelay(0, returnedAssignments);
         assertNoReassignments(memberConfigs, expectedMemberConfigs);
         assertAssignment(0, 0, 0, 2, "worker1", "worker2", "worker3");
 
         // Third assignment happens with members returning the same assignments (memberConfigs)
         // as the first time.
+        when(coordinator.lastCompletedGenerationId()).thenReturn(assignor.previousGenerationId - 1);
         doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -564,8 +628,9 @@ public class IncrementalCooperativeAssignorTest {
 
         verify(coordinator, times(rebalanceNum)).configSnapshot();
         verify(coordinator, times(rebalanceNum)).leaderState(any());
-        verify(coordinator, times(rebalanceNum)).generationId();
+        verify(coordinator, times(2 * rebalanceNum)).generationId();
         verify(coordinator, times(rebalanceNum)).memberId();
+        verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
     }
 
     @Test
@@ -576,6 +641,7 @@ public class IncrementalCooperativeAssignorTest {
 
         // First assignment with 1 worker and 2 connectors configured but not yet assigned
         memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -589,6 +655,7 @@ public class IncrementalCooperativeAssignorTest {
         when(coordinator.configSnapshot()).thenReturn(configState);
         applyAssignments(returnedAssignments);
         memberConfigs = memberConfigs(leader, offset, assignments);
+        expectGeneration();
         assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
         ++rebalanceNum;
         returnedAssignments = assignmentsCapture.getValue();
@@ -599,8 +666,9 @@ public class IncrementalCooperativeAssignorTest {
 
         verify(coordinator, times(rebalanceNum)).configSnapshot();
         verify(coordinator, times(rebalanceNum)).leaderState(any());
-        verify(coordinator, times(rebalanceNum)).generationId();
+        verify(coordinator, times(2 * rebalanceNum)).generationId();
         verify(coordinator, times(rebalanceNum)).memberId();
+        verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
     }
 
     @Test
@@ -678,14 +746,17 @@ public class IncrementalCooperativeAssignorTest {
 
         Map<String, WorkerLoad> configuredAssignment = new HashMap<>();
         configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
+        configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4));
         configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));
+        memberConfigs = memberConfigs(leader, offset, 0, 2);
 
         ConnectorsAndTasks newSubmissions = new ConnectorsAndTasks.Builder().build();
 
         // No lost assignments
         assignor.handleLostAssignments(new ConnectorsAndTasks.Builder().build(),
                 newSubmissions,
-                new ArrayList<>(configuredAssignment.values()));
+                new ArrayList<>(configuredAssignment.values()),
+                memberConfigs);
 
         assertThat("Wrong set of workers for reassignments",
                 Collections.emptySet(),
@@ -693,14 +764,17 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(0, assignor.scheduledRebalance);
         assertEquals(0, assignor.delay);
 
+        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
         String flakyWorker = "worker1";
         WorkerLoad lostLoad = workerLoad(flakyWorker, 2, 2, 4, 4);
+        memberConfigs.remove(flakyWorker);
 
         ConnectorsAndTasks lostAssignments = new ConnectorsAndTasks.Builder()
                 .withCopies(lostLoad.connectors(), lostLoad.tasks()).build();
 
         // Lost assignments detected - No candidate worker has appeared yet (worker with no assignments)
-        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+        assignor.handleLostAssignments(lostAssignments, newSubmissions,
+                new ArrayList<>(configuredAssignment.values()), memberConfigs);
 
         assertThat("Wrong set of workers for reassignments",
                 Collections.emptySet(),
@@ -708,12 +782,15 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
         assertEquals(rebalanceDelay, assignor.delay);
 
+        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
         time.sleep(rebalanceDelay / 2);
         rebalanceDelay /= 2;
 
         // A new worker (probably returning worker) has joined
         configuredAssignment.put(flakyWorker, new WorkerLoad.Builder(flakyWorker).build());
-        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+        memberConfigs.put(flakyWorker, new ExtendedWorkerState(leaderUrl, offset, null));
+        assignor.handleLostAssignments(lostAssignments, newSubmissions,
+                new ArrayList<>(configuredAssignment.values()), memberConfigs);
 
         assertThat("Wrong set of workers for reassignments",
                 Collections.singleton(flakyWorker),
@@ -721,10 +798,12 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
         assertEquals(rebalanceDelay, assignor.delay);
 
+        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
         time.sleep(rebalanceDelay);
 
         // The new worker has still no assignments
-        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+        assignor.handleLostAssignments(lostAssignments, newSubmissions,
+                new ArrayList<>(configuredAssignment.values()), memberConfigs);
 
         assertTrue("Wrong assignment of lost connectors",
                 configuredAssignment.getOrDefault(flakyWorker, new WorkerLoad.Builder(flakyWorker).build())
@@ -753,14 +832,17 @@ public class IncrementalCooperativeAssignorTest {
 
         Map<String, WorkerLoad> configuredAssignment = new HashMap<>();
         configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
+        configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4));
         configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));
+        memberConfigs = memberConfigs(leader, offset, 0, 2);
 
         ConnectorsAndTasks newSubmissions = new ConnectorsAndTasks.Builder().build();
 
         // No lost assignments
         assignor.handleLostAssignments(new ConnectorsAndTasks.Builder().build(),
                 newSubmissions,
-                new ArrayList<>(configuredAssignment.values()));
+                new ArrayList<>(configuredAssignment.values()),
+                memberConfigs);
 
         assertThat("Wrong set of workers for reassignments",
                 Collections.emptySet(),
@@ -768,14 +850,17 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(0, assignor.scheduledRebalance);
         assertEquals(0, assignor.delay);
 
+        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
         String removedWorker = "worker1";
         WorkerLoad lostLoad = workerLoad(removedWorker, 2, 2, 4, 4);
+        memberConfigs.remove(removedWorker);
 
         ConnectorsAndTasks lostAssignments = new ConnectorsAndTasks.Builder()
                 .withCopies(lostLoad.connectors(), lostLoad.tasks()).build();
 
         // Lost assignments detected - No candidate worker has appeared yet (worker with no assignments)
-        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+        assignor.handleLostAssignments(lostAssignments, newSubmissions,
+                new ArrayList<>(configuredAssignment.values()), memberConfigs);
 
         assertThat("Wrong set of workers for reassignments",
                 Collections.emptySet(),
@@ -783,11 +868,13 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
         assertEquals(rebalanceDelay, assignor.delay);
 
+        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
         time.sleep(rebalanceDelay / 2);
         rebalanceDelay /= 2;
 
         // No new worker has joined
-        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+        assignor.handleLostAssignments(lostAssignments, newSubmissions,
+                new ArrayList<>(configuredAssignment.values()), memberConfigs);
 
         assertThat("Wrong set of workers for reassignments",
                 Collections.emptySet(),
@@ -797,7 +884,8 @@ public class IncrementalCooperativeAssignorTest {
 
         time.sleep(rebalanceDelay);
 
-        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+        assignor.handleLostAssignments(lostAssignments, newSubmissions,
+                new ArrayList<>(configuredAssignment.values()), memberConfigs);
 
         assertTrue("Wrong assignment of lost connectors",
                 newSubmissions.connectors().containsAll(lostAssignments.connectors()));
@@ -822,14 +910,17 @@ public class IncrementalCooperativeAssignorTest {
 
         Map<String, WorkerLoad> configuredAssignment = new HashMap<>();
         configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
+        configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4));
         configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));
+        memberConfigs = memberConfigs(leader, offset, 0, 2);
 
         ConnectorsAndTasks newSubmissions = new ConnectorsAndTasks.Builder().build();
 
         // No lost assignments
         assignor.handleLostAssignments(new ConnectorsAndTasks.Builder().build(),
                 newSubmissions,
-                new ArrayList<>(configuredAssignment.values()));
+                new ArrayList<>(configuredAssignment.values()),
+                memberConfigs);
 
         assertThat("Wrong set of workers for reassignments",
                 Collections.emptySet(),
@@ -837,8 +928,10 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(0, assignor.scheduledRebalance);
         assertEquals(0, assignor.delay);
 
+        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
         String flakyWorker = "worker1";
         WorkerLoad lostLoad = workerLoad(flakyWorker, 2, 2, 4, 4);
+        memberConfigs.remove(flakyWorker);
         String newWorker = "worker3";
 
         ConnectorsAndTasks lostAssignments = new ConnectorsAndTasks.Builder()
@@ -846,7 +939,9 @@ public class IncrementalCooperativeAssignorTest {
 
         // Lost assignments detected - A new worker also has joined that is not the returning worker
         configuredAssignment.put(newWorker, new WorkerLoad.Builder(newWorker).build());
-        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+        memberConfigs.put(newWorker, new ExtendedWorkerState(leaderUrl, offset, null));
+        assignor.handleLostAssignments(lostAssignments, newSubmissions,
+                new ArrayList<>(configuredAssignment.values()), memberConfigs);
 
         assertThat("Wrong set of workers for reassignments",
                 Collections.singleton(newWorker),
@@ -854,12 +949,15 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
         assertEquals(rebalanceDelay, assignor.delay);
 
+        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
         time.sleep(rebalanceDelay / 2);
         rebalanceDelay /= 2;
 
         // Now two new workers have joined
         configuredAssignment.put(flakyWorker, new WorkerLoad.Builder(flakyWorker).build());
-        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+        memberConfigs.put(flakyWorker, new ExtendedWorkerState(leaderUrl, offset, null));
+        assignor.handleLostAssignments(lostAssignments, newSubmissions,
+                new ArrayList<>(configuredAssignment.values()), memberConfigs);
 
         Set<String> expectedWorkers = new HashSet<>();
         expectedWorkers.addAll(Arrays.asList(newWorker, flakyWorker));
@@ -869,12 +967,16 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
         assertEquals(rebalanceDelay, assignor.delay);
 
+        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
         time.sleep(rebalanceDelay);
 
         // The new workers have new assignments, other than the lost ones
         configuredAssignment.put(flakyWorker, workerLoad(flakyWorker, 6, 2, 8, 4));
         configuredAssignment.put(newWorker, workerLoad(newWorker, 8, 2, 12, 4));
-        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+        // we don't reflect these new assignments in memberConfigs currently because they are not
+        // used in handleLostAssignments method
+        assignor.handleLostAssignments(lostAssignments, newSubmissions,
+                new ArrayList<>(configuredAssignment.values()), memberConfigs);
 
         // newWorker joined first, so should be picked up first as a candidate for reassignment
         assertTrue("Wrong assignment of lost connectors",
@@ -904,14 +1006,17 @@ public class IncrementalCooperativeAssignorTest {
 
         Map<String, WorkerLoad> configuredAssignment = new HashMap<>();
         configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
+        configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4));
         configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));
+        memberConfigs = memberConfigs(leader, offset, 0, 2);
 
         ConnectorsAndTasks newSubmissions = new ConnectorsAndTasks.Builder().build();
 
         // No lost assignments
         assignor.handleLostAssignments(new ConnectorsAndTasks.Builder().build(),
                 newSubmissions,
-                new ArrayList<>(configuredAssignment.values()));
+                new ArrayList<>(configuredAssignment.values()),
+                memberConfigs);
 
         assertThat("Wrong set of workers for reassignments",
                 Collections.emptySet(),
@@ -919,14 +1024,17 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(0, assignor.scheduledRebalance);
         assertEquals(0, assignor.delay);
 
+        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
         String veryFlakyWorker = "worker1";
         WorkerLoad lostLoad = workerLoad(veryFlakyWorker, 2, 2, 4, 4);
+        memberConfigs.remove(veryFlakyWorker);
 
         ConnectorsAndTasks lostAssignments = new ConnectorsAndTasks.Builder()
                 .withCopies(lostLoad.connectors(), lostLoad.tasks()).build();
 
         // Lost assignments detected - No candidate worker has appeared yet (worker with no assignments)
-        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+        assignor.handleLostAssignments(lostAssignments, newSubmissions,
+                new ArrayList<>(configuredAssignment.values()), memberConfigs);
 
         assertThat("Wrong set of workers for reassignments",
                 Collections.emptySet(),
@@ -934,12 +1042,15 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
         assertEquals(rebalanceDelay, assignor.delay);
 
+        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
         time.sleep(rebalanceDelay / 2);
         rebalanceDelay /= 2;
 
         // A new worker (probably returning worker) has joined
         configuredAssignment.put(veryFlakyWorker, new WorkerLoad.Builder(veryFlakyWorker).build());
-        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+        memberConfigs.put(veryFlakyWorker, new ExtendedWorkerState(leaderUrl, offset, null));
+        assignor.handleLostAssignments(lostAssignments, newSubmissions,
+                new ArrayList<>(configuredAssignment.values()), memberConfigs);
 
         assertThat("Wrong set of workers for reassignments",
                 Collections.singleton(veryFlakyWorker),
@@ -947,11 +1058,14 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
         assertEquals(rebalanceDelay, assignor.delay);
 
+        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
         time.sleep(rebalanceDelay);
 
         // The returning worker leaves permanently after joining briefly during the delay
         configuredAssignment.remove(veryFlakyWorker);
-        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+        memberConfigs.remove(veryFlakyWorker);
+        assignor.handleLostAssignments(lostAssignments, newSubmissions,
+                new ArrayList<>(configuredAssignment.values()), memberConfigs);
 
         assertTrue("Wrong assignment of lost connectors",
                 newSubmissions.connectors().containsAll(lostAssignments.connectors()));
@@ -1173,4 +1287,11 @@ public class IncrementalCooperativeAssignorTest {
                 Collections.emptyList(),
                 is(existingTasks));
     }
+
+    private void expectGeneration() {
+        when(coordinator.generationId())
+                .thenReturn(assignor.previousGenerationId + 1)
+                .thenReturn(assignor.previousGenerationId + 1);
+        when(coordinator.lastCompletedGenerationId()).thenReturn(assignor.previousGenerationId);
+    }
 }