You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/05/09 13:44:09 UTC

[kafka] branch trunk updated: KAFKA-13763: Refactor IncrementalCooperativeAssignor for improved unit testing (#11983)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1278e385c0 KAFKA-13763: Refactor IncrementalCooperativeAssignor for improved unit testing (#11983)
1278e385c0 is described below

commit 1278e385c02a44861e9d00b32c0f476d4b149be5
Author: Chris Egerton <fe...@gmail.com>
AuthorDate: Mon May 9 09:43:47 2022 -0400

    KAFKA-13763: Refactor IncrementalCooperativeAssignor for improved unit testing (#11983)
    
    The goals here include:
    
    1. Create an overloaded variant of the IncrementalCooperativeAssignor::performTaskAssignment method that is more testing-friendly
    2. Simplify the parameter list for the IncrementalCooperativeAssignor::handleLostAssignments method, which in turn simplifies the logic for testing this class
    3. Capture repeated Java 8 streams logic in simple, reusable, easily verifiable utility methods added to the ConnectUtils class
    
    Reviewers: Luke Chen <sh...@gmail.com>
---
 .../runtime/distributed/ConnectAssignor.java       |   2 +-
 .../IncrementalCooperativeAssignor.java            | 280 ++++++++---
 .../IncrementalCooperativeConnectProtocol.java     |   9 +-
 .../runtime/distributed/WorkerCoordinator.java     |  23 +
 .../apache/kafka/connect/util/ConnectUtils.java    |  31 ++
 .../ConnectProtocolCompatibilityTest.java          |  91 ++--
 .../IncrementalCooperativeAssignorTest.java        | 528 +++++++++------------
 .../WorkerCoordinatorIncrementalTest.java          |   4 +-
 8 files changed, 515 insertions(+), 453 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectAssignor.java
index 752e62e680..1436460d1a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectAssignor.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectAssignor.java
@@ -32,7 +32,7 @@ public interface ConnectAssignor {
      * method computes an assignment of connectors and tasks among the members of the worker group.
      *
      * @param leaderId the leader of the group
-     * @param protocol the protocol type; for Connect assignors this is normally "connect"
+     * @param protocol the protocol type; for Connect assignors this is "eager", "compatible", or "sessioned"
      * @param allMemberMetadata the metadata of all the active workers of the group
      * @param coordinator the worker coordinator that runs this assignor
      * @return the assignment of connectors and tasks to workers
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
index e6a8b302b4..8ed80903c5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
@@ -16,11 +16,13 @@
  */
 package org.apache.kafka.connect.runtime.distributed;
 
+import java.util.Arrays;
 import java.util.Map.Entry;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.ConnectorsAndTasks;
 import org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.WorkerLoad;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
 
@@ -43,9 +45,10 @@ import java.util.stream.IntStream;
 
 import static org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
 import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.Assignment;
-import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
 import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
 import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.LeaderState;
+import static org.apache.kafka.connect.util.ConnectUtils.combineCollections;
+import static org.apache.kafka.connect.util.ConnectUtils.transformValues;
 
 /**
  * An assignor that computes a distribution of connectors and tasks according to the incremental
@@ -104,18 +107,15 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
         log.debug("Max config offset root: {}, local snapshot config offsets root: {}",
                   maxOffset, coordinator.configSnapshot().offset());
 
-        short protocolVersion = memberConfigs.values().stream()
-            .allMatch(state -> state.assignment().version() == CONNECT_PROTOCOL_V2)
-                ? CONNECT_PROTOCOL_V2
-                : CONNECT_PROTOCOL_V1;
+        short protocolVersion = ConnectProtocolCompatibility.fromProtocol(protocol).protocolVersion();
 
         Long leaderOffset = ensureLeaderConfig(maxOffset, coordinator);
         if (leaderOffset == null) {
             Map<String, ExtendedAssignment> assignments = fillAssignments(
                     memberConfigs.keySet(), Assignment.CONFIG_MISMATCH,
-                    leaderId, memberConfigs.get(leaderId).url(), maxOffset, Collections.emptyMap(),
-                    Collections.emptyMap(), Collections.emptyMap(), 0, protocolVersion);
-            return serializeAssignments(assignments);
+                    leaderId, memberConfigs.get(leaderId).url(), maxOffset,
+                    ClusterAssignment.EMPTY, 0, protocolVersion);
+            return serializeAssignments(assignments, protocolVersion);
         }
         return performTaskAssignment(leaderId, leaderOffset, memberConfigs, coordinator, protocolVersion);
     }
@@ -159,11 +159,41 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
                                                             WorkerCoordinator coordinator, short protocolVersion) {
         log.debug("Performing task assignment during generation: {} with memberId: {}",
                 coordinator.generationId(), coordinator.memberId());
+        Map<String, ConnectorsAndTasks> memberAssignments = transformValues(
+                memberConfigs,
+                memberConfig -> new ConnectorsAndTasks.Builder()
+                        .with(memberConfig.assignment().connectors(), memberConfig.assignment().tasks())
+                        .build()
+        );
+        ClusterAssignment clusterAssignment = performTaskAssignment(
+                coordinator.configSnapshot(),
+                coordinator.lastCompletedGenerationId(),
+                coordinator.generationId(),
+                memberAssignments
+        );
+
+        coordinator.leaderState(new LeaderState(memberConfigs, clusterAssignment.allAssignedConnectors(), clusterAssignment.allAssignedTasks()));
 
+        Map<String, ExtendedAssignment> assignments =
+                fillAssignments(memberConfigs.keySet(), Assignment.NO_ERROR, leaderId,
+                        memberConfigs.get(leaderId).url(), maxOffset,
+                        clusterAssignment,
+                        delay, protocolVersion);
+
+        log.debug("Actual assignments: {}", assignments);
+        return serializeAssignments(assignments, protocolVersion);
+    }
+
+    // Visible for testing
+    ClusterAssignment performTaskAssignment(
+            ClusterConfigState configSnapshot,
+            int lastCompletedGenerationId,
+            int currentGenerationId,
+            Map<String, ConnectorsAndTasks> memberAssignments
+    ) {
         // Base set: The previous assignment of connectors-and-tasks is a standalone snapshot that
         // can be used to calculate derived sets
         log.debug("Previous assignments: {}", previousAssignment);
-        int lastCompletedGenerationId = coordinator.lastCompletedGenerationId();
         if (previousGenerationId != lastCompletedGenerationId) {
             log.debug("Clearing the view of previous assignments due to generation mismatch between "
                     + "previous generation ID {} and last completed generation ID {}. This can "
@@ -175,11 +205,8 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
             this.previousAssignment = ConnectorsAndTasks.EMPTY;
         }
 
-        ClusterConfigState snapshot = coordinator.configSnapshot();
-        Set<String> configuredConnectors = new TreeSet<>(snapshot.connectors());
-        Set<ConnectorTaskId> configuredTasks = configuredConnectors.stream()
-                .flatMap(c -> snapshot.tasks(c).stream())
-                .collect(Collectors.toSet());
+        Set<String> configuredConnectors = new TreeSet<>(configSnapshot.connectors());
+        Set<ConnectorTaskId> configuredTasks = combineCollections(configuredConnectors, configSnapshot::tasks, Collectors.toSet());
 
         // Base set: The set of configured connectors-and-tasks is a standalone snapshot that can
         // be used to calculate derived sets
@@ -189,7 +216,7 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
 
         // Base set: The set of active connectors-and-tasks is a standalone snapshot that can be
         // used to calculate derived sets
-        ConnectorsAndTasks activeAssignments = assignment(memberConfigs);
+        ConnectorsAndTasks activeAssignments = assignment(memberAssignments);
         log.debug("Active assignments: {}", activeAssignments);
 
         // This means that a previous revocation did not take effect. In this case, reset
@@ -225,7 +252,7 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
         log.debug("New assignments: {}", newSubmissions);
 
         // A collection of the complete assignment
-        List<WorkerLoad> completeWorkerAssignment = workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY);
+        List<WorkerLoad> completeWorkerAssignment = workerAssignment(memberAssignments, ConnectorsAndTasks.EMPTY);
         log.debug("Complete (ignoring deletions) worker assignments: {}", completeWorkerAssignment);
 
         // Per worker connector assignments without removing deleted connectors yet
@@ -239,23 +266,23 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
         log.debug("Complete (ignoring deletions) task assignments: {}", taskAssignments);
 
         // A collection of the current assignment excluding the connectors-and-tasks to be deleted
-        List<WorkerLoad> currentWorkerAssignment = workerAssignment(memberConfigs, deleted);
+        List<WorkerLoad> currentWorkerAssignment = workerAssignment(memberAssignments, deleted);
 
         Map<String, ConnectorsAndTasks> toRevoke = computeDeleted(deleted, connectorAssignments, taskAssignments);
         log.debug("Connector and task to delete assignments: {}", toRevoke);
 
         // Revoking redundant connectors/tasks if the workers have duplicate assignments
-        toRevoke.putAll(computeDuplicatedAssignments(memberConfigs, connectorAssignments, taskAssignments));
+        toRevoke.putAll(computeDuplicatedAssignments(memberAssignments, connectorAssignments, taskAssignments));
         log.debug("Connector and task to revoke assignments (include duplicated assignments): {}", toRevoke);
 
         // Recompute the complete assignment excluding the deleted connectors-and-tasks
-        completeWorkerAssignment = workerAssignment(memberConfigs, deleted);
+        completeWorkerAssignment = workerAssignment(memberAssignments, deleted);
         connectorAssignments =
                 completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors));
         taskAssignments =
                 completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks));
 
-        handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment, memberConfigs);
+        handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment);
 
         // Do not revoke resources for re-assignment while a delayed rebalance is active
         // Also we do not revoke in two consecutive rebalances by the same leader
@@ -298,20 +325,24 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
         Map<String, Collection<ConnectorTaskId>> incrementalTaskAssignments =
                 diff(taskAssignments, currentTaskAssignments);
 
+        previousAssignment = computePreviousAssignment(toRevoke, connectorAssignments, taskAssignments, lostAssignments);
+        previousGenerationId = currentGenerationId;
+        previousMembers = memberAssignments.keySet();
+
         log.debug("Incremental connector assignments: {}", incrementalConnectorAssignments);
         log.debug("Incremental task assignments: {}", incrementalTaskAssignments);
 
-        coordinator.leaderState(new LeaderState(memberConfigs, connectorAssignments, taskAssignments));
-
-        Map<String, ExtendedAssignment> assignments =
-                fillAssignments(memberConfigs.keySet(), Assignment.NO_ERROR, leaderId,
-                                memberConfigs.get(leaderId).url(), maxOffset, incrementalConnectorAssignments,
-                                incrementalTaskAssignments, toRevoke, delay, protocolVersion);
-        previousAssignment = computePreviousAssignment(toRevoke, connectorAssignments, taskAssignments, lostAssignments);
-        previousGenerationId = coordinator.generationId();
-        previousMembers = memberConfigs.keySet();
-        log.debug("Actual assignments: {}", assignments);
-        return serializeAssignments(assignments);
+        Map<String, Collection<String>> revokedConnectors = transformValues(toRevoke, ConnectorsAndTasks::connectors);
+        Map<String, Collection<ConnectorTaskId>> revokedTasks = transformValues(toRevoke, ConnectorsAndTasks::tasks);
+
+        return new ClusterAssignment(
+                incrementalConnectorAssignments,
+                incrementalTaskAssignments,
+                revokedConnectors,
+                revokedTasks,
+                diff(connectorAssignments, revokedConnectors),
+                diff(taskAssignments, revokedTasks)
+        );
     }
 
     private Map<String, ConnectorsAndTasks> computeDeleted(ConnectorsAndTasks deleted,
@@ -344,9 +375,9 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
                                                          Map<String, Collection<ConnectorTaskId>> taskAssignments,
                                                          ConnectorsAndTasks lostAssignments) {
         ConnectorsAndTasks previousAssignment = new ConnectorsAndTasks.Builder().with(
-                connectorAssignments.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()),
-                taskAssignments.values() .stream() .flatMap(Collection::stream).collect(Collectors.toSet()))
-                .build();
+                ConnectUtils.combineCollections(connectorAssignments.values()),
+                ConnectUtils.combineCollections(taskAssignments.values())
+        ).build();
 
         for (ConnectorsAndTasks revoked : toRevoke.values()) {
             previousAssignment.connectors().removeAll(revoked.connectors());
@@ -363,29 +394,36 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
         return previousAssignment;
     }
 
-    private ConnectorsAndTasks duplicatedAssignments(Map<String, ExtendedWorkerState> memberConfigs) {
-        Set<String> connectors = memberConfigs.entrySet().stream()
-                .flatMap(memberConfig -> memberConfig.getValue().assignment().connectors().stream())
-                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))
+    private ConnectorsAndTasks duplicatedAssignments(Map<String, ConnectorsAndTasks> memberAssignments) {
+        Map<String, Long> connectorInstanceCounts = combineCollections(
+                memberAssignments.values(),
+                ConnectorsAndTasks::connectors,
+                Collectors.groupingBy(Function.identity(), Collectors.counting())
+        );
+        Set<String> duplicatedConnectors = connectorInstanceCounts
                 .entrySet().stream()
                 .filter(entry -> entry.getValue() > 1L)
                 .map(Entry::getKey)
                 .collect(Collectors.toSet());
 
-        Set<ConnectorTaskId> tasks = memberConfigs.values().stream()
-                .flatMap(state -> state.assignment().tasks().stream())
-                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()))
+        Map<ConnectorTaskId, Long> taskInstanceCounts = combineCollections(
+                memberAssignments.values(),
+                ConnectorsAndTasks::tasks,
+                Collectors.groupingBy(Function.identity(), Collectors.counting())
+        );
+        Set<ConnectorTaskId> duplicatedTasks = taskInstanceCounts
                 .entrySet().stream()
                 .filter(entry -> entry.getValue() > 1L)
                 .map(Entry::getKey)
                 .collect(Collectors.toSet());
-        return new ConnectorsAndTasks.Builder().with(connectors, tasks).build();
+
+        return new ConnectorsAndTasks.Builder().with(duplicatedConnectors, duplicatedTasks).build();
     }
 
-    private Map<String, ConnectorsAndTasks> computeDuplicatedAssignments(Map<String, ExtendedWorkerState> memberConfigs,
+    private Map<String, ConnectorsAndTasks> computeDuplicatedAssignments(Map<String, ConnectorsAndTasks> memberAssignments,
                                              Map<String, Collection<String>> connectorAssignments,
                                              Map<String, Collection<ConnectorTaskId>> taskAssignment) {
-        ConnectorsAndTasks duplicatedAssignments = duplicatedAssignments(memberConfigs);
+        ConnectorsAndTasks duplicatedAssignments = duplicatedAssignments(memberAssignments);
         log.debug("Duplicated assignments: {}", duplicatedAssignments);
 
         Map<String, ConnectorsAndTasks> toRevoke = new HashMap<>();
@@ -421,8 +459,7 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
     // visible for testing
     protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
                                          ConnectorsAndTasks newSubmissions,
-                                         List<WorkerLoad> completeWorkerAssignment,
-                                         Map<String, ExtendedWorkerState> memberConfigs) {
+                                         List<WorkerLoad> completeWorkerAssignment) {
         if (lostAssignments.isEmpty()) {
             resetDelay();
             return;
@@ -432,7 +469,10 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
         log.debug("Found the following connectors and tasks missing from previous assignments: "
                 + lostAssignments);
 
-        if (scheduledRebalance <= 0 && memberConfigs.keySet().containsAll(previousMembers)) {
+        Set<String> activeMembers = completeWorkerAssignment.stream()
+                .map(WorkerLoad::worker)
+                .collect(Collectors.toSet());
+        if (scheduledRebalance <= 0 && activeMembers.containsAll(previousMembers)) {
             log.debug("No worker seems to have departed the group during the rebalance. The "
                     + "missing assignments that the leader is detecting are probably due to some "
                     + "workers failing to receive the new assignments in the previous rebalance. "
@@ -489,7 +529,7 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
                 log.debug("Delayed rebalance in progress. Task reassignment is postponed. New computed rebalance delay: {}", delay);
             } else {
                 // This means scheduledRebalance == 0
-                // We could also also extract the current minimum delay from the group, to make
+                // We could also extract the current minimum delay from the group, to make
                 // independent of consecutive leader failures, but this optimization is skipped
                 // at the moment
                 delay = maxDelay;
@@ -526,7 +566,7 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
     }
 
     /**
-     * Task revocation is based on an rough estimation of the lower average number of tasks before
+     * Task revocation is based on a rough estimation of the lower average number of tasks before
      * and after new workers join the group. If no new workers join, no revocation takes place.
      * Based on this estimation, tasks are revoked until the new floor average is reached for
      * each existing worker. The revoked tasks, once assigned to the new workers will maintain
@@ -610,16 +650,14 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
 
     private Map<String, ExtendedAssignment> fillAssignments(Collection<String> members, short error,
                                                             String leaderId, String leaderUrl, long maxOffset,
-                                                            Map<String, Collection<String>> connectorAssignments,
-                                                            Map<String, Collection<ConnectorTaskId>> taskAssignments,
-                                                            Map<String, ConnectorsAndTasks> revoked,
+                                                            ClusterAssignment clusterAssignment,
                                                             int delay, short protocolVersion) {
         Map<String, ExtendedAssignment> groupAssignment = new HashMap<>();
         for (String member : members) {
-            Collection<String> connectorsToStart = connectorAssignments.getOrDefault(member, Collections.emptyList());
-            Collection<ConnectorTaskId> tasksToStart = taskAssignments.getOrDefault(member, Collections.emptyList());
-            Collection<String> connectorsToStop = revoked.getOrDefault(member, ConnectorsAndTasks.EMPTY).connectors();
-            Collection<ConnectorTaskId> tasksToStop = revoked.getOrDefault(member, ConnectorsAndTasks.EMPTY).tasks();
+            Collection<String> connectorsToStart = clusterAssignment.newlyAssignedConnectors(member);
+            Collection<ConnectorTaskId> tasksToStart = clusterAssignment.newlyAssignedTasks(member);
+            Collection<String> connectorsToStop = clusterAssignment.newlyRevokedConnectors(member);
+            Collection<ConnectorTaskId> tasksToStop = clusterAssignment.newlyRevokedTasks(member);
             ExtendedAssignment assignment =
                     new ExtendedAssignment(protocolVersion, error, leaderId, leaderUrl, maxOffset,
                             connectorsToStart, tasksToStart, connectorsToStop, tasksToStop, delay);
@@ -637,12 +675,13 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
      * @param assignments the map of worker assignments
      * @return the serialized map of assignments to workers
      */
-    protected Map<String, ByteBuffer> serializeAssignments(Map<String, ExtendedAssignment> assignments) {
+    protected Map<String, ByteBuffer> serializeAssignments(Map<String, ExtendedAssignment> assignments, short protocolVersion) {
+        boolean sessioned = protocolVersion >= CONNECT_PROTOCOL_V2;
         return assignments.entrySet()
                 .stream()
                 .collect(Collectors.toMap(
                     Map.Entry::getKey,
-                    e -> IncrementalCooperativeConnectProtocol.serializeAssignment(e.getValue())));
+                    e -> IncrementalCooperativeConnectProtocol.serializeAssignment(e.getValue(), sessioned)));
     }
 
     private static ConnectorsAndTasks diff(ConnectorsAndTasks base,
@@ -661,23 +700,18 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
         Map<String, Collection<T>> incremental = new HashMap<>();
         for (Map.Entry<String, Collection<T>> entry : base.entrySet()) {
             List<T> values = new ArrayList<>(entry.getValue());
-            values.removeAll(toSubtract.get(entry.getKey()));
+            values.removeAll(toSubtract.getOrDefault(entry.getKey(), Collections.emptySet()));
             incremental.put(entry.getKey(), values);
         }
         return incremental;
     }
 
-    private ConnectorsAndTasks assignment(Map<String, ExtendedWorkerState> memberConfigs) {
-        log.debug("Received assignments: {}", memberConfigs);
-        Set<String> connectors = memberConfigs.values()
-                .stream()
-                .flatMap(state -> state.assignment().connectors().stream())
-                .collect(Collectors.toSet());
-        Set<ConnectorTaskId> tasks = memberConfigs.values()
-                .stream()
-                .flatMap(state -> state.assignment().tasks().stream())
-                .collect(Collectors.toSet());
-        return new ConnectorsAndTasks.Builder().with(connectors, tasks).build();
+    private ConnectorsAndTasks assignment(Map<String, ConnectorsAndTasks> memberAssignments) {
+        log.debug("Received assignments: {}", memberAssignments);
+        return new ConnectorsAndTasks.Builder().with(
+                ConnectUtils.combineCollections(memberAssignments.values(), ConnectorsAndTasks::connectors),
+                ConnectUtils.combineCollections(memberAssignments.values(), ConnectorsAndTasks::tasks)
+        ).build();
     }
 
     private int calculateDelay(long now) {
@@ -745,22 +779,120 @@ public class IncrementalCooperativeAssignor implements ConnectAssignor {
         }
     }
 
-    private static List<WorkerLoad> workerAssignment(Map<String, ExtendedWorkerState> memberConfigs,
+    private static List<WorkerLoad> workerAssignment(Map<String, ConnectorsAndTasks> memberAssignments,
                                                      ConnectorsAndTasks toExclude) {
         ConnectorsAndTasks ignore = new ConnectorsAndTasks.Builder()
                 .with(new HashSet<>(toExclude.connectors()), new HashSet<>(toExclude.tasks()))
                 .build();
 
-        return memberConfigs.entrySet().stream()
+        return memberAssignments.entrySet().stream()
                 .map(e -> new WorkerLoad.Builder(e.getKey()).with(
-                        e.getValue().assignment().connectors().stream()
+                        e.getValue().connectors().stream()
                                 .filter(v -> !ignore.connectors().contains(v))
                                 .collect(Collectors.toList()),
-                        e.getValue().assignment().tasks().stream()
+                        e.getValue().tasks().stream()
                                 .filter(v -> !ignore.tasks().contains(v))
                                 .collect(Collectors.toList())
                         ).build()
                 ).collect(Collectors.toList());
     }
 
+    static class ClusterAssignment {
+
+        private final Map<String, Collection<String>> newlyAssignedConnectors;
+        private final Map<String, Collection<ConnectorTaskId>> newlyAssignedTasks;
+        private final Map<String, Collection<String>> newlyRevokedConnectors;
+        private final Map<String, Collection<ConnectorTaskId>> newlyRevokedTasks;
+        private final Map<String, Collection<String>> allAssignedConnectors;
+        private final Map<String, Collection<ConnectorTaskId>> allAssignedTasks;
+        private final Set<String> allWorkers;
+
+        public static final ClusterAssignment EMPTY = new ClusterAssignment(
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap(),
+                Collections.emptyMap()
+        );
+
+        public ClusterAssignment(
+                Map<String, Collection<String>> newlyAssignedConnectors,
+                Map<String, Collection<ConnectorTaskId>> newlyAssignedTasks,
+                Map<String, Collection<String>> newlyRevokedConnectors,
+                Map<String, Collection<ConnectorTaskId>> newlyRevokedTasks,
+                Map<String, Collection<String>> allAssignedConnectors,
+                Map<String, Collection<ConnectorTaskId>> allAssignedTasks
+        ) {
+            this.newlyAssignedConnectors = newlyAssignedConnectors;
+            this.newlyAssignedTasks = newlyAssignedTasks;
+            this.newlyRevokedConnectors = newlyRevokedConnectors;
+            this.newlyRevokedTasks = newlyRevokedTasks;
+            this.allAssignedConnectors = allAssignedConnectors;
+            this.allAssignedTasks = allAssignedTasks;
+            this.allWorkers = combineCollections(
+                    Arrays.asList(newlyAssignedConnectors, newlyAssignedTasks, newlyRevokedConnectors, newlyRevokedTasks, allAssignedConnectors, allAssignedTasks),
+                    Map::keySet,
+                    Collectors.toSet()
+            );
+        }
+
+        public Map<String, Collection<String>> newlyAssignedConnectors() {
+            return newlyAssignedConnectors;
+        }
+
+        public Collection<String> newlyAssignedConnectors(String worker) {
+            return newlyAssignedConnectors.getOrDefault(worker, Collections.emptySet());
+        }
+
+        public Map<String, Collection<ConnectorTaskId>> newlyAssignedTasks() {
+            return newlyAssignedTasks;
+        }
+
+        public Collection<ConnectorTaskId> newlyAssignedTasks(String worker) {
+            return newlyAssignedTasks.getOrDefault(worker, Collections.emptySet());
+        }
+
+        public Map<String, Collection<String>> newlyRevokedConnectors() {
+            return newlyRevokedConnectors;
+        }
+
+        public Collection<String> newlyRevokedConnectors(String worker) {
+            return newlyRevokedConnectors.getOrDefault(worker, Collections.emptySet());
+        }
+
+        public Map<String, Collection<ConnectorTaskId>> newlyRevokedTasks() {
+            return newlyRevokedTasks;
+        }
+
+        public Collection<ConnectorTaskId> newlyRevokedTasks(String worker) {
+            return newlyRevokedTasks.getOrDefault(worker, Collections.emptySet());
+        }
+
+        public Map<String, Collection<String>> allAssignedConnectors() {
+            return allAssignedConnectors;
+        }
+
+        public Map<String, Collection<ConnectorTaskId>> allAssignedTasks() {
+            return allAssignedTasks;
+        }
+
+        public Set<String> allWorkers() {
+            return allWorkers;
+        }
+
+        @Override
+        public String toString() {
+            return "ClusterAssignment{"
+                    + "newlyAssignedConnectors=" + newlyAssignedConnectors
+                    + ", newlyAssignedTasks=" + newlyAssignedTasks
+                    + ", newlyRevokedConnectors=" + newlyRevokedConnectors
+                    + ", newlyRevokedTasks=" + newlyRevokedTasks
+                    + ", allAssignedConnectors=" + allAssignedConnectors
+                    + ", allAssignedTasks=" + allAssignedTasks
+                    + ", allWorkers=" + allWorkers
+                    + '}';
+        }
+    }
+
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeConnectProtocol.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeConnectProtocol.java
index 6bcf9be65e..c32009c794 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeConnectProtocol.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeConnectProtocol.java
@@ -154,7 +154,7 @@ public class IncrementalCooperativeConnectProtocol {
                 .set(CONFIG_OFFSET_KEY_NAME, workerState.offset());
         // Not a big issue if we embed the protocol version with the assignment in the metadata
         Struct allocation = new Struct(ALLOCATION_V1)
-                .set(ALLOCATION_KEY_NAME, serializeAssignment(workerState.assignment()));
+                .set(ALLOCATION_KEY_NAME, serializeAssignment(workerState.assignment(), sessioned));
         Struct connectProtocolHeader = sessioned ? CONNECT_PROTOCOL_HEADER_V2 : CONNECT_PROTOCOL_HEADER_V1;
         ByteBuffer buffer = ByteBuffer.allocate(connectProtocolHeader.sizeOf()
                                                 + CONFIG_STATE_V1.sizeOf(configState)
@@ -230,15 +230,16 @@ public class IncrementalCooperativeConnectProtocol {
      *   ScheduledDelay     => Int32
      * </pre>
      */
-    public static ByteBuffer serializeAssignment(ExtendedAssignment assignment) {
+    public static ByteBuffer serializeAssignment(ExtendedAssignment assignment, boolean sessioned) {
         // comparison depends on reference equality for now
         if (assignment == null || ExtendedAssignment.empty().equals(assignment)) {
             return null;
         }
         Struct struct = assignment.toStruct();
-        ByteBuffer buffer = ByteBuffer.allocate(CONNECT_PROTOCOL_HEADER_V1.sizeOf()
+        Struct protocolHeader = sessioned ? CONNECT_PROTOCOL_HEADER_V2 : CONNECT_PROTOCOL_HEADER_V1;
+        ByteBuffer buffer = ByteBuffer.allocate(protocolHeader.sizeOf()
                                                 + ASSIGNMENT_V1.sizeOf(struct));
-        CONNECT_PROTOCOL_HEADER_V1.writeTo(buffer);
+        protocolHeader.writeTo(buffer);
         ASSIGNMENT_V1.write(buffer, struct);
         buffer.flip();
         return buffer;
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 65720e2a78..77fb16eaa3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -416,6 +416,29 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
             return allMembers.get(ownerId).url();
         }
 
+        @Override
+        public boolean equals(Object o) { // value equality over allMembers, connectorOwners and taskOwners
+            if (this == o) return true;
+            if (!(o instanceof LeaderState)) return false; // instanceof is false for null, so null is rejected here too
+            LeaderState that = (LeaderState) o;
+            return Objects.equals(allMembers, that.allMembers) // Objects.equals tolerates null fields on either side
+                    && Objects.equals(connectorOwners, that.connectorOwners)
+                    && Objects.equals(taskOwners, that.taskOwners);
+        }
+
+        @Override
+        public int hashCode() { // must hash exactly the fields that equals compares
+            return Objects.hash(allMembers, connectorOwners, taskOwners);
+        }
+
+        @Override
+        public String toString() { // renders the three fields below as name=value pairs inside "LeaderState{...}"
+            return "LeaderState{"
+                    + "allMembers=" + allMembers
+                    + ", connectorOwners=" + connectorOwners
+                    + ", taskOwners=" + taskOwners
+                    + '}';
+        }
     }
 
     public static class ConnectorsAndTasks {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
index 7adbd8f92d..bbf2477603 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
@@ -30,10 +30,15 @@ import org.apache.kafka.connect.source.SourceConnector;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Function;
+import java.util.stream.Collector;
+import java.util.stream.Collectors;
 
 public final class ConnectUtils {
     private static final Logger log = LoggerFactory.getLogger(ConnectUtils.class);
@@ -160,4 +165,30 @@ public final class ConnectUtils {
         return SourceConnector.class.isAssignableFrom(connector.getClass());
     }
 
+    public static <K, I, O> Map<K, O> transformValues(Map<K, I> map, Function<I, O> transformation) { // builds a new map: same keys, each value passed through transformation; the input map is only read
+        return map.entrySet().stream().collect(Collectors.toMap( // NOTE: Collectors.toMap throws NullPointerException if a transformed value is null
+                Map.Entry::getKey,
+                transformation.compose(Map.Entry::getValue) // i.e. entry -> transformation.apply(entry.getValue())
+        ));
+    }
+
+    public static <I> List<I> combineCollections(Collection<Collection<I>> collections) { // flattens a collection of collections into a single List
+        return combineCollections(collections, Function.identity()); // each element already is the collection to flatten
+    }
+
+    public static <I, T> List<T> combineCollections(Collection<I> collection, Function<I, Collection<T>> extractCollection) { // extracts one Collection<T> per element and concatenates them into a List
+        return combineCollections(collection, extractCollection, Collectors.toList()); // delegates to the collector-taking overload
+    }
+
+    public static <I, T, C> C combineCollections( // most general overload; the two shorter overloads end up here
+            Collection<I> collection,
+            Function<I, Collection<T>> extractCollection,
+            Collector<T, ?, C> collector // decides the result container type C
+    ) {
+        return collection.stream()
+                .map(extractCollection) // one Collection<T> per input element
+                .flatMap(Collection::stream) // throws NullPointerException if extractCollection returned null for an element
+                .collect(collector);
+    }
+
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocolCompatibilityTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocolCompatibilityTest.java
index 883574957d..fdb4c542f7 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocolCompatibilityTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocolCompatibilityTest.java
@@ -16,32 +16,22 @@
  */
 package org.apache.kafka.connect.runtime.distributed;
 
-import org.apache.kafka.connect.runtime.TargetState;
-import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
 import org.apache.kafka.connect.util.ConnectorTaskId;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.mockito.Mock;
-import org.mockito.junit.MockitoJUnit;
-import org.mockito.junit.MockitoRule;
 
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashMap;
 
 import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
+import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
-import static org.mockito.Mockito.when;
 
 public class ConnectProtocolCompatibilityTest {
+    private static final String LEADER = "leader";
     private static final String LEADER_URL = "leaderUrl:8083";
+    private static final long CONFIG_OFFSET = 1;
 
     private String connectorId1 = "connector1";
     private String connectorId2 = "connector2";
@@ -51,95 +41,58 @@ public class ConnectProtocolCompatibilityTest {
     private ConnectorTaskId taskId2x0 = new ConnectorTaskId(connectorId2, 0);
     private ConnectorTaskId taskId3x0 = new ConnectorTaskId(connectorId3, 0);
 
-    @Rule
-    public MockitoRule rule = MockitoJUnit.rule();
-
-    @Mock
-    private KafkaConfigBackingStore configStorage;
-    private ClusterConfigState configState;
-
-    @Before
-    public void setup() {
-        configStorage = mock(KafkaConfigBackingStore.class);
-        configState = new ClusterConfigState(
-                1L,
-                null,
-                Collections.singletonMap(connectorId1, 1),
-                Collections.singletonMap(connectorId1, new HashMap<>()),
-                Collections.singletonMap(connectorId1, TargetState.STARTED),
-                Collections.singletonMap(taskId1x0, new HashMap<>()),
-                Collections.emptySet());
-    }
-
-    @After
-    public void teardown() {
-        verifyNoMoreInteractions(configStorage);
-    }
-
     @Test
     public void testEagerToEagerMetadata() {
-        when(configStorage.snapshot()).thenReturn(configState);
-        ExtendedWorkerState workerState = new ExtendedWorkerState(LEADER_URL, configStorage.snapshot().offset(), null);
+        ConnectProtocol.WorkerState workerState = emptyWorkerState();
         ByteBuffer metadata = ConnectProtocol.serializeMetadata(workerState);
         ConnectProtocol.WorkerState state = ConnectProtocol.deserializeMetadata(metadata);
         assertEquals(LEADER_URL, state.url());
         assertEquals(1, state.offset());
-        verify(configStorage).snapshot();
     }
 
     @Test
     public void testCoopToCoopMetadata() {
-        when(configStorage.snapshot()).thenReturn(configState);
-        ExtendedWorkerState workerState = new ExtendedWorkerState(LEADER_URL, configStorage.snapshot().offset(), null);
+        ExtendedWorkerState workerState = emptyExtendedWorkerState(CONNECT_PROTOCOL_V1);
         ByteBuffer metadata = IncrementalCooperativeConnectProtocol.serializeMetadata(workerState, false);
         ExtendedWorkerState state = IncrementalCooperativeConnectProtocol.deserializeMetadata(metadata);
         assertEquals(LEADER_URL, state.url());
         assertEquals(1, state.offset());
-        verify(configStorage).snapshot();
     }
 
     @Test
     public void testSessionedToCoopMetadata() {
-        when(configStorage.snapshot()).thenReturn(configState);
-        ExtendedWorkerState workerState = new ExtendedWorkerState(LEADER_URL, configStorage.snapshot().offset(), null);
+        ExtendedWorkerState workerState = emptyExtendedWorkerState(CONNECT_PROTOCOL_V2);
         ByteBuffer metadata = IncrementalCooperativeConnectProtocol.serializeMetadata(workerState, true);
         ExtendedWorkerState state = IncrementalCooperativeConnectProtocol.deserializeMetadata(metadata);
         assertEquals(LEADER_URL, state.url());
         assertEquals(1, state.offset());
-        verify(configStorage).snapshot();
     }
 
     @Test
     public void testSessionedToEagerMetadata() {
-        when(configStorage.snapshot()).thenReturn(configState);
-        ExtendedWorkerState workerState = new ExtendedWorkerState(LEADER_URL, configStorage.snapshot().offset(), null);
+        ExtendedWorkerState workerState = emptyExtendedWorkerState(CONNECT_PROTOCOL_V2);
         ByteBuffer metadata = IncrementalCooperativeConnectProtocol.serializeMetadata(workerState, true);
         ConnectProtocol.WorkerState state = ConnectProtocol.deserializeMetadata(metadata);
         assertEquals(LEADER_URL, state.url());
         assertEquals(1, state.offset());
-        verify(configStorage).snapshot();
     }
 
     @Test
     public void testCoopToEagerMetadata() {
-        when(configStorage.snapshot()).thenReturn(configState);
-        ExtendedWorkerState workerState = new ExtendedWorkerState(LEADER_URL, configStorage.snapshot().offset(), null);
+        ExtendedWorkerState workerState = emptyExtendedWorkerState(CONNECT_PROTOCOL_V1);
         ByteBuffer metadata = IncrementalCooperativeConnectProtocol.serializeMetadata(workerState, false);
         ConnectProtocol.WorkerState state = ConnectProtocol.deserializeMetadata(metadata);
         assertEquals(LEADER_URL, state.url());
         assertEquals(1, state.offset());
-        verify(configStorage).snapshot();
     }
 
     @Test
     public void testEagerToCoopMetadata() {
-        when(configStorage.snapshot()).thenReturn(configState);
-        ConnectProtocol.WorkerState workerState = new ConnectProtocol.WorkerState(LEADER_URL, configStorage.snapshot().offset());
+        ConnectProtocol.WorkerState workerState = emptyWorkerState();
         ByteBuffer metadata = ConnectProtocol.serializeMetadata(workerState);
         ConnectProtocol.WorkerState state = IncrementalCooperativeConnectProtocol.deserializeMetadata(metadata);
         assertEquals(LEADER_URL, state.url());
         assertEquals(1, state.offset());
-        verify(configStorage).snapshot();
     }
 
     @Test
@@ -176,7 +129,7 @@ public class ConnectProtocolCompatibilityTest {
                 Arrays.asList(connectorId1, connectorId3), Arrays.asList(taskId2x0),
                 Collections.emptyList(), Collections.emptyList(), 0);
 
-        ByteBuffer leaderBuf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment);
+        ByteBuffer leaderBuf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment, false);
         ConnectProtocol.Assignment leaderAssignment = ConnectProtocol.deserializeAssignment(leaderBuf);
         assertFalse(leaderAssignment.failed());
         assertEquals("leader", leaderAssignment.leader());
@@ -235,7 +188,7 @@ public class ConnectProtocolCompatibilityTest {
                 Arrays.asList(connectorId1, connectorId3), Arrays.asList(taskId2x0),
                 Collections.emptyList(), Collections.emptyList(), 0);
 
-        ByteBuffer leaderBuf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment);
+        ByteBuffer leaderBuf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment, false);
         ConnectProtocol.Assignment leaderAssignment = ConnectProtocol.deserializeAssignment(leaderBuf);
         assertFalse(leaderAssignment.failed());
         assertEquals("leader", leaderAssignment.leader());
@@ -248,7 +201,7 @@ public class ConnectProtocolCompatibilityTest {
                 Arrays.asList(connectorId2), Arrays.asList(taskId1x0, taskId3x0),
                 Collections.emptyList(), Collections.emptyList(), 0);
 
-        ByteBuffer memberBuf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment2);
+        ByteBuffer memberBuf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment2, false);
         ConnectProtocol.Assignment memberAssignment = ConnectProtocol.deserializeAssignment(memberBuf);
         assertFalse(memberAssignment.failed());
         assertEquals("member", memberAssignment.leader());
@@ -257,4 +210,24 @@ public class ConnectProtocolCompatibilityTest {
         assertEquals(Arrays.asList(taskId1x0, taskId3x0), memberAssignment.tasks());
     }
 
+    private ConnectProtocol.WorkerState emptyWorkerState() { // fixture: a ConnectProtocol.WorkerState built only from the class-level constants
+        return new ConnectProtocol.WorkerState(LEADER_URL, CONFIG_OFFSET);
+    }
+
+    private ExtendedWorkerState emptyExtendedWorkerState(short protocolVersion) { // fixture: an ExtendedWorkerState wrapping an assignment whose four collections are all empty
+        ExtendedAssignment assignment = new ExtendedAssignment(
+                protocolVersion, // callers in this class pass CONNECT_PROTOCOL_V1 or CONNECT_PROTOCOL_V2
+                ConnectProtocol.Assignment.NO_ERROR,
+                LEADER,
+                LEADER_URL,
+                CONFIG_OFFSET,
+                Collections.emptySet(), // presumably assigned connectors - confirm against ExtendedAssignment's constructor
+                Collections.emptySet(), // presumably assigned tasks - confirm
+                Collections.emptySet(), // presumably revoked connectors - confirm
+                Collections.emptySet(), // presumably revoked tasks - confirm
+                0 // presumably the scheduled rebalance delay - confirm
+        );
+        return new ExtendedWorkerState(LEADER_URL, CONFIG_OFFSET, assignment);
+    }
+
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
index 0ad7f63477..bd51456b72 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
@@ -17,25 +17,18 @@
 package org.apache.kafka.connect.runtime.distributed;
 
 import org.apache.kafka.clients.consumer.internals.RequestFuture;
+import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.ConnectorsAndTasks;
+import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
-import org.junit.After;
 import org.junit.Before;
-import org.junit.Rule;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.mockito.ArgumentCaptor;
-import org.mockito.Captor;
-import org.mockito.Mock;
-import org.mockito.Mockito;
-import org.mockito.junit.MockitoJUnit;
-import org.mockito.junit.MockitoRule;
 
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -44,93 +37,57 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import java.util.stream.Stream;
 
-import static org.apache.kafka.connect.runtime.distributed.ExtendedAssignment.duplicate;
-import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
-import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
+import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeAssignor.ClusterAssignment;
 import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.WorkerLoad;
+import static org.apache.kafka.connect.util.ConnectUtils.transformValues;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
-import static org.junit.runners.Parameterized.Parameter;
-import static org.junit.runners.Parameterized.Parameters;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.times;
+import static org.mockito.ArgumentMatchers.notNull;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.verifyNoMoreInteractions;
 import static org.mockito.Mockito.when;
 
-@RunWith(Parameterized.class)
 public class IncrementalCooperativeAssignorTest {
-    @Rule
-    public MockitoRule rule = MockitoJUnit.rule();
 
-    @Mock
-    private WorkerCoordinator coordinator;
-
-    @Captor
-    ArgumentCaptor<Map<String, ExtendedAssignment>> assignmentsCapture;
-
-    @Parameters
-    public static Iterable<?> mode() {
-        return Arrays.asList(CONNECT_PROTOCOL_V1, CONNECT_PROTOCOL_V2);
-    }
-
-    @Parameter
-    public short protocolVersion;
+    // Offset isn't used in most tests but is required for creating a config snapshot object,
+    // so just use some arbitrary constant for that
+    private static final long CONFIG_OFFSET = 618;
 
     private Map<String, Integer> connectors;
-    private Map<String, ExtendedWorkerState> memberConfigs;
-    private long offset;
-    private String leader;
-    private String leaderUrl;
     private Time time;
     private int rebalanceDelay;
     private IncrementalCooperativeAssignor assignor;
-    private int rebalanceNum;
-    Map<String, ExtendedAssignment> returnedAssignments;
+    private int generationId;
+    private ClusterAssignment returnedAssignments;
+    private Map<String, ConnectorsAndTasks> memberAssignments;
 
     @Before
     public void setup() {
-        leader = "worker1";
-        leaderUrl = expectedLeaderUrl(leader);
-        offset = 10;
+        generationId = 1000;
+        time = Time.SYSTEM;
+        rebalanceDelay = DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT;
         connectors = new HashMap<>();
         addNewConnector("connector1", 4);
         addNewConnector("connector2", 4);
-        memberConfigs = memberConfigs(leader, offset, "worker1");
-        time = Time.SYSTEM;
-        rebalanceDelay = DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT;
+        memberAssignments = new HashMap<>();
+        addNewEmptyWorkers("worker1");
         initAssignor();
     }
 
-    @After
-    public void teardown() {
-        verifyNoMoreInteractions(coordinator);
-    }
-
     public void initAssignor() {
-        assignor = Mockito.spy(new IncrementalCooperativeAssignor(
-                new LogContext(),
-                time,
-                rebalanceDelay));
-        assignor.previousGenerationId = 1000;
+        assignor = new IncrementalCooperativeAssignor(new LogContext(), time, rebalanceDelay);
+        assignor.previousGenerationId = generationId;
     }
 
     @Test
     public void testTaskAssignmentWhenWorkerJoins() {
-        updateConfigSnapshot();
-        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
-
         // First assignment with 1 worker and 2 connectors configured but not yet assigned
         performStandardRebalance();
         assertDelay(0);
@@ -158,8 +115,6 @@ public class IncrementalCooperativeAssignorTest {
         performStandardRebalance();
         assertDelay(0);
         assertEmptyAssignment();
-
-        verifyCoordinatorInteractions();
     }
 
     @Test
@@ -168,9 +123,6 @@ public class IncrementalCooperativeAssignorTest {
         time = new MockTime();
         initAssignor();
 
-        updateConfigSnapshot();
-        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
-
         // First assignment with 2 workers and 2 connectors configured but not yet assigned
         addNewEmptyWorkers("worker2");
         performStandardRebalance();
@@ -205,8 +157,6 @@ public class IncrementalCooperativeAssignorTest {
         assertConnectorAllocations(2);
         assertTaskAllocations(8);
         assertBalancedAndCompleteAllocation();
-
-        verifyCoordinatorInteractions();
     }
 
     @Test
@@ -215,9 +165,6 @@ public class IncrementalCooperativeAssignorTest {
         time = new MockTime();
         initAssignor();
 
-        updateConfigSnapshot();
-        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
-
         // First assignment with 2 workers and 2 connectors configured but not yet assigned
         addNewEmptyWorkers("worker2");
         performStandardRebalance();
@@ -263,8 +210,6 @@ public class IncrementalCooperativeAssignorTest {
         assertConnectorAllocations(1, 1);
         assertTaskAllocations(4, 4);
         assertBalancedAndCompleteAllocation();
-
-        verifyCoordinatorInteractions();
     }
 
     @Test
@@ -273,9 +218,6 @@ public class IncrementalCooperativeAssignorTest {
         time = new MockTime();
         initAssignor();
 
-        updateConfigSnapshot();
-        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
-
         // First assignment with 3 workers and 2 connectors configured but not yet assigned
         addNewEmptyWorkers("worker2", "worker3");
         performStandardRebalance();
@@ -289,13 +231,10 @@ public class IncrementalCooperativeAssignorTest {
         // group was the leader. The new leader has no previous assignments and is not tracking a
         // delay upon a leader's exit
         removeWorkers("worker1");
-        leader = "worker2";
-        leaderUrl = expectedLeaderUrl(leader);
         // The fact that the leader bounces means that the assignor starts from a clean slate
         initAssignor();
 
-        // Capture needs to be reset to point to the new assignor
+        // First rebalance handled by the freshly created assignor
-        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
         performStandardRebalance();
         assertDelay(0);
         assertWorkers("worker2", "worker3");
@@ -307,8 +246,6 @@ public class IncrementalCooperativeAssignorTest {
         performStandardRebalance();
         assertDelay(0);
         assertEmptyAssignment();
-
-        verifyCoordinatorInteractions();
     }
 
     @Test
@@ -317,9 +254,6 @@ public class IncrementalCooperativeAssignorTest {
         time = new MockTime();
         initAssignor();
 
-        updateConfigSnapshot();
-        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
-
         // First assignment with 3 workers and 2 connectors configured but not yet assigned
         addNewEmptyWorkers("worker2", "worker3");
         performStandardRebalance();
@@ -333,13 +267,10 @@ public class IncrementalCooperativeAssignorTest {
         // group was the leader. The new leader has no previous assignments and is not tracking a
         // delay upon a leader's exit
         removeWorkers("worker1");
-        leader = "worker2";
-        leaderUrl = expectedLeaderUrl(leader);
         // The fact that the leader bounces means that the assignor starts from a clean slate
         initAssignor();
 
-        // Capture needs to be reset to point to the new assignor
+        // First rebalance handled by the freshly created assignor
-        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
         performStandardRebalance();
         assertDelay(0);
         assertWorkers("worker2", "worker3");
@@ -363,8 +294,6 @@ public class IncrementalCooperativeAssignorTest {
         assertConnectorAllocations(0, 1, 1);
         assertTaskAllocations(2, 3, 3);
         assertBalancedAndCompleteAllocation();
-
-        verifyCoordinatorInteractions();
     }
 
     @Test
@@ -373,10 +302,6 @@ public class IncrementalCooperativeAssignorTest {
         time = new MockTime();
         initAssignor();
 
-        updateConfigSnapshot();
-        doThrow(new RuntimeException("Unable to send computed assignment with SyncGroupRequest"))
-                .when(assignor).serializeAssignments(assignmentsCapture.capture());
-
         // First assignment with 2 workers and 2 connectors configured but not yet assigned
         addNewEmptyWorkers("worker2");
         performFailedRebalance();
@@ -389,14 +314,11 @@ public class IncrementalCooperativeAssignorTest {
         // Second assignment happens with members returning the same assignments (memberConfigs)
         // as the first time. The assignor detects that the number of members did not change and
         // avoids the rebalance delay, treating the lost assignments as new assignments.
-        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
         performStandardRebalance();
         assertDelay(0);
         assertConnectorAllocations(1, 1);
         assertTaskAllocations(4, 4);
         assertBalancedAndCompleteAllocation();
-
-        verifyCoordinatorInteractions();
     }
 
     @Test
@@ -405,9 +327,6 @@ public class IncrementalCooperativeAssignorTest {
         time = new MockTime();
         initAssignor();
 
-        updateConfigSnapshot();
-        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
-
         // First assignment with 2 workers and 2 connectors configured but not yet assigned
         addNewEmptyWorkers("worker2");
         performStandardRebalance();
@@ -417,10 +336,6 @@ public class IncrementalCooperativeAssignorTest {
         assertTaskAllocations(4, 4);
         assertBalancedAndCompleteAllocation();
 
-        updateConfigSnapshot();
-        doThrow(new RuntimeException("Unable to send computed assignment with SyncGroupRequest"))
-                .when(assignor).serializeAssignments(assignmentsCapture.capture());
-
         // Second assignment triggered by a third worker joining. The computed assignment should
         // revoke tasks from the existing group. But the assignment won't be correctly delivered.
         addNewEmptyWorkers("worker3");
@@ -433,7 +348,6 @@ public class IncrementalCooperativeAssignorTest {
 
         // Third assignment happens with members returning the same assignments (memberConfigs)
         // as the first time.
-        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
         performStandardRebalance();
         assertDelay(0);
         assertConnectorAllocations(0, 1, 1);
@@ -445,8 +359,6 @@ public class IncrementalCooperativeAssignorTest {
         assertConnectorAllocations(0, 1, 1);
         assertTaskAllocations(2, 3, 3);
         assertBalancedAndCompleteAllocation();
-
-        verifyCoordinatorInteractions();
     }
 
     @Test
@@ -455,9 +367,6 @@ public class IncrementalCooperativeAssignorTest {
         time = new MockTime();
         initAssignor();
 
-        updateConfigSnapshot();
-        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
-
         // First assignment with 2 workers and 2 connectors configured but not yet assigned
         addNewEmptyWorkers("worker2");
         performStandardRebalance();
@@ -480,7 +389,6 @@ public class IncrementalCooperativeAssignorTest {
 
         // Third assignment happens with members returning the same assignments (memberConfigs)
         // as the first time.
-        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
         performRebalanceWithMismatchedGeneration();
         assertDelay(0);
         assertConnectorAllocations(0, 1, 1);
@@ -492,15 +400,11 @@ public class IncrementalCooperativeAssignorTest {
         assertConnectorAllocations(0, 1, 1);
         assertTaskAllocations(2, 3, 3);
         assertBalancedAndCompleteAllocation();
-
-        verifyCoordinatorInteractions();
     }
 
     @Test
     public void testTaskAssignmentWhenConnectorsAreDeleted() {
         addNewConnector("connector3", 4);
-        updateConfigSnapshot();
-        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
 
         // First assignment with 1 worker and 2 connectors configured but not yet assigned
         addNewEmptyWorkers("worker2");
@@ -513,15 +417,11 @@ public class IncrementalCooperativeAssignorTest {
 
         // Second assignment with an updated config state that reflects removal of a connector
         removeConnector("connector3");
-        offset++;
-        updateConfigSnapshot();
         performStandardRebalance();
         assertDelay(0);
         assertConnectorAllocations(1, 1);
         assertTaskAllocations(4, 4);
         assertBalancedAndCompleteAllocation();
-
-        verifyCoordinatorInteractions();
     }
 
     @Test
@@ -589,7 +489,6 @@ public class IncrementalCooperativeAssignorTest {
 
     @Test
     public void testLostAssignmentHandlingWhenWorkerBounces() {
-        // Customize assignor for this test case
         time = new MockTime();
         initAssignor();
 
@@ -601,15 +500,13 @@ public class IncrementalCooperativeAssignorTest {
         configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
         configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4));
         configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));
-        memberConfigs = memberConfigs(leader, offset, "worker0", "worker1", "worker2");
 
         ConnectorsAndTasks newSubmissions = new ConnectorsAndTasks.Builder().build();
 
         // No lost assignments
         assignor.handleLostAssignments(new ConnectorsAndTasks.Builder().build(),
                 newSubmissions,
-                new ArrayList<>(configuredAssignment.values()),
-                memberConfigs);
+                new ArrayList<>(configuredAssignment.values()));
 
         assertEquals("Wrong set of workers for reassignments",
                 Collections.emptySet(),
@@ -617,17 +514,16 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(0, assignor.scheduledRebalance);
         assertEquals(0, assignor.delay);
 
-        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
-        String flakyWorker = "worker1";
-        WorkerLoad lostLoad = workerLoad(flakyWorker, 2, 2, 4, 4);
-        removeWorkers(flakyWorker);
+        assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());
 
+        String flakyWorker = "worker1";
+        WorkerLoad lostLoad = configuredAssignment.remove(flakyWorker);
         ConnectorsAndTasks lostAssignments = new ConnectorsAndTasks.Builder()
                 .withCopies(lostLoad.connectors(), lostLoad.tasks()).build();
 
         // Lost assignments detected - No candidate worker has appeared yet (worker with no assignments)
         assignor.handleLostAssignments(lostAssignments, newSubmissions,
-                new ArrayList<>(configuredAssignment.values()), memberConfigs);
+                new ArrayList<>(configuredAssignment.values()));
 
         assertEquals("Wrong set of workers for reassignments",
                 Collections.emptySet(),
@@ -635,15 +531,14 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
         assertEquals(rebalanceDelay, assignor.delay);
 
-        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
+        assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());
         time.sleep(rebalanceDelay / 2);
         rebalanceDelay /= 2;
 
         // A new worker (probably returning worker) has joined
         configuredAssignment.put(flakyWorker, new WorkerLoad.Builder(flakyWorker).build());
-        addNewEmptyWorkers(flakyWorker);
         assignor.handleLostAssignments(lostAssignments, newSubmissions,
-                new ArrayList<>(configuredAssignment.values()), memberConfigs);
+                new ArrayList<>(configuredAssignment.values()));
 
         assertEquals("Wrong set of workers for reassignments",
                 Collections.singleton(flakyWorker),
@@ -651,12 +546,12 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
         assertEquals(rebalanceDelay, assignor.delay);
 
-        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
+        assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());
         time.sleep(rebalanceDelay);
 
         // The new worker has still no assignments
         assignor.handleLostAssignments(lostAssignments, newSubmissions,
-                new ArrayList<>(configuredAssignment.values()), memberConfigs);
+                new ArrayList<>(configuredAssignment.values()));
 
         assertTrue("Wrong assignment of lost connectors",
                 configuredAssignment.getOrDefault(flakyWorker, new WorkerLoad.Builder(flakyWorker).build())
@@ -687,15 +582,13 @@ public class IncrementalCooperativeAssignorTest {
         configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
         configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4));
         configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));
-        memberConfigs = memberConfigs(leader, offset, "worker0", "worker1", "worker2");
 
         ConnectorsAndTasks newSubmissions = new ConnectorsAndTasks.Builder().build();
 
         // No lost assignments
         assignor.handleLostAssignments(new ConnectorsAndTasks.Builder().build(),
                 newSubmissions,
-                new ArrayList<>(configuredAssignment.values()),
-                memberConfigs);
+                new ArrayList<>(configuredAssignment.values()));
 
         assertEquals("Wrong set of workers for reassignments",
                 Collections.emptySet(),
@@ -703,17 +596,16 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(0, assignor.scheduledRebalance);
         assertEquals(0, assignor.delay);
 
-        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
-        String removedWorker = "worker1";
-        WorkerLoad lostLoad = workerLoad(removedWorker, 2, 2, 4, 4);
-        removeWorkers(removedWorker);
+        assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());
 
+        String removedWorker = "worker1";
+        WorkerLoad lostLoad = configuredAssignment.remove(removedWorker);
         ConnectorsAndTasks lostAssignments = new ConnectorsAndTasks.Builder()
                 .withCopies(lostLoad.connectors(), lostLoad.tasks()).build();
 
         // Lost assignments detected - No candidate worker has appeared yet (worker with no assignments)
         assignor.handleLostAssignments(lostAssignments, newSubmissions,
-                new ArrayList<>(configuredAssignment.values()), memberConfigs);
+                new ArrayList<>(configuredAssignment.values()));
 
         assertEquals("Wrong set of workers for reassignments",
                 Collections.emptySet(),
@@ -721,13 +613,13 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
         assertEquals(rebalanceDelay, assignor.delay);
 
-        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
+        assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());
         time.sleep(rebalanceDelay / 2);
         rebalanceDelay /= 2;
 
         // No new worker has joined
         assignor.handleLostAssignments(lostAssignments, newSubmissions,
-                new ArrayList<>(configuredAssignment.values()), memberConfigs);
+                new ArrayList<>(configuredAssignment.values()));
 
         assertEquals("Wrong set of workers for reassignments",
                 Collections.emptySet(),
@@ -738,7 +630,7 @@ public class IncrementalCooperativeAssignorTest {
         time.sleep(rebalanceDelay);
 
         assignor.handleLostAssignments(lostAssignments, newSubmissions,
-                new ArrayList<>(configuredAssignment.values()), memberConfigs);
+                new ArrayList<>(configuredAssignment.values()));
 
         assertTrue("Wrong assignment of lost connectors",
                 newSubmissions.connectors().containsAll(lostAssignments.connectors()));
@@ -765,15 +657,13 @@ public class IncrementalCooperativeAssignorTest {
         configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
         configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4));
         configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));
-        memberConfigs = memberConfigs(leader, offset, "worker0", "worker1", "worker2");
 
         ConnectorsAndTasks newSubmissions = new ConnectorsAndTasks.Builder().build();
 
         // No lost assignments
         assignor.handleLostAssignments(new ConnectorsAndTasks.Builder().build(),
                 newSubmissions,
-                new ArrayList<>(configuredAssignment.values()),
-                memberConfigs);
+                new ArrayList<>(configuredAssignment.values()));
 
         assertEquals("Wrong set of workers for reassignments",
                 Collections.emptySet(),
@@ -781,20 +671,19 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(0, assignor.scheduledRebalance);
         assertEquals(0, assignor.delay);
 
-        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
-        String flakyWorker = "worker1";
-        WorkerLoad lostLoad = workerLoad(flakyWorker, 2, 2, 4, 4);
-        removeWorkers(flakyWorker);
-        String newWorker = "worker3";
+        assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());
 
+        String flakyWorker = "worker1";
+        WorkerLoad lostLoad = configuredAssignment.remove(flakyWorker);
         ConnectorsAndTasks lostAssignments = new ConnectorsAndTasks.Builder()
                 .withCopies(lostLoad.connectors(), lostLoad.tasks()).build();
 
-        // Lost assignments detected - A new worker also has joined that is not the returning worker
+        String newWorker = "worker3";
         configuredAssignment.put(newWorker, new WorkerLoad.Builder(newWorker).build());
-        addNewEmptyWorkers(newWorker);
+
+        // Lost assignments detected - A new worker also has joined that is not the returning worker
         assignor.handleLostAssignments(lostAssignments, newSubmissions,
-                new ArrayList<>(configuredAssignment.values()), memberConfigs);
+                new ArrayList<>(configuredAssignment.values()));
 
         assertEquals("Wrong set of workers for reassignments",
                 Collections.singleton(newWorker),
@@ -802,15 +691,14 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
         assertEquals(rebalanceDelay, assignor.delay);
 
-        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
+        assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());
         time.sleep(rebalanceDelay / 2);
         rebalanceDelay /= 2;
 
         // Now two new workers have joined
         configuredAssignment.put(flakyWorker, new WorkerLoad.Builder(flakyWorker).build());
-        addNewEmptyWorkers(flakyWorker);
         assignor.handleLostAssignments(lostAssignments, newSubmissions,
-                new ArrayList<>(configuredAssignment.values()), memberConfigs);
+                new ArrayList<>(configuredAssignment.values()));
 
         Set<String> expectedWorkers = new HashSet<>();
         expectedWorkers.addAll(Arrays.asList(newWorker, flakyWorker));
@@ -820,7 +708,7 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
         assertEquals(rebalanceDelay, assignor.delay);
 
-        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
+        assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());
         time.sleep(rebalanceDelay);
 
         // The new workers have new assignments, other than the lost ones
@@ -829,7 +717,7 @@ public class IncrementalCooperativeAssignorTest {
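-        // we don't reflect these new assignments in memberConfigs currently because they are not
-        // used in handleLostAssignments method
+        // we don't reflect these new assignments in memberAssignments currently because they are not
+        // used in handleLostAssignments method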
         assignor.handleLostAssignments(lostAssignments, newSubmissions,
-                new ArrayList<>(configuredAssignment.values()), memberConfigs);
+                new ArrayList<>(configuredAssignment.values()));
 
         // both the newWorkers would need to be considered for re assignment of connectors and tasks
         List<String> listOfConnectorsInLast2Workers = new ArrayList<>();
@@ -867,15 +755,13 @@ public class IncrementalCooperativeAssignorTest {
         configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
         configuredAssignment.put("worker1", workerLoad("worker1", 2, 2, 4, 4));
         configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));
-        memberConfigs = memberConfigs(leader, offset, "worker0", "worker1", "worker2");
 
         ConnectorsAndTasks newSubmissions = new ConnectorsAndTasks.Builder().build();
 
         // No lost assignments
         assignor.handleLostAssignments(new ConnectorsAndTasks.Builder().build(),
                 newSubmissions,
-                new ArrayList<>(configuredAssignment.values()),
-                memberConfigs);
+                new ArrayList<>(configuredAssignment.values()));
 
         assertEquals("Wrong set of workers for reassignments",
                 Collections.emptySet(),
@@ -883,17 +769,16 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(0, assignor.scheduledRebalance);
         assertEquals(0, assignor.delay);
 
-        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
-        String veryFlakyWorker = "worker1";
-        WorkerLoad lostLoad = workerLoad(veryFlakyWorker, 2, 2, 4, 4);
-        removeWorkers(veryFlakyWorker);
+        assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());
 
+        String veryFlakyWorker = "worker1";
+        WorkerLoad lostLoad = configuredAssignment.remove(veryFlakyWorker);
         ConnectorsAndTasks lostAssignments = new ConnectorsAndTasks.Builder()
                 .withCopies(lostLoad.connectors(), lostLoad.tasks()).build();
 
         // Lost assignments detected - No candidate worker has appeared yet (worker with no assignments)
         assignor.handleLostAssignments(lostAssignments, newSubmissions,
-                new ArrayList<>(configuredAssignment.values()), memberConfigs);
+                new ArrayList<>(configuredAssignment.values()));
 
         assertEquals("Wrong set of workers for reassignments",
                 Collections.emptySet(),
@@ -901,15 +786,14 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
         assertEquals(rebalanceDelay, assignor.delay);
 
-        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
+        assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());
         time.sleep(rebalanceDelay / 2);
         rebalanceDelay /= 2;
 
         // A new worker (probably returning worker) has joined
         configuredAssignment.put(veryFlakyWorker, new WorkerLoad.Builder(veryFlakyWorker).build());
-        addNewEmptyWorkers(veryFlakyWorker);
         assignor.handleLostAssignments(lostAssignments, newSubmissions,
-                new ArrayList<>(configuredAssignment.values()), memberConfigs);
+                new ArrayList<>(configuredAssignment.values()));
 
         assertEquals("Wrong set of workers for reassignments",
                 Collections.singleton(veryFlakyWorker),
@@ -917,14 +801,13 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
         assertEquals(rebalanceDelay, assignor.delay);
 
-        assignor.previousMembers = new HashSet<>(memberConfigs.keySet());
+        assignor.previousMembers = new HashSet<>(configuredAssignment.keySet());
         time.sleep(rebalanceDelay);
 
         // The returning worker leaves permanently after joining briefly during the delay
         configuredAssignment.remove(veryFlakyWorker);
-        removeWorkers(veryFlakyWorker);
         assignor.handleLostAssignments(lostAssignments, newSubmissions,
-                new ArrayList<>(configuredAssignment.values()), memberConfigs);
+                new ArrayList<>(configuredAssignment.values()));
 
         assertTrue("Wrong assignment of lost connectors",
                 newSubmissions.connectors().containsAll(lostAssignments.connectors()));
@@ -939,9 +822,6 @@ public class IncrementalCooperativeAssignorTest {
 
     @Test
     public void testTaskAssignmentWhenTasksDuplicatedInWorkerAssignment() {
-        updateConfigSnapshot();
-        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
-
         // First assignment with 1 worker and 2 connectors configured but not yet assigned
         performStandardRebalance();
         assertDelay(0);
@@ -975,15 +855,10 @@ public class IncrementalCooperativeAssignorTest {
         performStandardRebalance();
         assertDelay(0);
         assertEmptyAssignment();
-
-        verifyCoordinatorInteractions();
     }
 
     @Test
     public void testDuplicatedAssignmentHandleWhenTheDuplicatedAssignmentsDeleted() {
-        updateConfigSnapshot();
-        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
-
         // First assignment with 1 worker and 2 connectors configured but not yet assigned
         performStandardRebalance();
         assertDelay(0);
@@ -994,7 +869,6 @@ public class IncrementalCooperativeAssignorTest {
 
         // Delete connector1
         removeConnector("connector1");
-        updateConfigSnapshot();
 
         // Second assignment with a second worker with duplicate assignment joining and the duplicated assignment is deleted at the same time
         addNewWorker("worker2", newConnectors(1, 2), newTasks("connector1", 0, 4));
@@ -1021,8 +895,107 @@ public class IncrementalCooperativeAssignorTest {
         performStandardRebalance();
         assertDelay(0);
         assertEmptyAssignment();
+    }
 
-        verifyCoordinatorInteractions();
+    @Test
+    public void testLeaderStateUpdated() {
+        // Sanity test to make sure that performTaskAssignment hands a non-null leader state to the coordinator after a rebalance
+        connectors.clear();
+        String leader = "followMe";
+        Map<String, ExtendedWorkerState> workerStates = new HashMap<>();
+        workerStates.put(leader, new ExtendedWorkerState("followMe:618", CONFIG_OFFSET, ExtendedAssignment.empty()));
+        WorkerCoordinator coordinator = mock(WorkerCoordinator.class);
+        when(coordinator.configSnapshot()).thenReturn(configState());
+        assignor.performTaskAssignment(
+                leader,
+                CONFIG_OFFSET,
+                workerStates,
+                coordinator,
+                IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2
+        );
+        verify(coordinator).leaderState(notNull());
+    }
+
+    @Test
+    public void testProtocolV1() {
+        // Sanity test to make sure that assignments performed with the COMPATIBLE protocol are serialized with version CONNECT_PROTOCOL_V1
+        connectors.clear();
+        String leader = "followMe";
+        List<JoinGroupResponseData.JoinGroupResponseMember> memberMetadata = new ArrayList<>();
+        ExtendedAssignment leaderAssignment = new ExtendedAssignment(
+                IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1,
+                ConnectProtocol.Assignment.NO_ERROR,
+                leader,
+                "followMe:618",
+                CONFIG_OFFSET,
+                Collections.emptySet(),
+                Collections.emptySet(),
+                Collections.emptySet(),
+                Collections.emptySet(),
+                0
+        );
+        ExtendedWorkerState leaderState = new ExtendedWorkerState("followMe:618", CONFIG_OFFSET, leaderAssignment);
+        JoinGroupResponseData.JoinGroupResponseMember leaderMetadata = new JoinGroupResponseData.JoinGroupResponseMember()
+                .setMemberId(leader)
+                .setMetadata(IncrementalCooperativeConnectProtocol.serializeMetadata(leaderState, false).array());
+        memberMetadata.add(leaderMetadata);
+        WorkerCoordinator coordinator = mock(WorkerCoordinator.class);
+        when(coordinator.configSnapshot()).thenReturn(configState());
+        Map<String, ByteBuffer> serializedAssignments = assignor.performAssignment(
+                leader,
+                ConnectProtocolCompatibility.COMPATIBLE.protocol(),
+                memberMetadata,
+                coordinator
+        );
+        serializedAssignments.forEach((worker, serializedAssignment) -> {
+            ExtendedAssignment assignment = IncrementalCooperativeConnectProtocol.deserializeAssignment(serializedAssignment);
+            assertEquals(
+                    "Incorrect protocol version in assignment for worker " + worker,
+                    IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1,
+                    assignment.version()
+            );
+        });
+    }
+
+    @Test
+    public void testProtocolV2() {
+        // Sanity test to make sure that assignments performed with the SESSIONED protocol are serialized with version CONNECT_PROTOCOL_V2
+        connectors.clear();
+        String leader = "followMe";
+        List<JoinGroupResponseData.JoinGroupResponseMember> memberMetadata = new ArrayList<>();
+        ExtendedAssignment leaderAssignment = new ExtendedAssignment(
+                IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2,
+                ConnectProtocol.Assignment.NO_ERROR,
+                leader,
+                "followMe:618",
+                CONFIG_OFFSET,
+                Collections.emptySet(),
+                Collections.emptySet(),
+                Collections.emptySet(),
+                Collections.emptySet(),
+                0
+        );
+        ExtendedWorkerState leaderState = new ExtendedWorkerState("followMe:618", CONFIG_OFFSET, leaderAssignment);
+        JoinGroupResponseData.JoinGroupResponseMember leaderMetadata = new JoinGroupResponseData.JoinGroupResponseMember()
+                .setMemberId(leader)
+                .setMetadata(IncrementalCooperativeConnectProtocol.serializeMetadata(leaderState, true).array());
+        memberMetadata.add(leaderMetadata);
+        WorkerCoordinator coordinator = mock(WorkerCoordinator.class);
+        when(coordinator.configSnapshot()).thenReturn(configState());
+        Map<String, ByteBuffer> serializedAssignments = assignor.performAssignment(
+                leader,
+                ConnectProtocolCompatibility.SESSIONED.protocol(),
+                memberMetadata,
+                coordinator
+        );
+        serializedAssignments.forEach((worker, serializedAssignment) -> {
+            ExtendedAssignment assignment = IncrementalCooperativeConnectProtocol.deserializeAssignment(serializedAssignment);
+            assertEquals(
+                    "Incorrect protocol version in assignment for worker " + worker,
+                    IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2,
+                    assignment.version()
+            );
+        });
     }
 
     private void performStandardRebalance() {
@@ -1037,23 +1010,12 @@ public class IncrementalCooperativeAssignorTest {
         performRebalance(false, true);
     }
 
-    private void performRebalance(boolean assignmentFailure, boolean expectGenerationMismatch) {
-        expectGeneration(expectGenerationMismatch);
-        // Member configs are tracked by the assignor; create a deep copy here so that modifications to our own memberConfigs field
-        // are not accidentally propagated to the one used by the assignor
-        Map<String, ExtendedWorkerState> memberConfigsCopy = memberConfigs.entrySet().stream().collect(Collectors.toMap(
-                Map.Entry::getKey,
-                e -> {
-                    ExtendedWorkerState originalWorkerState = e.getValue();
-                    return new ExtendedWorkerState(
-                            originalWorkerState.url(),
-                            originalWorkerState.offset(),
-                            duplicate(originalWorkerState.assignment())
-                    );
-                }
-        ));
+    private void performRebalance(boolean assignmentFailure, boolean generationMismatch) {
+        generationId++;
+        int lastCompletedGenerationId = generationMismatch ? generationId - 2 : generationId - 1;
         try {
-            assignor.performTaskAssignment(leader, offset, memberConfigsCopy, coordinator, protocolVersion);
+            Map<String, ConnectorsAndTasks> memberAssignmentsCopy = new HashMap<>(memberAssignments);
+            returnedAssignments = assignor.performTaskAssignment(configState(), lastCompletedGenerationId, generationId, memberAssignmentsCopy);
         } catch (RuntimeException e) {
             if (assignmentFailure) {
                 RequestFuture.failure(e);
@@ -1061,21 +1023,12 @@ public class IncrementalCooperativeAssignorTest {
                 throw e;
             }
         }
-        ++rebalanceNum;
-        returnedAssignments = assignmentsCapture.getValue();
         assertNoRedundantAssignments();
         if (!assignmentFailure) {
-            applyAssignments(leader, offset, returnedAssignments);
+            applyAssignments();
         }
     }
 
-    private void expectGeneration(boolean expectMismatch) {
-        when(coordinator.generationId())
-                .thenReturn(assignor.previousGenerationId + 1);
-        int lastCompletedGenerationId = expectMismatch ? assignor.previousGenerationId - 1 : assignor.previousGenerationId;
-        when(coordinator.lastCompletedGenerationId()).thenReturn(lastCompletedGenerationId);
-    }
-
     private void addNewEmptyWorkers(String... workers) {
         for (String worker : workers) {
             addNewWorker(worker, Collections.emptyList(), Collections.emptyList());
@@ -1083,12 +1036,10 @@ public class IncrementalCooperativeAssignorTest {
     }
 
     private void addNewWorker(String worker, List<String> connectors, List<ConnectorTaskId> tasks) {
-        ExtendedAssignment assignment = newExpandableAssignment();
-        assignment.connectors().addAll(connectors);
-        assignment.tasks().addAll(tasks);
+        ConnectorsAndTasks assignment = new ConnectorsAndTasks.Builder().withCopies(connectors, tasks).build();
         assertNull(
                 "Worker " + worker + " already exists",
-                memberConfigs.put(worker, new ExtendedWorkerState(leaderUrl, offset, assignment))
+                memberAssignments.put(worker, assignment) // put() returns the previous value, so non-null means the worker was already registered
         );
     }
 
@@ -1096,7 +1047,7 @@ public class IncrementalCooperativeAssignorTest {
         for (String worker : workers) {
             assertNotNull(
                     "Worker " + worker + " does not exist",
-                    memberConfigs.remove(worker)
+                    memberAssignments.remove(worker)
             );
         }
     }
@@ -1142,20 +1093,10 @@ public class IncrementalCooperativeAssignorTest {
         );
     }
 
-    private void updateConfigSnapshot() {
-        when(coordinator.configSnapshot()).thenReturn(configState());
-    }
-
     private ClusterConfigState configState() {
         Map<String, Integer> taskCounts = new HashMap<>(connectors);
-        Map<String, Map<String, String>> connectorConfigs = taskCounts.keySet().stream().collect(Collectors.toMap(
-                Function.identity(),
-                connector -> Collections.emptyMap()
-        ));
-        Map<String, TargetState> targetStates = taskCounts.keySet().stream().collect(Collectors.toMap(
-                Function.identity(),
-                connector -> TargetState.STARTED
-        ));
+        Map<String, Map<String, String>> connectorConfigs = transformValues(taskCounts, c -> Collections.emptyMap());
+        Map<String, TargetState> targetStates = transformValues(taskCounts, c -> TargetState.STARTED);
         Map<ConnectorTaskId, Map<String, String>> taskConfigs = taskCounts.entrySet().stream()
                 .flatMap(e -> IntStream.range(0, e.getValue()).mapToObj(i -> new ConnectorTaskId(e.getKey(), i)))
                 .collect(Collectors.toMap(
@@ -1163,78 +1104,59 @@ public class IncrementalCooperativeAssignorTest {
                         connectorTaskId -> Collections.emptyMap()
                 ));
         return new ClusterConfigState(
-                offset,
+                CONFIG_OFFSET,
                 null,
                 taskCounts,
                 connectorConfigs,
                 targetStates,
                 taskConfigs,
-                Collections.emptySet()
-        );
-    }
-
-    private Map<String, ExtendedWorkerState> memberConfigs(String givenLeader, long givenOffset, String... workers) {
-        return Stream.of(workers).collect(Collectors.toMap(
-                Function.identity(),
-                w -> new ExtendedWorkerState(expectedLeaderUrl(givenLeader), givenOffset, newExpandableAssignment(givenLeader, givenOffset))
-        ));
+                Collections.emptySet());
     }
 
-    private void applyAssignments(String leader, long offset, Map<String, ExtendedAssignment> newAssignments) {
-        newAssignments.forEach((worker, newAssignment) -> {
-            ExtendedAssignment workerAssignment = Optional.ofNullable(memberConfigs.get(worker))
-                    .map(ExtendedWorkerState::assignment)
-                    .orElseGet(this::newExpandableAssignment);
-            workerAssignment.connectors().removeAll(newAssignment.revokedConnectors());
-            workerAssignment.connectors().addAll(newAssignment.connectors());
-            workerAssignment.tasks().removeAll(newAssignment.revokedTasks());
-            workerAssignment.tasks().addAll(newAssignment.tasks());
-            memberConfigs.put(worker, new ExtendedWorkerState(expectedLeaderUrl(leader), offset, workerAssignment));
-        });
-    }
-
-    private ExtendedAssignment newExpandableAssignment() {
-        return newExpandableAssignment(leader, offset);
-    }
+    private void applyAssignments() {
+        returnedAssignments.allWorkers().forEach(worker -> { // replay this round's revocations and new assignments onto each worker's tracked assignment, then check it against the returned complete assignment
+            ConnectorsAndTasks workerAssignment = memberAssignments.computeIfAbsent(worker, ignored -> new ConnectorsAndTasks.Builder().build());
 
-    private ExtendedAssignment newExpandableAssignment(String leader, long offset) {
-        return new ExtendedAssignment(
-                protocolVersion,
-                ConnectProtocol.Assignment.NO_ERROR,
-                leader,
-                expectedLeaderUrl(leader),
-                offset,
-                new ArrayList<>(),
-                new ArrayList<>(),
-                new ArrayList<>(),
-                new ArrayList<>(),
-                0);
-    }
+            workerAssignment.connectors().removeAll(returnedAssignments.newlyRevokedConnectors(worker));
+            workerAssignment.connectors().addAll(returnedAssignments.newlyAssignedConnectors(worker));
+            workerAssignment.tasks().removeAll(returnedAssignments.newlyRevokedTasks(worker));
+            workerAssignment.tasks().addAll(returnedAssignments.newlyAssignedTasks(worker));
 
-    private static String expectedLeaderUrl(String givenLeader) {
-        return "http://" + givenLeader + ":8083";
+            assertEquals(
+                    "Complete connector assignment for worker " + worker + " does not match expectations " +
+                            "based on prior assignment and new revocations and assignments",
+                    workerAssignment.connectors(),
+                    returnedAssignments.allAssignedConnectors().get(worker)
+            );
+            assertEquals(
+                    "Complete task assignment for worker " + worker + " does not match expectations " +
+                            "based on prior assignment and new revocations and assignments",
+                    workerAssignment.tasks(),
+                    returnedAssignments.allAssignedTasks().get(worker)
+            );
+        });
     }
 
     private void assertEmptyAssignment() {
         assertEquals(
                 "No connectors should have been newly assigned during this round",
                 Collections.emptyList(),
-                extractFromAssignments(returnedAssignments, ConnectProtocol.Assignment::connectors)
+                ConnectUtils.combineCollections(returnedAssignments.newlyAssignedConnectors().values())
         );
         assertEquals(
                 "No tasks should have been newly assigned during this round",
                 Collections.emptyList(),
-                extractFromAssignments(returnedAssignments, ConnectProtocol.Assignment::tasks)
+                ConnectUtils.combineCollections(returnedAssignments.newlyAssignedTasks().values())
         );
         assertEquals(
                 "No connectors should have been revoked during this round",
                 Collections.emptyList(),
-                extractFromAssignments(returnedAssignments, ExtendedAssignment::revokedConnectors)
+                ConnectUtils.combineCollections(returnedAssignments.newlyRevokedConnectors().values())
         );
         assertEquals(
                 "No tasks should have been revoked during this round",
                 Collections.emptyList(),
-                extractFromAssignments(returnedAssignments, ExtendedAssignment::revokedTasks)
+                ConnectUtils.combineCollections(returnedAssignments.newlyRevokedTasks().values())
         );
     }
 
@@ -1242,7 +1164,7 @@ public class IncrementalCooperativeAssignorTest {
         assertEquals(
                 "Wrong set of workers",
                 new HashSet<>(Arrays.asList(workers)),
-                returnedAssignments.keySet()
+                returnedAssignments.allWorkers()
         );
     }
 
@@ -1253,7 +1175,7 @@ public class IncrementalCooperativeAssignorTest {
      * and one worker that is assigned three connectors.
      */
     private void assertConnectorAllocations(int... connectorCounts) {
-        assertAllocations("connectors", ExtendedAssignment::connectors, connectorCounts);
+        assertAllocations("connectors", ConnectorsAndTasks::connectors, connectorCounts);
     }
 
     /**
@@ -1263,10 +1185,10 @@ public class IncrementalCooperativeAssignorTest {
      * and one worker that is assigned three tasks.
      */
     private void assertTaskAllocations(int... taskCounts) {
-        assertAllocations("tasks", ExtendedAssignment::tasks, taskCounts);
+        assertAllocations("tasks", ConnectorsAndTasks::tasks, taskCounts);
     }
 
-    private void assertAllocations(String allocated, Function<ExtendedAssignment, ? extends Collection<?>> allocation, int... rawExpectedAllocations) {
+    private void assertAllocations(String allocated, Function<ConnectorsAndTasks, ? extends Collection<?>> allocation, int... rawExpectedAllocations) {
         List<Integer> expectedAllocations = IntStream.of(rawExpectedAllocations)
                 .boxed()
                 .sorted()
@@ -1279,9 +1201,8 @@ public class IncrementalCooperativeAssignorTest {
         );
     }
 
-    private List<Integer> allocations(Function<ExtendedAssignment, ? extends Collection<?>> allocation) {
-        return memberConfigs.values().stream()
-                .map(ExtendedWorkerState::assignment)
+    private List<Integer> allocations(Function<ConnectorsAndTasks, ? extends Collection<?>> allocation) {
+        return memberAssignments.values().stream()
                 .map(allocation)
                 .map(Collection::size)
                 .sorted()
@@ -1289,8 +1210,11 @@ public class IncrementalCooperativeAssignorTest {
     }
 
     private void assertDelay(int expectedDelay) {
-        returnedAssignments.values().forEach(a -> assertEquals(
-                        "Wrong rebalance delay in " + a, expectedDelay, a.delay()));
+        assertEquals(
+                "Wrong rebalance delay",
+                expectedDelay,
+                assignor.delay
+        );
     }
 
     /**
@@ -1298,15 +1222,10 @@ public class IncrementalCooperativeAssignorTest {
      * and that each newly-assigned connector and task is only assigned to a single worker.
      */
     private void assertNoRedundantAssignments() {
-        Map<String, ExtendedAssignment> existingAssignments = memberConfigs.entrySet().stream().collect(Collectors.toMap(
-                Map.Entry::getKey,
-                e -> e.getValue().assignment()
-        ));
-
-        List<String> existingConnectors = extractFromAssignments(existingAssignments, ConnectProtocol.Assignment::connectors);
-        List<String> newConnectors = extractFromAssignments(returnedAssignments, ConnectProtocol.Assignment::connectors);
-        List<ConnectorTaskId> existingTasks = extractFromAssignments(existingAssignments, ConnectProtocol.Assignment::tasks);
-        List<ConnectorTaskId> newTasks = extractFromAssignments(returnedAssignments, ConnectProtocol.Assignment::tasks);
+        List<String> existingConnectors = ConnectUtils.combineCollections(memberAssignments.values(), ConnectorsAndTasks::connectors);
+        List<String> newConnectors = ConnectUtils.combineCollections(returnedAssignments.newlyAssignedConnectors().values());
+        List<ConnectorTaskId> existingTasks = ConnectUtils.combineCollections(memberAssignments.values(), ConnectorsAndTasks::tasks);
+        List<ConnectorTaskId> newTasks = ConnectUtils.combineCollections(returnedAssignments.newlyAssignedTasks().values());
 
         assertNoDuplicates(
                 newConnectors,
@@ -1333,8 +1252,8 @@ public class IncrementalCooperativeAssignorTest {
     }
 
     private void assertBalancedAllocation() {
-        List<Integer> connectorCounts = allocations(ExtendedAssignment::connectors);
-        List<Integer> taskCounts = allocations(ExtendedAssignment::tasks);
+        List<Integer> connectorCounts = allocations(ConnectorsAndTasks::connectors);
+        List<Integer> taskCounts = allocations(ConnectorsAndTasks::tasks);
 
         int minConnectors = connectorCounts.get(0);
         int maxConnectors = connectorCounts.get(connectorCounts.size() - 1);
@@ -1353,14 +1272,15 @@ public class IncrementalCooperativeAssignorTest {
     }
 
     private void assertCompleteAllocation() {
-        List<String> allAssignedConnectors = extractFromAssignments(memberConfigs, e -> e.assignment().connectors());
+        List<String> allAssignedConnectors = ConnectUtils.combineCollections(memberAssignments.values(), ConnectorsAndTasks::connectors);
         assertEquals(
                 "The set of connectors assigned across the cluster does not match the set of connectors in the config topic",
                 connectors.keySet(),
                 new HashSet<>(allAssignedConnectors)
         );
 
-        Map<String, List<ConnectorTaskId>> allAssignedTasks = extractFromAssignments(memberConfigs, e -> e.assignment().tasks()).stream()
+        Map<String, List<ConnectorTaskId>> allAssignedTasks = ConnectUtils.combineCollections(memberAssignments.values(), ConnectorsAndTasks::tasks)
+                .stream()
                 .collect(Collectors.groupingBy(ConnectorTaskId::connector, Collectors.toList()));
 
         connectors.forEach((connector, taskCount) -> {
@@ -1375,24 +1295,6 @@ public class IncrementalCooperativeAssignorTest {
         });
     }
 
-    private void verifyCoordinatorInteractions() {
-        verify(coordinator, times(rebalanceNum)).configSnapshot();
-        verify(coordinator, times(rebalanceNum)).leaderState(any());
-        verify(coordinator, times(2 * rebalanceNum)).generationId();
-        verify(coordinator, times(rebalanceNum)).memberId();
-        verify(coordinator, times(rebalanceNum)).lastCompletedGenerationId();
-    }
-
-    private static <A, T> List<T> extractFromAssignments(
-            Map<String, A> assignments,
-            Function<A, Collection<T>> extraction
-    ) {
-        return assignments.values().stream()
-                .map(extraction)
-                .flatMap(Collection::stream)
-                .collect(Collectors.toList());
-    }
-
     private static <T> void assertNoDuplicates(List<T> collection, String assertionMessage) {
         assertEquals(
                 assertionMessage,
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
index 35ba6249d7..3a15956ff1 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
@@ -215,7 +215,7 @@ public class WorkerCoordinatorIncrementalTest {
                 CONNECT_PROTOCOL_V1, ExtendedAssignment.NO_ERROR, leaderId, leaderUrl, configState1.offset(),
                 Collections.singletonList(connectorId1), Arrays.asList(taskId1x0, taskId2x0),
                 Collections.emptyList(), Collections.emptyList(), 0);
-        ByteBuffer buf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment);
+        ByteBuffer buf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment, false);
         // Using onJoinComplete to register the protocol selection decided by the broker
         // coordinator as well as an existing previous assignment that the call to metadata will
         // include with v1 but not with v0
@@ -246,7 +246,7 @@ public class WorkerCoordinatorIncrementalTest {
                 CONNECT_PROTOCOL_V1, ExtendedAssignment.NO_ERROR, leaderId, leaderUrl, configState1.offset(),
                 Collections.singletonList(connectorId1), Arrays.asList(taskId1x0, taskId2x0),
                 Collections.emptyList(), Collections.emptyList(), 0);
-        ByteBuffer buf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment);
+        ByteBuffer buf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment, false);
         // Using onJoinComplete to register the protocol selection decided by the broker
         // coordinator as well as an existing previous assignment that the call to metadata will
         // include with v1 but not with v0