You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2019/05/17 03:46:30 UTC

[kafka] branch trunk updated: KAFKA-5505: Incremental cooperative rebalancing in Connect (KIP-415) (#6363)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ce584a0  KAFKA-5505: Incremental cooperative rebalancing in Connect (KIP-415) (#6363)
ce584a0 is described below

commit ce584a01fff4e73afa96d38c9a7508fcd67e3e46
Author: Konstantine Karantasis <ko...@confluent.io>
AuthorDate: Thu May 16 20:46:04 2019 -0700

    KAFKA-5505: Incremental cooperative rebalancing in Connect (KIP-415) (#6363)
    
    Added the incremental cooperative rebalancing in Connect to avoid global rebalances on all connectors and tasks with each new/changed/removed connector. This new protocol is backward compatible and will work with heterogeneous clusters that exist during a rolling upgrade, but once the clusters consist of new workers only some affected connectors and tasks will be rebalanced: connectors and tasks on existing nodes still in the cluster and not added/changed/removed will continue running [...]
    
    This commit attempted to minimize the changes to the existing V0 protocol logic, though that was not entirely possible.
    
    This commit adds extensive unit and integration tests for both the old V0 protocol and the new v1 protocol. Soak testing has been performed multiple times to verify behavior while connectors and added, changed, and removed and while workers are added and removed from the cluster.
    
    Author: Konstantine Karantasis <ko...@confluent.io>
    Reviewers: Randall Hauch <rh...@gmail.com>, Ewen Cheslack-Postava <me...@ewencp.org>, Robert Yokota <ra...@gmail.com>, David Arthur <mu...@gmail.com>, Ryanne Dolan <ry...@gmail.com>
---
 build.gradle                                       |    1 +
 .../org/apache/kafka/common/config/ConfigDef.java  |   28 +
 .../runtime/distributed/ConnectAssignor.java       |   43 +
 .../runtime/distributed/ConnectProtocol.java       |  187 +++-
 .../distributed/ConnectProtocolCompatibility.java  |   90 ++
 .../runtime/distributed/DistributedConfig.java     |   41 +-
 .../runtime/distributed/DistributedHerder.java     |   95 +-
 .../connect/runtime/distributed/EagerAssignor.java |  182 ++++
 .../runtime/distributed/ExtendedAssignment.java    |  269 +++++
 .../runtime/distributed/ExtendedWorkerState.java   |   48 +
 .../IncrementalCooperativeAssignor.java            |  644 ++++++++++++
 .../IncrementalCooperativeConnectProtocol.java     |  247 +++++
 .../runtime/distributed/WorkerCoordinator.java     |  461 ++++++---
 .../runtime/distributed/WorkerGroupMember.java     |   27 +-
 .../distributed/WorkerRebalanceListener.java       |   10 +-
 .../RebalanceSourceConnectorsIntegrationTest.java  |  324 ++++++
 .../kafka/connect/runtime/WorkerTestUtils.java     |  192 ++++
 .../ConnectProtocolCompatibilityTest.java          |  236 +++++
 .../runtime/distributed/DistributedHerderTest.java |  229 ++++-
 .../IncrementalCooperativeAssignorTest.java        | 1079 ++++++++++++++++++++
 .../WorkerCoordinatorIncrementalTest.java          |  573 +++++++++++
 .../runtime/distributed/WorkerCoordinatorTest.java |   43 +-
 22 files changed, 4833 insertions(+), 216 deletions(-)

diff --git a/build.gradle b/build.gradle
index 1b11786..9787ebe 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1565,6 +1565,7 @@ project(':connect:runtime') {
     testCompile libs.junit
     testCompile libs.powermockJunit4
     testCompile libs.powermockEasymock
+    testCompile libs.mockitoCore
     testCompile libs.httpclient
 
     testCompile project(':clients').sourceSets.test.output
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index bccc6f1..362b1a6 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -30,6 +30,8 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Map;
 import java.util.Set;
+import java.util.function.BiConsumer;
+import java.util.function.Supplier;
 import java.util.regex.Pattern;
 
 /**
@@ -952,6 +954,32 @@ public class ConfigDef {
         }
     }
 
+    public static class LambdaValidator implements Validator {
+        BiConsumer<String, Object> ensureValid;
+        Supplier<String> toStringFunction;
+
+        private LambdaValidator(BiConsumer<String, Object> ensureValid,
+                                Supplier<String> toStringFunction) {
+            this.ensureValid = ensureValid;
+            this.toStringFunction = toStringFunction;
+        }
+
+        public static LambdaValidator with(BiConsumer<String, Object> ensureValid,
+                                           Supplier<String> toStringFunction) {
+            return new LambdaValidator(ensureValid, toStringFunction);
+        }
+
+        @Override
+        public void ensureValid(String name, Object value) {
+            ensureValid.accept(name, value);
+        }
+
+        @Override
+        public String toString() {
+            return toStringFunction.get();
+        }
+    }
+
     public static class CompositeValidator implements Validator {
         private final List<Validator> validators;
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectAssignor.java
new file mode 100644
index 0000000..752e62e
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectAssignor.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.common.message.JoinGroupResponseData;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * An assignor that computes a distribution of connectors and tasks among the workers of the group
+ * that performs rebalancing.
+ */
+public interface ConnectAssignor {
+    /**
+     * Based on the member metadata and the information stored in the worker coordinator this
+     * method computes an assignment of connectors and tasks among the members of the worker group.
+     *
+     * @param leaderId the leader of the group
+     * @param protocol the protocol type; for Connect assignors this is normally "connect"
+     * @param allMemberMetadata the metadata of all the active workers of the group
+     * @param coordinator the worker coordinator that runs this assignor
+     * @return the assignment of connectors and tasks to workers
+     */
+    Map<String, ByteBuffer> performAssignment(String leaderId, String protocol,
+                                              List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata,
+                                              WorkerCoordinator coordinator);
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocol.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocol.java
index dbb33bc..15fc605 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocol.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocol.java
@@ -26,10 +26,17 @@ import org.apache.kafka.connect.util.ConnectorTaskId;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
+
+import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
+import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility.EAGER;
 
 /**
  * This class implements the protocol for Kafka Connect workers in a group. It includes the format of worker state used when
@@ -50,19 +57,53 @@ public class ConnectProtocol {
     public static final short CONNECT_PROTOCOL_V0 = 0;
     public static final Schema CONNECT_PROTOCOL_HEADER_SCHEMA = new Schema(
             new Field(VERSION_KEY_NAME, Type.INT16));
+
+    /**
+     * Connect Protocol Header V0:
+     * <pre>
+     *   Version            => Int16
+     * </pre>
+     */
     private static final Struct CONNECT_PROTOCOL_HEADER_V0 = new Struct(CONNECT_PROTOCOL_HEADER_SCHEMA)
             .set(VERSION_KEY_NAME, CONNECT_PROTOCOL_V0);
 
+    /**
+     * Config State V0:
+     * <pre>
+     *   Url                => [String]
+     *   ConfigOffset       => Int64
+     * </pre>
+     */
     public static final Schema CONFIG_STATE_V0 = new Schema(
             new Field(URL_KEY_NAME, Type.STRING),
             new Field(CONFIG_OFFSET_KEY_NAME, Type.INT64));
 
-    // Assignments for each worker are a set of connectors and tasks. These are categorized by connector ID. A sentinel
-    // task ID (CONNECTOR_TASK) is used to indicate the connector itself (i.e. that the assignment includes
-    // responsibility for running the Connector instance in addition to any tasks it generates).
+    /**
+     * Connector Assignment V0:
+     * <pre>
+     *   Connector          => [String]
+     *   Tasks              => [Int32]
+     * </pre>
+     *
+     * <p>Assignments for each worker are a set of connectors and tasks. These are categorized by
+     * connector ID. A sentinel task ID (CONNECTOR_TASK) is used to indicate the connector itself
+     * (i.e. that the assignment includes responsibility for running the Connector instance in
+     * addition to any tasks it generates).</p>
+     */
     public static final Schema CONNECTOR_ASSIGNMENT_V0 = new Schema(
             new Field(CONNECTOR_KEY_NAME, Type.STRING),
             new Field(TASKS_KEY_NAME, new ArrayOf(Type.INT32)));
+
+    /**
+     * Assignment V0:
+     * <pre>
+     *   Error              => Int16
+     *   Leader             => [String]
+     *   LeaderUrl          => [String]
+     *   ConfigOffset       => Int64
+     *   Assignment         => [Connector Assignment]
+     * </pre>
+     */
     public static final Schema ASSIGNMENT_V0 = new Schema(
             new Field(ERROR_KEY_NAME, Type.INT16),
             new Field(LEADER_KEY_NAME, Type.STRING),
@@ -70,6 +111,18 @@ public class ConnectProtocol {
             new Field(CONFIG_OFFSET_KEY_NAME, Type.INT64),
             new Field(ASSIGNMENT_KEY_NAME, new ArrayOf(CONNECTOR_ASSIGNMENT_V0)));
 
+    /**
+     * The fields are serialized in sequence as follows:
+     * Subscription V0:
+     * <pre>
+     *   Version            => Int16
+     *   Url                => [String]
+     *   ConfigOffset       => Int64
+     * </pre>
+     *
+     * @param workerState the current state of the worker metadata
+     * @return the serialized state of the worker metadata
+     */
     public static ByteBuffer serializeMetadata(WorkerState workerState) {
         Struct struct = new Struct(CONFIG_STATE_V0);
         struct.set(URL_KEY_NAME, workerState.url());
@@ -81,6 +134,29 @@ public class ConnectProtocol {
         return buffer;
     }
 
+    /**
+     * Returns the collection of Connect protocols that are supported by this version along
+     * with their serialized metadata. The protocols are ordered by preference.
+     *
+     * @param workerState the current state of the worker metadata
+     * @return the collection of Connect protocol metadata
+     */
+    public static JoinGroupRequestProtocolCollection metadataRequest(WorkerState workerState) {
+        return new JoinGroupRequestProtocolCollection(Collections.singleton(
+                new JoinGroupRequestProtocol()
+                        .setName(EAGER.protocol())
+                        .setMetadata(ConnectProtocol.serializeMetadata(workerState).array()))
+                .iterator());
+    }
+
+    /**
+     * Given a byte buffer that contains protocol metadata return the deserialized form of the
+     * metadata.
+     *
+     * @param buffer A buffer containing the protocols metadata
+     * @return the deserialized metadata
+     * @throws SchemaException on incompatible Connect protocol version
+     */
     public static WorkerState deserializeMetadata(ByteBuffer buffer) {
         Struct header = CONNECT_PROTOCOL_HEADER_SCHEMA.read(buffer);
         Short version = header.getShort(VERSION_KEY_NAME);
@@ -91,6 +167,18 @@ public class ConnectProtocol {
         return new WorkerState(url, configOffset);
     }
 
+    /**
+     * The fields are serialized in sequence as follows:
+     * Complete Assignment V0:
+     * <pre>
+     *   Version            => Int16
+     *   Error              => Int16
+     *   Leader             => [String]
+     *   LeaderUrl          => [String]
+     *   ConfigOffset       => Int64
+     *   Assignment         => [Connector Assignment]
+     * </pre>
+     */
     public static ByteBuffer serializeAssignment(Assignment assignment) {
         Struct struct = new Struct(ASSIGNMENT_V0);
         struct.set(ERROR_KEY_NAME, assignment.error());
@@ -98,10 +186,10 @@ public class ConnectProtocol {
         struct.set(LEADER_URL_KEY_NAME, assignment.leaderUrl());
         struct.set(CONFIG_OFFSET_KEY_NAME, assignment.offset());
         List<Struct> taskAssignments = new ArrayList<>();
-        for (Map.Entry<String, List<Integer>> connectorEntry : assignment.asMap().entrySet()) {
+        for (Map.Entry<String, Collection<Integer>> connectorEntry : assignment.asMap().entrySet()) {
             Struct taskAssignment = new Struct(CONNECTOR_ASSIGNMENT_V0);
             taskAssignment.set(CONNECTOR_KEY_NAME, connectorEntry.getKey());
-            List<Integer> tasks = connectorEntry.getValue();
+            Collection<Integer> tasks = connectorEntry.getValue();
             taskAssignment.set(TASKS_KEY_NAME, tasks.toArray());
             taskAssignments.add(taskAssignment);
         }
@@ -114,6 +202,14 @@ public class ConnectProtocol {
         return buffer;
     }
 
+    /**
+     * Given a byte buffer that contains an assignment as defined by this protocol, return the
+     * deserialized form of the assignment.
+     *
+     * @param buffer the buffer containing a serialized assignment
+     * @return the deserialized assignment
+     * @throws SchemaException on incompatible Connect protocol version
+     */
     public static Assignment deserializeAssignment(ByteBuffer buffer) {
         Struct header = CONNECT_PROTOCOL_HEADER_SCHEMA.read(buffer);
         Short version = header.getShort(VERSION_KEY_NAME);
@@ -139,6 +235,9 @@ public class ConnectProtocol {
         return new Assignment(error, leader, leaderUrl, offset, connectorIds, taskIds);
     }
 
+    /**
+     * A class that captures the deserialized form of a worker's metadata.
+     */
     public static class WorkerState {
         private final String url;
         private final long offset;
@@ -152,6 +251,11 @@ public class ConnectProtocol {
             return url;
         }
 
+        /**
+         * The most up-to-date (maximum) configuration offset according known to this worker.
+         *
+         * @return the configuration offset
+         */
         public long offset() {
             return offset;
         }
@@ -165,6 +269,9 @@ public class ConnectProtocol {
         }
     }
 
+    /**
+     * The basic assignment of connectors and tasks introduced with V0 version of the Connect protocol.
+     */
     public static class Assignment {
         public static final short NO_ERROR = 0;
         // Configuration offsets mismatched in a way that the leader could not resolve. Workers should read to the end
@@ -175,49 +282,92 @@ public class ConnectProtocol {
         private final String leader;
         private final String leaderUrl;
         private final long offset;
-        private final List<String> connectorIds;
-        private final List<ConnectorTaskId> taskIds;
+        private final Collection<String> connectorIds;
+        private final Collection<ConnectorTaskId> taskIds;
 
         /**
          * Create an assignment indicating responsibility for the given connector instances and task Ids.
-         * @param connectorIds list of connectors that the worker should instantiate and run
-         * @param taskIds list of task IDs that the worker should instantiate and run
+         *
+         * @param error error code for this assignment; {@code ConnectProtocol.Assignment.NO_ERROR}
+         *              indicates no error during assignment
+         * @param leader Connect group's leader Id; may be null only on the empty assignment
+         * @param leaderUrl Connect group's leader URL; may be null only on the empty assignment
+         * @param configOffset the most up-to-date configuration offset according to this assignment
+         * @param connectorIds list of connectors that the worker should instantiate and run; may not be null
+         * @param taskIds list of task IDs that the worker should instantiate and run; may not be null
          */
         public Assignment(short error, String leader, String leaderUrl, long configOffset,
-                          List<String> connectorIds, List<ConnectorTaskId> taskIds) {
+                          Collection<String> connectorIds, Collection<ConnectorTaskId> taskIds) {
             this.error = error;
             this.leader = leader;
             this.leaderUrl = leaderUrl;
             this.offset = configOffset;
-            this.taskIds = taskIds;
-            this.connectorIds = connectorIds;
+            this.connectorIds = Objects.requireNonNull(connectorIds,
+                    "Assigned connector IDs may be empty but not null");
+            this.taskIds = Objects.requireNonNull(taskIds,
+                    "Assigned task IDs may be empty but not null");
         }
 
+        /**
+         * Return the error code of this assignment; 0 signals successful assignment ({@code ConnectProtocol.Assignment.NO_ERROR}).
+         *
+         * @return the error code of the assignment
+         */
         public short error() {
             return error;
         }
 
+        /**
+         * Return the ID of the leader Connect worker in this assignment.
+         *
+         * @return the ID of the leader
+         */
         public String leader() {
             return leader;
         }
 
+        /**
+         * Return the URL to which the leader accepts requests from other members of the group.
+         *
+         * @return the leader URL
+         */
         public String leaderUrl() {
             return leaderUrl;
         }
 
+        /**
+         * Check if this assignment failed.
+         *
+         * @return true if this assignment failed; false otherwise
+         */
         public boolean failed() {
             return error != NO_ERROR;
         }
 
+        /**
+         * Return the most up-to-date offset in the configuration topic according to this assignment
+         *
+         * @return the configuration topic
+         */
         public long offset() {
             return offset;
         }
 
-        public List<String> connectors() {
+        /**
+         * The connectors included in this assignment.
+         *
+         * @return the connectors
+         */
+        public Collection<String> connectors() {
             return connectorIds;
         }
 
-        public List<ConnectorTaskId> tasks() {
+        /**
+         * The tasks included in this assignment.
+         *
+         * @return the tasks
+         */
+        public Collection<ConnectorTaskId> tasks() {
             return taskIds;
         }
 
@@ -233,11 +383,11 @@ public class ConnectProtocol {
                     '}';
         }
 
-        private Map<String, List<Integer>> asMap() {
+        protected Map<String, Collection<Integer>> asMap() {
             // Using LinkedHashMap preserves the ordering, which is helpful for tests and debugging
-            Map<String, List<Integer>> taskMap = new LinkedHashMap<>();
+            Map<String, Collection<Integer>> taskMap = new LinkedHashMap<>();
             for (String connectorId : new HashSet<>(connectorIds)) {
-                List<Integer> connectorTasks = taskMap.get(connectorId);
+                Collection<Integer> connectorTasks = taskMap.get(connectorId);
                 if (connectorTasks == null) {
                     connectorTasks = new ArrayList<>();
                     taskMap.put(connectorId, connectorTasks);
@@ -246,7 +396,7 @@ public class ConnectProtocol {
             }
             for (ConnectorTaskId taskId : taskIds) {
                 String connectorId = taskId.connector();
-                List<Integer> connectorTasks = taskMap.get(connectorId);
+                Collection<Integer> connectorTasks = taskMap.get(connectorId);
                 if (connectorTasks == null) {
                     connectorTasks = new ArrayList<>();
                     taskMap.put(connectorId, connectorTasks);
@@ -264,5 +414,4 @@ public class ConnectProtocol {
 
         // otherwise, assume versions can be parsed as V0
     }
-
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocolCompatibility.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocolCompatibility.java
new file mode 100644
index 0000000..ccded8b
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocolCompatibility.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.distributed;
+
+import java.util.Arrays;
+import java.util.Locale;
+
+/**
+ * An enumeration of the modes available to the worker to signal which Connect protocols are
+ * enabled at any time.
+ *
+ * {@code EAGER} signifies that this worker only supports prompt release of assigned connectors
+ * and tasks in every rebalance. Corresponds to Connect protocol V0.
+ *
+ * {@code COMPATIBLE} signifies that this worker supports both eager and incremental cooperative
+ * Connect protocols and will use the version that is elected by the Kafka broker coordinator
+ * during rebalancing.
+ */
+public enum ConnectProtocolCompatibility {
+    EAGER {
+        @Override
+        public String protocol() {
+            return "default";
+        }
+    },
+
+    COMPATIBLE {
+        @Override
+        public String protocol() {
+            return "compatible";
+        }
+    };
+
+    /**
+     * Return the enum that corresponds to the name that is given as an argument;
+     * if the no mapping is found {@code IllegalArgumentException} is thrown.
+     *
+     * @param name the name of the protocol compatibility mode
+     * @return the enum that corresponds to the protocol compatibility mode
+     */
+    public static ConnectProtocolCompatibility compatibility(String name) {
+        return Arrays.stream(ConnectProtocolCompatibility.values())
+                .filter(mode -> mode.name().equalsIgnoreCase(name))
+                .findFirst()
+                .orElseThrow(() -> new IllegalArgumentException(
+                        "Unknown Connect protocol compatibility mode: " + name));
+    }
+
+    @Override
+    public String toString() {
+        return name().toLowerCase(Locale.ROOT);
+    }
+
+    /**
+     * Return the name of the protocol that this mode will use in {@code ProtocolMetadata}.
+     *
+     * @return the protocol name
+     */
+    public abstract String protocol();
+
+    /**
+     * Return the enum that corresponds to the protocol name that is given as an argument;
+     * if the no mapping is found {@code IllegalArgumentException} is thrown.
+     *
+     * @param protocolName the name of the connect protocol
+     * @return the enum that corresponds to the protocol compatibility mode that supports the
+     * given protocol
+     */
+    public static ConnectProtocolCompatibility fromProtocol(String protocolName) {
+        return Arrays.stream(ConnectProtocolCompatibility.values())
+                .filter(mode -> mode.protocol().equalsIgnoreCase(protocolName))
+                .findFirst()
+                .orElseThrow(() -> new IllegalArgumentException(
+                        "Not found Connect protocol compatibility mode for protocol: " + protocolName));
+    }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index af112a5..c7ff6d8 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -18,11 +18,15 @@ package org.apache.kafka.connect.runtime.distributed;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.Range.between;
 
 public class DistributedConfig extends WorkerConfig {
     private static final ConfigDef CONFIG;
@@ -132,6 +136,20 @@ public class DistributedConfig extends WorkerConfig {
     public static final String STATUS_STORAGE_REPLICATION_FACTOR_CONFIG = "status.storage.replication.factor";
     private static final String STATUS_STORAGE_REPLICATION_FACTOR_CONFIG_DOC = "Replication factor used when creating the status storage topic";
 
+    /**
+     * <code>connect.protocol</code>
+     */
+    public static final String CONNECT_PROTOCOL_CONFIG = "connect.protocol";
+    public static final String CONNECT_PROTOCOL_DOC = "Compatibility mode for Kafka Connect Protocol";
+    public static final String CONNECT_PROTOCOL_DEFAULT = ConnectProtocolCompatibility.COMPATIBLE.toString();
+
+    /**
+     * <code>connect.protocol</code>
+     */
+    public static final String SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG = "scheduled.rebalance.max.delay.ms";
+    public static final String SCHEDULED_REBALANCE_MAX_DELAY_MS_DOC = "Compatibility mode for Kafka Connect Protocol";
+    public static final int SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT = Math.toIntExact(TimeUnit.SECONDS.toMillis(300));
+
     static {
         CONFIG = baseConfigDef()
                 .define(GROUP_ID_CONFIG,
@@ -265,7 +283,28 @@ public class DistributedConfig extends WorkerConfig {
                         (short) 3,
                         atLeast(1),
                         ConfigDef.Importance.LOW,
-                        STATUS_STORAGE_REPLICATION_FACTOR_CONFIG_DOC);
+                        STATUS_STORAGE_REPLICATION_FACTOR_CONFIG_DOC)
+                .define(CONNECT_PROTOCOL_CONFIG,
+                        ConfigDef.Type.STRING,
+                        CONNECT_PROTOCOL_DEFAULT,
+                        ConfigDef.LambdaValidator.with(
+                            (name, value) -> {
+                                try {
+                                    ConnectProtocolCompatibility.compatibility((String) value);
+                                } catch (Throwable t) {
+                                    throw new ConfigException(name, value, "Invalid Connect protocol "
+                                            + "compatibility");
+                                }
+                            },
+                            () -> "[" + Utils.join(ConnectProtocolCompatibility.values(), ", ") + "]"),
+                        ConfigDef.Importance.LOW,
+                        CONNECT_PROTOCOL_DOC)
+                .define(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG,
+                        ConfigDef.Type.INT,
+                        SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT,
+                        between(0, Integer.MAX_VALUE),
+                        ConfigDef.Importance.LOW,
+                        SCHEDULED_REBALANCE_MAX_DELAY_MS_DOC);
     }
 
     @Override
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 96d4bfc..94ca734 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.runtime.distributed;
 
+import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigValue;
 import org.apache.kafka.common.errors.WakeupException;
@@ -24,6 +25,7 @@ import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.Connector;
@@ -54,7 +56,6 @@ import org.apache.kafka.connect.util.Callback;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.SinkUtils;
 import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -76,8 +77,11 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0;
+
 /**
  * <p>
  *     Distributed "herder" that coordinates with other workers to spread work across multiple processes.
@@ -108,7 +112,8 @@ import java.util.concurrent.atomic.AtomicLong;
  * </p>
  */
 public class DistributedHerder extends AbstractHerder implements Runnable {
-    private static final Logger log = LoggerFactory.getLogger(DistributedHerder.class);
+    private static final AtomicInteger CONNECT_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
+    private final Logger log;
 
     private static final long FORWARD_REQUEST_SHUTDOWN_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(10);
     private static final long START_AND_STOP_SHUTDOWN_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(1);
@@ -134,7 +139,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     // Track enough information about the current membership state to be able to determine which requests via the API
     // and the from other nodes are safe to process
     private boolean rebalanceResolved;
-    private ConnectProtocol.Assignment assignment;
+    private ExtendedAssignment runningAssignment = ExtendedAssignment.empty();
+    private ExtendedAssignment assignment;
     private boolean canReadConfigs;
     private ClusterConfigState configState;
 
@@ -149,6 +155,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     private Set<String> connectorTargetStateChanges = new HashSet<>();
     private boolean needsReconfigRebalance;
     private volatile int generation;
+    private volatile long scheduledRebalance;
 
     private final DistributedConfig config;
 
@@ -182,12 +189,22 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         this.workerSyncTimeoutMs = config.getInt(DistributedConfig.WORKER_SYNC_TIMEOUT_MS_CONFIG);
         this.workerTasksShutdownTimeoutMs = config.getLong(DistributedConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
         this.workerUnsyncBackoffMs = config.getInt(DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_CONFIG);
-        this.member = member != null ? member : new WorkerGroupMember(config, restUrl, this.configBackingStore, new RebalanceListener(), time);
+
+        String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
+        String clientId = clientIdConfig.length() <= 0 ? "connect-" + CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig;
+        LogContext logContext = new LogContext("[Worker clientId=" + clientId + ", groupId=" + this.workerGroupId + "] ");
+        log = logContext.logger(DistributedHerder.class);
+
+        this.member = member != null
+                      ? member
+                      : new WorkerGroupMember(config, restUrl, this.configBackingStore,
+                              new RebalanceListener(time), time, clientId, logContext);
+
         this.herderExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingDeque<Runnable>(1),
                 new ThreadFactory() {
                     @Override
                     public Thread newThread(Runnable herder) {
-                        return new Thread(herder, "DistributedHerder");
+                        return new Thread(herder, "DistributedHerder-" + clientId);
                     }
                 });
         this.forwardRequestExecutor = Executors.newSingleThreadExecutor();
@@ -199,6 +216,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         rebalanceResolved = true; // If we still need to follow up after a rebalance occurred, starting up tasks
         needsReconfigRebalance = false;
         canReadConfigs = true; // We didn't try yet, but Configs are readable until proven otherwise
+        scheduledRebalance = Long.MAX_VALUE;
     }
 
     @Override
@@ -274,6 +292,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             }
         }
 
+        if (scheduledRebalance < Long.MAX_VALUE) {
+            nextRequestTimeoutMs = Math.min(nextRequestTimeoutMs, Math.max(scheduledRebalance - now, 0));
+            rebalanceResolved = false;
+            log.debug("Scheduled rebalance at: {} (now: {} nextRequestTimeoutMs: {}) ",
+                    scheduledRebalance, now, nextRequestTimeoutMs);
+        }
+
         // Process any configuration updates
         Set<String> connectorConfigUpdatesCopy = null;
         Set<String> connectorTargetStateChangesCopy = null;
@@ -293,6 +318,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                     connectorConfigUpdates.clear();
                     connectorTargetStateChanges.clear();
                     needsReconfigRebalance = false;
+                    log.debug("Requesting rebalance due to reconfiguration of tasks (needsReconfigRebalance: {})",
+                            needsReconfigRebalance);
                     return;
                 } else {
                     if (!connectorConfigUpdates.isEmpty()) {
@@ -735,13 +762,15 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
 
     /**
      * Handle post-assignment operations, either trying to resolve issues that kept assignment from completing, getting
-     * this node into sync and its work started. Since
+     * this node into sync and its work started.
      *
      * @return false if we couldn't finish
      */
     private boolean handleRebalanceCompleted() {
-        if (this.rebalanceResolved)
+        if (rebalanceResolved) {
+            log.trace("Returning early because rebalance is marked as resolved (rebalanceResolved: true)");
             return true;
+        }
 
         // We need to handle a variety of cases after a rebalance:
         // 1. Assignment failed
@@ -774,6 +803,15 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             }
         }
 
+        long now = time.milliseconds();
+        if (scheduledRebalance <= now) {
+            log.debug("Requesting rebalance because scheduled rebalance timeout has been reached "
+                    + "(now: {} scheduledRebalance: {}", scheduledRebalance, now);
+
+            needsRejoin = true;
+            scheduledRebalance = Long.MAX_VALUE;
+        }
+
         if (needsReadToEnd) {
             // Force exiting this method to avoid creating any connectors/tasks and require immediate rejoining if
             // we timed out. This should only happen if we failed to read configuration for long enough,
@@ -805,6 +843,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         // guarantees we'll attempt to rejoin before executing this method again.
         herderMetrics.rebalanceSucceeded(time.milliseconds());
         rebalanceResolved = true;
+
+        if (!assignment.revokedConnectors().isEmpty() || !assignment.revokedTasks().isEmpty()) {
+            assignment.revokedConnectors().clear();
+            assignment.revokedTasks().clear();
+            member.requestRejoin();
+            return false;
+        }
         return true;
     }
 
@@ -845,18 +890,33 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     private void startWork() {
         // Start assigned connectors and tasks
         log.info("Starting connectors and tasks using config offset {}", assignment.offset());
+
         List<Callable<Void>> callables = new ArrayList<>();
-        for (String connectorName : assignment.connectors()) {
+        for (String connectorName : assignmentDifference(assignment.connectors(), runningAssignment.connectors())) {
             callables.add(getConnectorStartingCallable(connectorName));
         }
 
-        for (ConnectorTaskId taskId : assignment.tasks()) {
+        for (ConnectorTaskId taskId : assignmentDifference(assignment.tasks(), runningAssignment.tasks())) {
             callables.add(getTaskStartingCallable(taskId));
         }
         startAndStop(callables);
+        runningAssignment = member.currentProtocolVersion() == CONNECT_PROTOCOL_V0
+                            ? ExtendedAssignment.empty()
+                            : assignment;
+
         log.info("Finished starting connectors and tasks");
     }
 
+    // arguments should assignment collections (connectors or tasks) and should not be null
+    private static <T> Collection<T> assignmentDifference(Collection<T> update, Collection<T> running) {
+        if (running.isEmpty()) {
+            return update;
+        }
+        HashSet<T> diff = new HashSet<>(update);
+        diff.removeAll(running);
+        return diff;
+    }
+
     private boolean startTask(ConnectorTaskId taskId) {
         log.info("Starting task {}", taskId);
         return worker.startTask(
@@ -1203,8 +1263,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
 
     // Rebalances are triggered internally from the group member, so these are always executed in the work thread.
     public class RebalanceListener implements WorkerRebalanceListener {
+        private final Time time;
+        RebalanceListener(Time time) {
+            this.time = time;
+        }
+
         @Override
-        public void onAssigned(ConnectProtocol.Assignment assignment, int generation) {
+        public void onAssigned(ExtendedAssignment assignment, int generation) {
             // This callback just logs the info and saves it. The actual response is handled in the main loop, which
             // ensures the group member's logic for rebalancing can complete, potentially long-running steps to
             // catch up (or backoff if we fail) not executed in a callback, and so we'll be able to invoke other
@@ -1214,6 +1279,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             synchronized (DistributedHerder.this) {
                 DistributedHerder.this.assignment = assignment;
                 DistributedHerder.this.generation = generation;
+                int delay = assignment.delay();
+                DistributedHerder.this.scheduledRebalance = delay > 0
+                                                            ? time.milliseconds() + delay
+                                                            : Long.MAX_VALUE;
                 rebalanceResolved = false;
                 herderMetrics.rebalanceStarted(time.milliseconds());
             }
@@ -1232,16 +1301,10 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
 
         @Override
         public void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
-            log.info("Rebalance started");
-
             // Note that since we don't reset the assignment, we don't revoke leadership here. During a rebalance,
             // it is still important to have a leader that can write configs, offsets, etc.
 
             if (rebalanceResolved) {
-                // TODO: Technically we don't have to stop connectors at all until we know they've really been removed from
-                // this worker. Instead, we can let them continue to run but buffer any update requests (which should be
-                // rare anyway). This would avoid a steady stream of start/stop, which probably also includes lots of
-                // unnecessary repeated connections to the source/sink system.
                 List<Callable<Void>> callables = new ArrayList<>();
                 for (final String connectorName : connectors) {
                     callables.add(getConnectorStoppingCallable(connectorName));
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/EagerAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/EagerAssignor.java
new file mode 100644
index 0000000..b6dbd09
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/EagerAssignor.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.common.utils.CircularIterator;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.Assignment;
+import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.LeaderState;
+
+
+/**
+ * An assignor that computes a unweighted round-robin distribution of connectors and tasks. The
+ * connectors are assigned to the workers first, followed by the tasks. This is to avoid
+ * load imbalance when several 1-task connectors are running, given that a connector is usually
+ * more lightweight than a task.
+ *
+ * Note that this class is NOT thread-safe.
+ */
+public class EagerAssignor implements ConnectAssignor {
+    private final Logger log;
+
+    public EagerAssignor(LogContext logContext) {
+        this.log = logContext.logger(EagerAssignor.class);
+    }
+
+    @Override
+    public Map<String, ByteBuffer> performAssignment(String leaderId, String protocol,
+                                                     List<JoinGroupResponseMember> allMemberMetadata,
+                                                     WorkerCoordinator coordinator) {
+        log.debug("Performing task assignment");
+        Map<String, ExtendedWorkerState> memberConfigs = new HashMap<>();
+        for (JoinGroupResponseMember member : allMemberMetadata)
+            memberConfigs.put(member.memberId(), IncrementalCooperativeConnectProtocol.deserializeMetadata(ByteBuffer.wrap(member.metadata())));
+
+        long maxOffset = findMaxMemberConfigOffset(memberConfigs, coordinator);
+        Long leaderOffset = ensureLeaderConfig(maxOffset, coordinator);
+        if (leaderOffset == null)
+            return fillAssignmentsAndSerialize(memberConfigs.keySet(), Assignment.CONFIG_MISMATCH,
+                    leaderId, memberConfigs.get(leaderId).url(), maxOffset,
+                    new HashMap<>(), new HashMap<>());
+        return performTaskAssignment(leaderId, leaderOffset, memberConfigs, coordinator);
+    }
+
+    private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) {
+        // If this leader is behind some other members, we can't do assignment
+        if (coordinator.configSnapshot().offset() < maxOffset) {
+            // We might be able to take a new snapshot to catch up immediately and avoid another round of syncing here.
+            // Alternatively, if this node has already passed the maximum reported by any other member of the group, it
+            // is also safe to use this newer state.
+            ClusterConfigState updatedSnapshot = coordinator.configFreshSnapshot();
+            if (updatedSnapshot.offset() < maxOffset) {
+                log.info("Was selected to perform assignments, but do not have latest config found in sync request. " +
+                        "Returning an empty configuration to trigger re-sync.");
+                return null;
+            } else {
+                coordinator.configSnapshot(updatedSnapshot);
+                return updatedSnapshot.offset();
+            }
+        }
+        return maxOffset;
+    }
+
+    private Map<String, ByteBuffer> performTaskAssignment(String leaderId, long maxOffset,
+                                                          Map<String, ExtendedWorkerState> memberConfigs,
+                                                          WorkerCoordinator coordinator) {
+        Map<String, Collection<String>> connectorAssignments = new HashMap<>();
+        Map<String, Collection<ConnectorTaskId>> taskAssignments = new HashMap<>();
+
+        // Perform round-robin task assignment. Assign all connectors and then all tasks because assigning both the
+        // connector and its tasks can lead to very uneven distribution of work in some common cases (e.g. for connectors
+        // that generate only 1 task each; in a cluster of 2 or an even # of nodes, only even nodes will be assigned
+        // connectors and only odd nodes will be assigned tasks, but tasks are, on average, actually more resource
+        // intensive than connectors).
+        List<String> connectorsSorted = sorted(coordinator.configSnapshot().connectors());
+        CircularIterator<String> memberIt = new CircularIterator<>(sorted(memberConfigs.keySet()));
+        for (String connectorId : connectorsSorted) {
+            String connectorAssignedTo = memberIt.next();
+            log.trace("Assigning connector {} to {}", connectorId, connectorAssignedTo);
+            Collection<String> memberConnectors = connectorAssignments.get(connectorAssignedTo);
+            if (memberConnectors == null) {
+                memberConnectors = new ArrayList<>();
+                connectorAssignments.put(connectorAssignedTo, memberConnectors);
+            }
+            memberConnectors.add(connectorId);
+        }
+        for (String connectorId : connectorsSorted) {
+            for (ConnectorTaskId taskId : sorted(coordinator.configSnapshot().tasks(connectorId))) {
+                String taskAssignedTo = memberIt.next();
+                log.trace("Assigning task {} to {}", taskId, taskAssignedTo);
+                Collection<ConnectorTaskId> memberTasks = taskAssignments.get(taskAssignedTo);
+                if (memberTasks == null) {
+                    memberTasks = new ArrayList<>();
+                    taskAssignments.put(taskAssignedTo, memberTasks);
+                }
+                memberTasks.add(taskId);
+            }
+        }
+
+        coordinator.leaderState(new LeaderState(memberConfigs, connectorAssignments, taskAssignments));
+
+        return fillAssignmentsAndSerialize(memberConfigs.keySet(), Assignment.NO_ERROR,
+                leaderId, memberConfigs.get(leaderId).url(), maxOffset, connectorAssignments, taskAssignments);
+    }
+
+    private Map<String, ByteBuffer> fillAssignmentsAndSerialize(Collection<String> members,
+                                                                short error,
+                                                                String leaderId,
+                                                                String leaderUrl,
+                                                                long maxOffset,
+                                                                Map<String, Collection<String>> connectorAssignments,
+                                                                Map<String, Collection<ConnectorTaskId>> taskAssignments) {
+
+        Map<String, ByteBuffer> groupAssignment = new HashMap<>();
+        for (String member : members) {
+            Collection<String> connectors = connectorAssignments.get(member);
+            if (connectors == null) {
+                connectors = Collections.emptyList();
+            }
+            Collection<ConnectorTaskId> tasks = taskAssignments.get(member);
+            if (tasks == null) {
+                tasks = Collections.emptyList();
+            }
+            Assignment assignment = new Assignment(error, leaderId, leaderUrl, maxOffset, connectors, tasks);
+            log.debug("Assignment: {} -> {}", member, assignment);
+            groupAssignment.put(member, ConnectProtocol.serializeAssignment(assignment));
+        }
+        log.debug("Finished assignment");
+        return groupAssignment;
+    }
+
+    private long findMaxMemberConfigOffset(Map<String, ExtendedWorkerState> memberConfigs,
+                                           WorkerCoordinator coordinator) {
+        // The new config offset is the maximum seen by any member. We always perform assignment using this offset,
+        // even if some members have fallen behind. The config offset used to generate the assignment is included in
+        // the response so members that have fallen behind will not use the assignment until they have caught up.
+        Long maxOffset = null;
+        for (Map.Entry<String, ExtendedWorkerState> stateEntry : memberConfigs.entrySet()) {
+            long memberRootOffset = stateEntry.getValue().offset();
+            if (maxOffset == null)
+                maxOffset = memberRootOffset;
+            else
+                maxOffset = Math.max(maxOffset, memberRootOffset);
+        }
+
+        log.debug("Max config offset root: {}, local snapshot config offsets root: {}",
+                  maxOffset, coordinator.configSnapshot().offset());
+        return maxOffset;
+    }
+
+    private static <T extends Comparable<T>> List<T> sorted(Collection<T> members) {
+        List<T> res = new ArrayList<>(members);
+        Collections.sort(res);
+        return res;
+    }
+
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ExtendedAssignment.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ExtendedAssignment.java
new file mode 100644
index 0000000..7518e06
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ExtendedAssignment.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.ASSIGNMENT_KEY_NAME;
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONFIG_OFFSET_KEY_NAME;
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECTOR_KEY_NAME;
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECTOR_TASK;
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.ERROR_KEY_NAME;
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.LEADER_KEY_NAME;
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.LEADER_URL_KEY_NAME;
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.TASKS_KEY_NAME;
+import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.ASSIGNMENT_V1;
+import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECTOR_ASSIGNMENT_V1;
+import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
+import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.REVOKED_KEY_NAME;
+import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.SCHEDULED_DELAY_KEY_NAME;
+
+/**
+ * The extended assignment of connectors and tasks that includes revoked connectors and tasks
+ * as well as a scheduled rebalancing delay.
+ */
+public class ExtendedAssignment extends ConnectProtocol.Assignment {
+    private final short version;
+    private final Collection<String> revokedConnectorIds;
+    private final Collection<ConnectorTaskId> revokedTaskIds;
+    private final int delay;
+
+    private static final ExtendedAssignment EMPTY = new ExtendedAssignment(
+            CONNECT_PROTOCOL_V1, ConnectProtocol.Assignment.NO_ERROR, null, null, -1,
+            Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), 0);
+
+    /**
+     * Create an assignment indicating responsibility for the given connector instances and task Ids.
+     *
+     * @param version Connect protocol version
+     * @param error error code for this assignment; {@code ConnectProtocol.Assignment.NO_ERROR}
+     *              indicates no error during assignment
+     * @param leader Connect group's leader Id; may be null only on the empty assignment
+     * @param leaderUrl Connect group's leader URL; may be null only on the empty assignment
+     * @param configOffset the offset in the config topic that this assignment is corresponding to
+     * @param connectorIds list of connectors that the worker should instantiate and run; may not be null
+     * @param taskIds list of task IDs that the worker should instantiate and run; may not be null
+     * @param revokedConnectorIds list of connectors that the worker should stop running; may not be null
+     * @param revokedTaskIds list of task IDs that the worker should stop running; may not be null
+     * @param delay the scheduled delay after which the worker should rejoin the group
+     */
+    public ExtendedAssignment(short version, short error, String leader, String leaderUrl, long configOffset,
+                             Collection<String> connectorIds, Collection<ConnectorTaskId> taskIds,
+                             Collection<String> revokedConnectorIds, Collection<ConnectorTaskId> revokedTaskIds,
+                             int delay) {
+        super(error, leader, leaderUrl, configOffset, connectorIds, taskIds);
+        this.version = version;
+        this.revokedConnectorIds = Objects.requireNonNull(revokedConnectorIds,
+                "Revoked connector IDs may be empty but not null");
+        this.revokedTaskIds = Objects.requireNonNull(revokedTaskIds,
+                "Revoked task IDs may be empty but not null");
+        this.delay = delay;
+    }
+
+    /**
+     * Return the version of the connect protocol that this assignment belongs to.
+     *
+     * @return the connect protocol version of this assignment
+     */
+    public short version() {
+        return version;
+    }
+
+    /**
+     * Return the IDs of the connectors that are revoked by this assignment.
+     *
+     * @return the revoked connector IDs; empty if there are no revoked connectors
+     */
+    public Collection<String> revokedConnectors() {
+        return revokedConnectorIds;
+    }
+
+    /**
+     * Return the IDs of the tasks that are revoked by this assignment.
+     *
+     * @return the revoked task IDs; empty if there are no revoked tasks
+     */
+    public Collection<ConnectorTaskId> revokedTasks() {
+        return revokedTaskIds;
+    }
+
+    /**
+     * Return the delay for the rebalance that is scheduled by this assignment.
+     *
+     * @return the scheduled delay
+     */
+    public int delay() {
+        return delay;
+    }
+
+    /**
+     * Return an empty assignment.
+     *
+     * @return an empty assignment
+     */
+    public static ExtendedAssignment empty() {
+        return EMPTY;
+    }
+
+    @Override
+    public String toString() {
+        return "Assignment{" +
+                "error=" + error() +
+                ", leader='" + leader() + '\'' +
+                ", leaderUrl='" + leaderUrl() + '\'' +
+                ", offset=" + offset() +
+                ", connectorIds=" + connectors() +
+                ", taskIds=" + tasks() +
+                ", revokedConnectorIds=" + revokedConnectorIds +
+                ", revokedTaskIds=" + revokedTaskIds +
+                ", delay=" + delay +
+                '}';
+    }
+
+    private Map<String, Collection<Integer>> revokedAsMap() {
+        if (revokedConnectorIds == null && revokedTaskIds == null) {
+            return null;
+        }
+        // Using LinkedHashMap preserves the ordering, which is helpful for tests and debugging
+        Map<String, Collection<Integer>> taskMap = new LinkedHashMap<>();
+        Optional.ofNullable(revokedConnectorIds)
+                .orElseGet(Collections::emptyList)
+                .stream()
+                .distinct()
+                .forEachOrdered(connectorId -> {
+                    Collection<Integer> connectorTasks =
+                            taskMap.computeIfAbsent(connectorId, v -> new ArrayList<>());
+                    connectorTasks.add(CONNECTOR_TASK);
+                });
+
+        Optional.ofNullable(revokedTaskIds)
+                .orElseGet(Collections::emptyList)
+                .forEach(taskId -> {
+                    String connectorId = taskId.connector();
+                    Collection<Integer> connectorTasks =
+                            taskMap.computeIfAbsent(connectorId, v -> new ArrayList<>());
+                    connectorTasks.add(taskId.task());
+                });
+        return taskMap;
+    }
+
+    /**
+     * Return the {@code Struct} that corresponds to this assignment.
+     *
+     * @return the assignment struct
+     */
+    public Struct toStruct() {
+        Collection<Struct> assigned = taskAssignments(asMap());
+        Collection<Struct> revoked = taskAssignments(revokedAsMap());
+        return new Struct(ASSIGNMENT_V1)
+                .set(ERROR_KEY_NAME, error())
+                .set(LEADER_KEY_NAME, leader())
+                .set(LEADER_URL_KEY_NAME, leaderUrl())
+                .set(CONFIG_OFFSET_KEY_NAME, offset())
+                .set(ASSIGNMENT_KEY_NAME, assigned != null ? assigned.toArray() : null)
+                .set(REVOKED_KEY_NAME, revoked != null ? revoked.toArray() : null)
+                .set(SCHEDULED_DELAY_KEY_NAME, delay);
+    }
+
+    /**
+     * Given a {@code Struct} that encodes an assignment return the assignment object.
+     *
+     * @param struct a struct representing an assignment
+     * @return the assignment
+     */
+    public static ExtendedAssignment fromStruct(short version, Struct struct) {
+        return struct == null
+               ? null
+               : new ExtendedAssignment(
+                       version,
+                       struct.getShort(ERROR_KEY_NAME),
+                       struct.getString(LEADER_KEY_NAME),
+                       struct.getString(LEADER_URL_KEY_NAME),
+                       struct.getLong(CONFIG_OFFSET_KEY_NAME),
+                       extractConnectors(struct, ASSIGNMENT_KEY_NAME),
+                       extractTasks(struct, ASSIGNMENT_KEY_NAME),
+                       extractConnectors(struct, REVOKED_KEY_NAME),
+                       extractTasks(struct, REVOKED_KEY_NAME),
+                       struct.getInt(SCHEDULED_DELAY_KEY_NAME));
+    }
+
+    private static Collection<Struct> taskAssignments(Map<String, Collection<Integer>> assignments) {
+        return assignments == null
+               ? null
+               : assignments.entrySet().stream()
+                       .map(connectorEntry -> {
+                           Struct taskAssignment = new Struct(CONNECTOR_ASSIGNMENT_V1);
+                           taskAssignment.set(CONNECTOR_KEY_NAME, connectorEntry.getKey());
+                           taskAssignment.set(TASKS_KEY_NAME, connectorEntry.getValue().toArray());
+                           return taskAssignment;
+                       }).collect(Collectors.toList());
+    }
+
+    private static Collection<String> extractConnectors(Struct struct, String key) {
+        assert REVOKED_KEY_NAME.equals(key) || ASSIGNMENT_KEY_NAME.equals(key);
+
+        Object[] connectors = struct.getArray(key);
+        if (connectors == null) {
+            return Collections.emptyList();
+        }
+        List<String> connectorIds = new ArrayList<>();
+        for (Object structObj : connectors) {
+            Struct assignment = (Struct) structObj;
+            String connector = assignment.getString(CONNECTOR_KEY_NAME);
+            for (Object taskIdObj : assignment.getArray(TASKS_KEY_NAME)) {
+                Integer taskId = (Integer) taskIdObj;
+                if (taskId == CONNECTOR_TASK) {
+                    connectorIds.add(connector);
+                }
+            }
+        }
+        return connectorIds;
+    }
+
+    private static Collection<ConnectorTaskId> extractTasks(Struct struct, String key) {
+        assert REVOKED_KEY_NAME.equals(key) || ASSIGNMENT_KEY_NAME.equals(key);
+
+        Object[] tasks = struct.getArray(key);
+        if (tasks == null) {
+            return Collections.emptyList();
+        }
+        List<ConnectorTaskId> tasksIds = new ArrayList<>();
+        for (Object structObj : tasks) {
+            Struct assignment = (Struct) structObj;
+            String connector = assignment.getString(CONNECTOR_KEY_NAME);
+            for (Object taskIdObj : assignment.getArray(TASKS_KEY_NAME)) {
+                Integer taskId = (Integer) taskIdObj;
+                if (taskId != CONNECTOR_TASK) {
+                    tasksIds.add(new ConnectorTaskId(connector, taskId));
+                }
+            }
+        }
+        return tasksIds;
+    }
+
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ExtendedWorkerState.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ExtendedWorkerState.java
new file mode 100644
index 0000000..663979b
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ExtendedWorkerState.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.distributed;
+
+/**
+ * A class that captures the deserialized form of a worker's metadata.
+ */
+public class ExtendedWorkerState extends ConnectProtocol.WorkerState {
+    private final ExtendedAssignment assignment;
+
+    public ExtendedWorkerState(String url, long offset, ExtendedAssignment assignment) {
+        super(url, offset);
+        this.assignment = assignment != null ? assignment : ExtendedAssignment.empty();
+    }
+
+    /**
+     * This method returns which was the assignment of connectors and tasks on a worker at the
+     * moment that its state was captured by this class.
+     *
+     * @return the assignment of connectors and tasks
+     */
+    public ExtendedAssignment assignment() {
+        return assignment;
+    }
+
+    @Override
+    public String toString() {
+        return "WorkerState{" +
+                "url='" + url() + '\'' +
+                ", offset=" + offset() +
+                ", " + assignment +
+                '}';
+    }
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
new file mode 100644
index 0000000..ae36837
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
@@ -0,0 +1,644 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.ConnectorsAndTasks;
+import org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.WorkerLoad;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.slf4j.Logger;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.Assignment;
+import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
+import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.LeaderState;
+
+/**
+ * An assignor that computes a distribution of connectors and tasks according to the incremental
+ * cooperative strategy for rebalancing. {@see
+ * https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative
+ * +Rebalancing+in+Kafka+Connect} for a description of the assignment policy.
+ *
+ * Note that this class is NOT thread-safe.
+ */
+public class IncrementalCooperativeAssignor implements ConnectAssignor {
+    private final Logger log;
+    private final Time time;
+    private final int maxDelay;
+    private ConnectorsAndTasks previousAssignment;
+    private ConnectorsAndTasks previousRevocation;
+    private boolean canRevoke;
+    // visible for testing
+    protected final Set<String> candidateWorkersForReassignment;
+    protected long scheduledRebalance;
+    protected int delay;
+
+    public IncrementalCooperativeAssignor(LogContext logContext, Time time, int maxDelay) {
+        this.log = logContext.logger(IncrementalCooperativeAssignor.class);
+        this.time = time;
+        this.maxDelay = maxDelay;
+        this.previousAssignment = ConnectorsAndTasks.EMPTY;
+        this.previousRevocation = new ConnectorsAndTasks.Builder().build();
+        this.canRevoke = true;
+        this.scheduledRebalance = 0;
+        this.candidateWorkersForReassignment = new LinkedHashSet<>();
+        this.delay = 0;
+    }
+
+    @Override
+    public Map<String, ByteBuffer> performAssignment(String leaderId, String protocol,
+                                                     List<JoinGroupResponseMember> allMemberMetadata,
+                                                     WorkerCoordinator coordinator) {
+        log.debug("Performing task assignment");
+
+        Map<String, ExtendedWorkerState> memberConfigs = new HashMap<>();
+        for (JoinGroupResponseMember member : allMemberMetadata) {
+            memberConfigs.put(
+                    member.memberId(),
+                    IncrementalCooperativeConnectProtocol.deserializeMetadata(ByteBuffer.wrap(member.metadata())));
+        }
+        log.debug("Member configs: {}", memberConfigs);
+
+        // The new config offset is the maximum seen by any member. We always perform assignment using this offset,
+        // even if some members have fallen behind. The config offset used to generate the assignment is included in
+        // the response so members that have fallen behind will not use the assignment until they have caught up.
+        long maxOffset = memberConfigs.values().stream().map(ExtendedWorkerState::offset).max(Long::compare).get();
+        log.debug("Max config offset root: {}, local snapshot config offsets root: {}",
+                  maxOffset, coordinator.configSnapshot().offset());
+
+        Long leaderOffset = ensureLeaderConfig(maxOffset, coordinator);
+        if (leaderOffset == null) {
+            Map<String, ExtendedAssignment> assignments = fillAssignments(
+                    memberConfigs.keySet(), Assignment.CONFIG_MISMATCH,
+                    leaderId, memberConfigs.get(leaderId).url(), maxOffset, Collections.emptyMap(),
+                    Collections.emptyMap(), Collections.emptyMap(), 0);
+            return serializeAssignments(assignments);
+        }
+        return performTaskAssignment(leaderId, leaderOffset, memberConfigs, coordinator);
+    }
+
+    private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) {
+        // If this leader is behind some other members, we can't do assignment
+        if (coordinator.configSnapshot().offset() < maxOffset) {
+            // We might be able to take a new snapshot to catch up immediately and avoid another round of syncing here.
+            // Alternatively, if this node has already passed the maximum reported by any other member of the group, it
+            // is also safe to use this newer state.
+            ClusterConfigState updatedSnapshot = coordinator.configFreshSnapshot();
+            if (updatedSnapshot.offset() < maxOffset) {
+                log.info("Was selected to perform assignments, but do not have latest config found in sync request. "
+                         + "Returning an empty configuration to trigger re-sync.");
+                return null;
+            } else {
+                coordinator.configSnapshot(updatedSnapshot);
+                return updatedSnapshot.offset();
+            }
+        }
+        return maxOffset;
+    }
+
+    /**
+     * Performs task assignment based on the incremental cooperative connect protocol.
+     * Read more on the design and implementation in:
+     * {@see https://cwiki.apache.org/confluence/display/KAFKA/KIP-415%3A+Incremental+Cooperative+Rebalancing+in+Kafka+Connect}
+     *
+     * @param leaderId the ID of the group leader
+     * @param maxOffset the latest known offset of the configuration topic
+     * @param memberConfigs the metadata of all the members of the group as gather in the current
+     * round of rebalancing
+     * @param coordinator the worker coordinator instance that provide the configuration snapshot
+     * and get assigned the leader state during this assignment
+     * @return the serialized assignment of tasks to the whole group, including assigned or
+     * revoked tasks
+     */
+    protected Map<String, ByteBuffer> performTaskAssignment(String leaderId, long maxOffset,
+                                                            Map<String, ExtendedWorkerState> memberConfigs,
+                                                            WorkerCoordinator coordinator) {
+        // Base set: The previous assignment of connectors-and-tasks is a standalone snapshot that
+        // can be used to calculate derived sets
+        log.debug("Previous assignments: {}", previousAssignment);
+
+        ClusterConfigState snapshot = coordinator.configSnapshot();
+        Set<String> configuredConnectors = new TreeSet<>(snapshot.connectors());
+        Set<ConnectorTaskId> configuredTasks = configuredConnectors.stream()
+                .flatMap(c -> snapshot.tasks(c).stream())
+                .collect(Collectors.toSet());
+
+        // Base set: The set of configured connectors-and-tasks is a standalone snapshot that can
+        // be used to calculate derived sets
+        ConnectorsAndTasks configured = new ConnectorsAndTasks.Builder()
+                .with(configuredConnectors, configuredTasks).build();
+        log.debug("Configured assignments: {}", configured);
+
+        // Base set: The set of active connectors-and-tasks is a standalone snapshot that can be
+        // used to calculate derived sets
+        ConnectorsAndTasks activeAssignments = assignment(memberConfigs);
+        log.debug("Active assignments: {}", activeAssignments);
+
+        // This means that a previous revocation did not take effect. In this case, reset
+        // appropriately and be ready to re-apply revocation of tasks
+        if (!previousRevocation.isEmpty()) {
+            if (previousRevocation.connectors().stream().anyMatch(c -> activeAssignments.connectors().contains(c))
+                    || previousRevocation.tasks().stream().anyMatch(t -> activeAssignments.tasks().contains(t))) {
+                previousAssignment = activeAssignments;
+                canRevoke = true;
+            }
+            previousRevocation.connectors().clear();
+            previousRevocation.tasks().clear();
+        }
+
+        // Derived set: The set of deleted connectors-and-tasks is a derived set from the set
+        // difference of previous - configured
+        ConnectorsAndTasks deleted = diff(previousAssignment, configured);
+        log.debug("Deleted assignments: {}", deleted);
+
+        // Derived set: The set of remaining active connectors-and-tasks is a derived set from the
+        // set difference of active - deleted
+        ConnectorsAndTasks remainingActive = diff(activeAssignments, deleted);
+        log.debug("Remaining (excluding deleted) active assignments: {}", remainingActive);
+
+        // Derived set: The set of lost or unaccounted connectors-and-tasks is a derived set from
+        // the set difference of previous - active - deleted
+        ConnectorsAndTasks lostAssignments = diff(previousAssignment, activeAssignments, deleted);
+        log.debug("Lost assignments: {}", lostAssignments);
+
+        // Derived set: The set of new connectors-and-tasks is a derived set from the set
+        // difference of configured - previous
+        ConnectorsAndTasks newSubmissions = diff(configured, previousAssignment);
+        log.debug("New assignments: {}", newSubmissions);
+
+        // A collection of the complete assignment
+        List<WorkerLoad> completeWorkerAssignment = workerAssignment(memberConfigs, ConnectorsAndTasks.EMPTY);
+        log.debug("Complete (ignoring deletions) worker assignments: {}", completeWorkerAssignment);
+
+        // Per worker connector assignments without removing deleted connectors yet
+        Map<String, Collection<String>> connectorAssignments =
+                completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors));
+        log.debug("Complete (ignoring deletions) connector assignments: {}", connectorAssignments);
+
+        // Per worker task assignments without removing deleted connectors yet
+        Map<String, Collection<ConnectorTaskId>> taskAssignments =
+                completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks));
+        log.debug("Complete (ignoring deletions) task assignments: {}", taskAssignments);
+
+        // A collection of the current assignment excluding the connectors-and-tasks to be deleted
+        List<WorkerLoad> currentWorkerAssignment = workerAssignment(memberConfigs, deleted);
+
+        Map<String, ConnectorsAndTasks> toRevoke = computeDeleted(deleted, connectorAssignments, taskAssignments);
+        log.debug("Connector and task to delete assignments: {}", toRevoke);
+
+        // Recompute the complete assignment excluding the deleted connectors-and-tasks
+        completeWorkerAssignment = workerAssignment(memberConfigs, deleted);
+        connectorAssignments =
+                completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors));
+        taskAssignments =
+                completeWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks));
+
+        handleLostAssignments(lostAssignments, newSubmissions, completeWorkerAssignment);
+
+        // Do not revoke resources for re-assignment while a delayed rebalance is active
+        // Also we do not revoke in two consecutive rebalances by the same leader
+        canRevoke = delay == 0 && canRevoke;
+
+        // Compute the connectors-and-tasks to be revoked for load balancing without taking into
+        // account the deleted ones.
+        log.debug("Can leader revoke tasks in this assignment? {} (delay: {})", canRevoke, delay);
+        if (canRevoke) {
+            Map<String, ConnectorsAndTasks> toExplicitlyRevoke =
+                    performTaskRevocation(activeAssignments, currentWorkerAssignment);
+
+            log.debug("Connector and task to revoke assignments: {}", toRevoke);
+
+            toExplicitlyRevoke.forEach(
+                (worker, assignment) -> {
+                    ConnectorsAndTasks existing = toRevoke.computeIfAbsent(
+                        worker,
+                        v -> new ConnectorsAndTasks.Builder().build());
+                    existing.connectors().addAll(assignment.connectors());
+                    existing.tasks().addAll(assignment.tasks());
+                }
+            );
+            canRevoke = toExplicitlyRevoke.size() == 0;
+        } else {
+            canRevoke = delay == 0;
+        }
+
+        assignConnectors(completeWorkerAssignment, newSubmissions.connectors());
+        assignTasks(completeWorkerAssignment, newSubmissions.tasks());
+
+        log.debug("Current complete assignments: {}", currentWorkerAssignment);
+        log.debug("New complete assignments: {}", completeWorkerAssignment);
+
+        Map<String, Collection<String>> currentConnectorAssignments =
+                currentWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::connectors));
+
+        Map<String, Collection<ConnectorTaskId>> currentTaskAssignments =
+                currentWorkerAssignment.stream().collect(Collectors.toMap(WorkerLoad::worker, WorkerLoad::tasks));
+
+        Map<String, Collection<String>> incrementalConnectorAssignments =
+                diff(connectorAssignments, currentConnectorAssignments);
+
+        Map<String, Collection<ConnectorTaskId>> incrementalTaskAssignments =
+                diff(taskAssignments, currentTaskAssignments);
+
+        log.debug("Incremental connector assignments: {}", incrementalConnectorAssignments);
+        log.debug("Incremental task assignments: {}", incrementalTaskAssignments);
+
+        coordinator.leaderState(new LeaderState(memberConfigs, connectorAssignments, taskAssignments));
+
+        Map<String, ExtendedAssignment> assignments =
+                fillAssignments(memberConfigs.keySet(), Assignment.NO_ERROR, leaderId,
+                                memberConfigs.get(leaderId).url(), maxOffset, incrementalConnectorAssignments,
+                                incrementalTaskAssignments, toRevoke, delay);
+
+        previousAssignment = computePreviousAssignment(toRevoke, connectorAssignments, taskAssignments, lostAssignments);
+
+        log.debug("Actual assignments: {}", assignments);
+        return serializeAssignments(assignments);
+    }
+
+    private Map<String, ConnectorsAndTasks> computeDeleted(ConnectorsAndTasks deleted,
+                                                           Map<String, Collection<String>> connectorAssignments,
+                                                           Map<String, Collection<ConnectorTaskId>> taskAssignments) {
+        // Connector to worker reverse lookup map
+        Map<String, String> connectorOwners = WorkerCoordinator.invertAssignment(connectorAssignments);
+        // Task to worker reverse lookup map
+        Map<ConnectorTaskId, String> taskOwners = WorkerCoordinator.invertAssignment(taskAssignments);
+
+        Map<String, ConnectorsAndTasks> toRevoke = new HashMap<>();
+        // Add the connectors that have been deleted to the revoked set
+        deleted.connectors().forEach(c ->
+                toRevoke.computeIfAbsent(
+                    connectorOwners.get(c),
+                    v -> new ConnectorsAndTasks.Builder().build()
+                ).connectors().add(c));
+        // Add the tasks that have been deleted to the revoked set
+        deleted.tasks().forEach(t ->
+                toRevoke.computeIfAbsent(
+                    taskOwners.get(t),
+                    v -> new ConnectorsAndTasks.Builder().build()
+                ).tasks().add(t));
+        log.debug("Connectors and tasks to delete assignments: {}", toRevoke);
+        return toRevoke;
+    }
+
+    private ConnectorsAndTasks computePreviousAssignment(Map<String, ConnectorsAndTasks> toRevoke,
+                                                         Map<String, Collection<String>> connectorAssignments,
+                                                         Map<String, Collection<ConnectorTaskId>> taskAssignments,
+                                                         ConnectorsAndTasks lostAssignments) {
+        ConnectorsAndTasks previousAssignment = new ConnectorsAndTasks.Builder().with(
+                connectorAssignments.values().stream().flatMap(Collection::stream).collect(Collectors.toSet()),
+                taskAssignments.values() .stream() .flatMap(Collection::stream).collect(Collectors.toSet()))
+                .build();
+
+        for (ConnectorsAndTasks revoked : toRevoke.values()) {
+            previousAssignment.connectors().removeAll(revoked.connectors());
+            previousAssignment.tasks().removeAll(revoked.tasks());
+            previousRevocation.connectors().addAll(revoked.connectors());
+            previousRevocation.tasks().addAll(revoked.tasks());
+        }
+
+        // Depends on the previous assignment's collections being sets at the moment.
+        // TODO: make it independent
+        previousAssignment.connectors().addAll(lostAssignments.connectors());
+        previousAssignment.tasks().addAll(lostAssignments.tasks());
+
+        return previousAssignment;
+    }
+
+    // visible for testing
+    protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
+                                         ConnectorsAndTasks newSubmissions,
+                                         List<WorkerLoad> completeWorkerAssignment) {
+        if (lostAssignments.isEmpty()) {
+            return;
+        }
+
+        final long now = time.milliseconds();
+        log.debug("Found the following connectors and tasks missing from previous assignments: "
+                + lostAssignments);
+
+        if (scheduledRebalance > 0 && now >= scheduledRebalance) {
+            // delayed rebalance expired and it's time to assign resources
+            Optional<WorkerLoad> candidateWorkerLoad = Optional.empty();
+            if (!candidateWorkersForReassignment.isEmpty()) {
+                candidateWorkerLoad = pickCandidateWorkerForReassignment(completeWorkerAssignment);
+            }
+
+            if (candidateWorkerLoad.isPresent()) {
+                WorkerLoad workerLoad = candidateWorkerLoad.get();
+                lostAssignments.connectors().forEach(workerLoad::assign);
+                lostAssignments.tasks().forEach(workerLoad::assign);
+            } else {
+                newSubmissions.connectors().addAll(lostAssignments.connectors());
+                newSubmissions.tasks().addAll(lostAssignments.tasks());
+            }
+            candidateWorkersForReassignment.clear();
+            scheduledRebalance = 0;
+            delay = 0;
+        } else {
+            candidateWorkersForReassignment
+                    .addAll(candidateWorkersForReassignment(completeWorkerAssignment));
+            if (now < scheduledRebalance) {
+                // a delayed rebalance is in progress, but it's not yet time to reassign
+                // unaccounted resources
+                delay = calculateDelay(now);
+            } else {
+                // This means scheduledRebalance == 0
+                // We could also also extract the current minimum delay from the group, to make
+                // independent of consecutive leader failures, but this optimization is skipped
+                // at the moment
+                delay = maxDelay;
+            }
+            scheduledRebalance = now + delay;
+        }
+    }
+
+    private Set<String> candidateWorkersForReassignment(List<WorkerLoad> completeWorkerAssignment) {
+        return completeWorkerAssignment.stream()
+                .filter(WorkerLoad::isEmpty)
+                .map(WorkerLoad::worker)
+                .collect(Collectors.toSet());
+    }
+
+    private Optional<WorkerLoad> pickCandidateWorkerForReassignment(List<WorkerLoad> completeWorkerAssignment) {
+        Map<String, WorkerLoad> activeWorkers = completeWorkerAssignment.stream()
+                .collect(Collectors.toMap(WorkerLoad::worker, Function.identity()));
+        return candidateWorkersForReassignment.stream()
+                .map(activeWorkers::get)
+                .filter(Objects::nonNull)
+                .findFirst();
+    }
+
+    /**
+     * Task revocation is based on an rough estimation of the lower average number of tasks before
+     * and after new workers join the group. If no new workers join, no revocation takes place.
+     * Based on this estimation, tasks are revoked until the new floor average is reached for
+     * each existing worker. The revoked tasks, once assigned to the new workers will maintain
+     * a balanced load among the group.
+     *
+     * @param activeAssignments
+     * @param completeWorkerAssignment
+     * @return
+     */
+    private Map<String, ConnectorsAndTasks> performTaskRevocation(ConnectorsAndTasks activeAssignments,
+                                                                  Collection<WorkerLoad> completeWorkerAssignment) {
+        int totalActiveConnectorsNum = activeAssignments.connectors().size();
+        int totalActiveTasksNum = activeAssignments.tasks().size();
+        Collection<WorkerLoad> existingWorkers = completeWorkerAssignment.stream()
+                .filter(wl -> wl.size() > 0)
+                .collect(Collectors.toList());
+        int existingWorkersNum = existingWorkers.size();
+        int totalWorkersNum = completeWorkerAssignment.size();
+        int newWorkersNum = totalWorkersNum - existingWorkersNum;
+
+        if (log.isDebugEnabled()) {
+            completeWorkerAssignment.forEach(wl -> log.debug(
+                    "Per worker current load size; worker: {} connectors: {} tasks: {}",
+                    wl.worker(), wl.connectorsSize(), wl.tasksSize()));
+        }
+
+        Map<String, ConnectorsAndTasks> revoking = new HashMap<>();
+        // If there are no new workers, or no existing workers to revoke tasks from return early
+        // after logging the status
+        if (!(newWorkersNum > 0 && existingWorkersNum > 0)) {
+            log.debug("No task revocation required; workers with existing load: {} workers with "
+                    + "no load {} total workers {}",
+                    existingWorkersNum, newWorkersNum, totalWorkersNum);
+            // This is intentionally empty but mutable, because the map is used to include deleted
+            // connectors and tasks as well
+            return revoking;
+        }
+
+        log.debug("Task revocation is required; workers with existing load: {} workers with "
+                + "no load {} total workers {}",
+                existingWorkersNum, newWorkersNum, totalWorkersNum);
+
+        // We have at least one worker assignment (the leader itself) so totalWorkersNum can't be 0
+        log.debug("Previous rounded down (floor) average number of connectors per worker {}", totalActiveConnectorsNum / existingWorkersNum);
+        int floorConnectors = totalActiveConnectorsNum / totalWorkersNum;
+        log.debug("New rounded down (floor) average number of connectors per worker {}", floorConnectors);
+
+        log.debug("Previous rounded down (floor) average number of tasks per worker {}", totalActiveTasksNum / existingWorkersNum);
+        int floorTasks = totalActiveTasksNum / totalWorkersNum;
+        log.debug("New rounded down (floor) average number of tasks per worker {}", floorTasks);
+
+        int numToRevoke = floorConnectors;
+        for (WorkerLoad existing : existingWorkers) {
+            Iterator<String> connectors = existing.connectors().iterator();
+            for (int i = existing.connectorsSize(); i > floorConnectors && numToRevoke > 0; --i, --numToRevoke) {
+                ConnectorsAndTasks resources = revoking.computeIfAbsent(
+                    existing.worker(),
+                    w -> new ConnectorsAndTasks.Builder().build());
+                resources.connectors().add(connectors.next());
+            }
+            if (numToRevoke == 0) {
+                break;
+            }
+        }
+
+        numToRevoke = floorTasks;
+        for (WorkerLoad existing : existingWorkers) {
+            Iterator<ConnectorTaskId> tasks = existing.tasks().iterator();
+            for (int i = existing.tasksSize(); i > floorTasks && numToRevoke > 0; --i, --numToRevoke) {
+                ConnectorsAndTasks resources = revoking.computeIfAbsent(
+                    existing.worker(),
+                    w -> new ConnectorsAndTasks.Builder().build());
+                resources.tasks().add(tasks.next());
+            }
+            if (numToRevoke == 0) {
+                break;
+            }
+        }
+
+        return revoking;
+    }
+
+    private Map<String, ExtendedAssignment> fillAssignments(Collection<String> members, short error,
+                                                            String leaderId, String leaderUrl, long maxOffset,
+                                                            Map<String, Collection<String>> connectorAssignments,
+                                                            Map<String, Collection<ConnectorTaskId>> taskAssignments,
+                                                            Map<String, ConnectorsAndTasks> revoked,
+                                                            int delay) {
+        Map<String, ExtendedAssignment> groupAssignment = new HashMap<>();
+        for (String member : members) {
+            Collection<String> connectorsToStart = connectorAssignments.getOrDefault(member, Collections.emptyList());
+            Collection<ConnectorTaskId> tasksToStart = taskAssignments.getOrDefault(member, Collections.emptyList());
+            Collection<String> connectorsToStop = revoked.getOrDefault(member, ConnectorsAndTasks.EMPTY).connectors();
+            Collection<ConnectorTaskId> tasksToStop = revoked.getOrDefault(member, ConnectorsAndTasks.EMPTY).tasks();
+            ExtendedAssignment assignment =
+                    new ExtendedAssignment(CONNECT_PROTOCOL_V1, error, leaderId, leaderUrl, maxOffset,
+                            connectorsToStart, tasksToStart, connectorsToStop, tasksToStop, delay);
+            log.debug("Filling assignment: {} -> {}", member, assignment);
+            groupAssignment.put(member, assignment);
+        }
+        log.debug("Finished assignment");
+        return groupAssignment;
+    }
+
+    /**
+     * From a map of workers to assignment object generate the equivalent map of workers to byte
+     * buffers of serialized assignments.
+     *
+     * @param assignments the map of worker assignments
+     * @return the serialized map of assignments to workers
+     */
+    protected Map<String, ByteBuffer> serializeAssignments(Map<String, ExtendedAssignment> assignments) {
+        return assignments.entrySet()
+                .stream()
+                .collect(Collectors.toMap(
+                    Map.Entry::getKey,
+                    e -> IncrementalCooperativeConnectProtocol.serializeAssignment(e.getValue())));
+    }
+
+    private static ConnectorsAndTasks diff(ConnectorsAndTasks base,
+                                           ConnectorsAndTasks... toSubtract) {
+        Collection<String> connectors = new TreeSet<>(base.connectors());
+        Collection<ConnectorTaskId> tasks = new TreeSet<>(base.tasks());
+        for (ConnectorsAndTasks sub : toSubtract) {
+            connectors.removeAll(sub.connectors());
+            tasks.removeAll(sub.tasks());
+        }
+        return new ConnectorsAndTasks.Builder().with(connectors, tasks).build();
+    }
+
+    private static <T> Map<String, Collection<T>> diff(Map<String, Collection<T>> base,
+                                                       Map<String, Collection<T>> toSubtract) {
+        Map<String, Collection<T>> incremental = new HashMap<>();
+        for (Map.Entry<String, Collection<T>> entry : base.entrySet()) {
+            List<T> values = new ArrayList<>(entry.getValue());
+            values.removeAll(toSubtract.get(entry.getKey()));
+            incremental.put(entry.getKey(), values);
+        }
+        return incremental;
+    }
+
+    private ConnectorsAndTasks assignment(Map<String, ExtendedWorkerState> memberConfigs) {
+        log.debug("Received assignments: {}", memberConfigs);
+        Set<String> connectors = memberConfigs.values()
+                .stream()
+                .flatMap(state -> state.assignment().connectors().stream())
+                .collect(Collectors.toSet());
+        Set<ConnectorTaskId> tasks = memberConfigs.values()
+                .stream()
+                .flatMap(state -> state.assignment().tasks().stream())
+                .collect(Collectors.toSet());
+        return new ConnectorsAndTasks.Builder().with(connectors, tasks).build();
+    }
+
+    private int calculateDelay(long now) {
+        long diff = scheduledRebalance - now;
+        return diff > 0 ? (int) Math.min(diff, maxDelay) : 0;
+    }
+
+    /**
+     * Perform a round-robin assignment of connectors to workers with existing worker load. This
+     * assignment tries to balance the load between workers, by assigning connectors to workers
+     * that have equal load, starting with the least loaded workers.
+     *
+     * @param workerAssignment the current worker assignment; assigned connectors are added to this list
+     * @param connectors the connectors to be assigned
+     */
+    protected void assignConnectors(List<WorkerLoad> workerAssignment, Collection<String> connectors) {
+        workerAssignment.sort(WorkerLoad.connectorComparator());
+        WorkerLoad first = workerAssignment.get(0);
+
+        Iterator<String> load = connectors.iterator();
+        while (load.hasNext()) {
+            int firstLoad = first.connectorsSize();
+            int upTo = IntStream.range(0, workerAssignment.size())
+                    .filter(i -> workerAssignment.get(i).connectorsSize() > firstLoad)
+                    .findFirst()
+                    .orElse(workerAssignment.size());
+            for (WorkerLoad worker : workerAssignment.subList(0, upTo)) {
+                String connector = load.next();
+                log.debug("Assigning connector {} to {}", connector, worker.worker());
+                worker.assign(connector);
+                if (!load.hasNext()) {
+                    break;
+                }
+            }
+        }
+    }
+
+    /**
+     * Perform a round-robin assignment of tasks to workers with existing worker load. This
+     * assignment tries to balance the load between workers, by assigning tasks to workers that
+     * have equal load, starting with the least loaded workers.
+     *
+     * @param workerAssignment the current worker assignment; assigned tasks are added to this list
+     * @param tasks the tasks to be assigned
+     */
+    protected void assignTasks(List<WorkerLoad> workerAssignment, Collection<ConnectorTaskId> tasks) {
+        workerAssignment.sort(WorkerLoad.taskComparator());
+        WorkerLoad first = workerAssignment.get(0);
+
+        Iterator<ConnectorTaskId> load = tasks.iterator();
+        while (load.hasNext()) {
+            int firstLoad = first.tasksSize();
+            int upTo = IntStream.range(0, workerAssignment.size())
+                    .filter(i -> workerAssignment.get(i).tasksSize() > firstLoad)
+                    .findFirst()
+                    .orElse(workerAssignment.size());
+            for (WorkerLoad worker : workerAssignment.subList(0, upTo)) {
+                ConnectorTaskId task = load.next();
+                log.debug("Assigning task {} to {}", task, worker.worker());
+                worker.assign(task);
+                if (!load.hasNext()) {
+                    break;
+                }
+            }
+        }
+    }
+
+    private static List<WorkerLoad> workerAssignment(Map<String, ExtendedWorkerState> memberConfigs,
+                                                     ConnectorsAndTasks toExclude) {
+        ConnectorsAndTasks ignore = new ConnectorsAndTasks.Builder()
+                .with(new HashSet<>(toExclude.connectors()), new HashSet<>(toExclude.tasks()))
+                .build();
+
+        return memberConfigs.entrySet().stream()
+                .map(e -> new WorkerLoad.Builder(e.getKey()).with(
+                        e.getValue().assignment().connectors().stream()
+                                .filter(v -> !ignore.connectors().contains(v))
+                                .collect(Collectors.toList()),
+                        e.getValue().assignment().tasks().stream()
+                                .filter(v -> !ignore.tasks().contains(v))
+                                .collect(Collectors.toList())
+                        ).build()
+                ).collect(Collectors.toList());
+    }
+
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeConnectProtocol.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeConnectProtocol.java
new file mode 100644
index 0000000..4e3c736
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeConnectProtocol.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.SchemaException;
+import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.protocol.types.Type;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+
+import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
+import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
+import static org.apache.kafka.common.protocol.types.Type.NULLABLE_BYTES;
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.ASSIGNMENT_KEY_NAME;
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONFIG_OFFSET_KEY_NAME;
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONFIG_STATE_V0;
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECTOR_ASSIGNMENT_V0;
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_HEADER_SCHEMA;
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0;
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.ERROR_KEY_NAME;
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.LEADER_KEY_NAME;
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.LEADER_URL_KEY_NAME;
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.URL_KEY_NAME;
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.VERSION_KEY_NAME;
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility.COMPATIBLE;
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility.EAGER;
+
+
+/**
+ * This class implements a group protocol for Kafka Connect workers that support incremental and
+ * cooperative rebalancing of connectors and tasks. It includes the format of worker state used when
+ * joining the group and distributing assignments, and the format of assignments of connectors
+ * and tasks to workers.
+ */
+public class IncrementalCooperativeConnectProtocol {
+    public static final String ALLOCATION_KEY_NAME = "allocation";
+    public static final String REVOKED_KEY_NAME = "revoked";
+    public static final String SCHEDULED_DELAY_KEY_NAME = "delay";
+    public static final short CONNECT_PROTOCOL_V1 = 1;
+    public static final boolean TOLERATE_MISSING_FIELDS_WITH_DEFAULTS = true;
+
+    /**
+     * Connect Protocol Header V1:
+     * <pre>
+     *   Version            => Int16
+     * </pre>
+     */
+    private static final Struct CONNECT_PROTOCOL_HEADER_V1 = new Struct(CONNECT_PROTOCOL_HEADER_SCHEMA)
+            .set(VERSION_KEY_NAME, CONNECT_PROTOCOL_V1);
+
+    /**
+     * Config State V1:
+     * <pre>
+     *   Url                => [String]
+     *   ConfigOffset       => Int64
+     * </pre>
+     */
+    public static final Schema CONFIG_STATE_V1 = CONFIG_STATE_V0;
+
+    /**
+     * Allocation V1
+     * <pre>
+     *   Current Assignment => [Byte]
+     * </pre>
+     */
+    public static final Schema ALLOCATION_V1 = new Schema(
+            TOLERATE_MISSING_FIELDS_WITH_DEFAULTS,
+            new Field(ALLOCATION_KEY_NAME, NULLABLE_BYTES, null, true, null));
+
+    /**
+     *
+     * Connector Assignment V1:
+     * <pre>
+     *   Connector          => [String]
+     *   Tasks              => [Int32]
+     * </pre>
+     *
+     * <p>Assignments for each worker are a set of connectors and tasks. These are categorized by
+     * connector ID. A sentinel task ID (CONNECTOR_TASK) is used to indicate the connector itself
+     * (i.e. that the assignment includes responsibility for running the Connector instance in
+     * addition to any tasks it generates).</p>
+     */
+    public static final Schema CONNECTOR_ASSIGNMENT_V1 = CONNECTOR_ASSIGNMENT_V0;
+
+    /**
+     * Raw (non versioned) assignment V1:
+     * <pre>
+     *   Error              => Int16
+     *   Leader             => [String]
+     *   LeaderUrl          => [String]
+     *   ConfigOffset       => Int64
+     *   Assignment         => [Connector Assignment]
+     *   Revoked            => [Connector Assignment]
+     *   ScheduledDelay     => Int32
+     * </pre>
+     */
+    public static final Schema ASSIGNMENT_V1 = new Schema(
+            TOLERATE_MISSING_FIELDS_WITH_DEFAULTS,
+            new Field(ERROR_KEY_NAME, Type.INT16),
+            new Field(LEADER_KEY_NAME, Type.STRING),
+            new Field(LEADER_URL_KEY_NAME, Type.STRING),
+            new Field(CONFIG_OFFSET_KEY_NAME, Type.INT64),
+            new Field(ASSIGNMENT_KEY_NAME, ArrayOf.nullable(CONNECTOR_ASSIGNMENT_V1), null, true, null),
+            new Field(REVOKED_KEY_NAME, ArrayOf.nullable(CONNECTOR_ASSIGNMENT_V1), null, true, null),
+            new Field(SCHEDULED_DELAY_KEY_NAME, Type.INT32, null, 0));
+
+    /**
+     * The fields are serialized in sequence as follows:
+     * Subscription V1:
+     * <pre>
+     *   Version            => Int16
+     *   Url                => [String]
+     *   ConfigOffset       => Int64
+     *   Current Assignment => [Byte]
+     * </pre>
+     */
+    public static ByteBuffer serializeMetadata(ExtendedWorkerState workerState) {
+        Struct configState = new Struct(CONFIG_STATE_V1)
+                .set(URL_KEY_NAME, workerState.url())
+                .set(CONFIG_OFFSET_KEY_NAME, workerState.offset());
+        // Not a big issue if we embed the protocol version with the assignment in the metadata
+        Struct allocation = new Struct(ALLOCATION_V1)
+                .set(ALLOCATION_KEY_NAME, serializeAssignment(workerState.assignment()));
+        ByteBuffer buffer = ByteBuffer.allocate(CONNECT_PROTOCOL_HEADER_V1.sizeOf()
+                                                + CONFIG_STATE_V1.sizeOf(configState)
+                                                + ALLOCATION_V1.sizeOf(allocation));
+        CONNECT_PROTOCOL_HEADER_V1.writeTo(buffer);
+        CONFIG_STATE_V1.write(buffer, configState);
+        ALLOCATION_V1.write(buffer, allocation);
+        buffer.flip();
+        return buffer;
+    }
+
+    /**
+     * Returns the collection of Connect protocols that are supported by this version along
+     * with their serialized metadata. The protocols are ordered by preference.
+     *
+     * @param workerState the current state of the worker metadata
+     * @return the collection of Connect protocol metadata
+     */
+    public static JoinGroupRequestProtocolCollection metadataRequest(ExtendedWorkerState workerState) {
+        // Order matters in terms of protocol preference
+        return new JoinGroupRequestProtocolCollection(Arrays.asList(
+                new JoinGroupRequestProtocol()
+                        .setName(COMPATIBLE.protocol())
+                        .setMetadata(IncrementalCooperativeConnectProtocol.serializeMetadata(workerState).array()),
+                new JoinGroupRequestProtocol()
+                        .setName(EAGER.protocol())
+                        .setMetadata(ConnectProtocol.serializeMetadata(workerState).array()))
+                .iterator());
+    }
+
+    /**
+     * Given a byte buffer that contains protocol metadata return the deserialized form of the
+     * metadata.
+     *
+     * @param buffer A buffer containing the protocols metadata
+     * @return the deserialized metadata
+     * @throws SchemaException on incompatible Connect protocol version
+     */
+    public static ExtendedWorkerState deserializeMetadata(ByteBuffer buffer) {
+        Struct header = CONNECT_PROTOCOL_HEADER_SCHEMA.read(buffer);
+        Short version = header.getShort(VERSION_KEY_NAME);
+        checkVersionCompatibility(version);
+        Struct configState = CONFIG_STATE_V1.read(buffer);
+        long configOffset = configState.getLong(CONFIG_OFFSET_KEY_NAME);
+        String url = configState.getString(URL_KEY_NAME);
+        Struct allocation = ALLOCATION_V1.read(buffer);
+        // Protocol version is embedded with the assignment in the metadata
+        ExtendedAssignment assignment = deserializeAssignment(allocation.getBytes(ALLOCATION_KEY_NAME));
+        return new ExtendedWorkerState(url, configOffset, assignment);
+    }
+
+    /**
+     * The fields are serialized in sequence as follows:
+     * Complete Assignment V1:
+     * <pre>
+     *   Version            => Int16
+     *   Error              => Int16
+     *   Leader             => [String]
+     *   LeaderUrl          => [String]
+     *   ConfigOffset       => Int64
+     *   Assignment         => [Connector Assignment]
+     *   Revoked            => [Connector Assignment]
+     *   ScheduledDelay     => Int32
+     * </pre>
+     */
+    public static ByteBuffer serializeAssignment(ExtendedAssignment assignment) {
+        // comparison depends on reference equality for now
+        if (assignment == null || ExtendedAssignment.empty().equals(assignment)) {
+            return null;
+        }
+        Struct struct = assignment.toStruct();
+        ByteBuffer buffer = ByteBuffer.allocate(CONNECT_PROTOCOL_HEADER_V1.sizeOf()
+                                                + ASSIGNMENT_V1.sizeOf(struct));
+        CONNECT_PROTOCOL_HEADER_V1.writeTo(buffer);
+        ASSIGNMENT_V1.write(buffer, struct);
+        buffer.flip();
+        return buffer;
+    }
+
+    /**
+     * Given a byte buffer that contains an assignment as defined by this protocol, return the
+     * deserialized form of the assignment.
+     *
+     * @param buffer the buffer containing a serialized assignment
+     * @return the deserialized assignment
+     * @throws SchemaException on incompatible Connect protocol version
+     */
+    public static ExtendedAssignment deserializeAssignment(ByteBuffer buffer) {
+        if (buffer == null) {
+            return null;
+        }
+        Struct header = CONNECT_PROTOCOL_HEADER_SCHEMA.read(buffer);
+        Short version = header.getShort(VERSION_KEY_NAME);
+        checkVersionCompatibility(version);
+        Struct struct = ASSIGNMENT_V1.read(buffer);
+        return ExtendedAssignment.fromStruct(version, struct);
+    }
+
+    private static void checkVersionCompatibility(short version) {
+        // check for invalid versions
+        if (version < CONNECT_PROTOCOL_V0)
+            throw new SchemaException("Unsupported subscription version: " + version);
+
+        // otherwise, assume versions can be parsed
+    }
+
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 34b2376..706742a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -18,13 +18,10 @@ package org.apache.kafka.connect.runtime.distributed;
 
 import org.apache.kafka.clients.consumer.internals.AbstractCoordinator;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
-import org.apache.kafka.common.message.JoinGroupRequestData;
-import org.apache.kafka.common.message.JoinGroupResponseData;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.requests.JoinGroupRequest;
-import org.apache.kafka.common.utils.CircularIterator;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
@@ -37,28 +34,38 @@ import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 
+import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
+import static org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility.EAGER;
+
 /**
  * This class manages the coordination process with the Kafka group coordinator on the broker for managing assignments
  * to workers.
  */
-public final class WorkerCoordinator extends AbstractCoordinator implements Closeable {
+public class WorkerCoordinator extends AbstractCoordinator implements Closeable {
     // Currently doesn't support multiple task assignment strategies, so we just fill in a default value
     public static final String DEFAULT_SUBPROTOCOL = "default";
 
     private final Logger log;
     private final String restUrl;
     private final ConfigBackingStore configStorage;
-    private ConnectProtocol.Assignment assignmentSnapshot;
+    private ExtendedAssignment assignmentSnapshot;
     private ClusterConfigState configSnapshot;
     private final WorkerRebalanceListener listener;
+    private final ConnectProtocolCompatibility protocolCompatibility;
     private LeaderState leaderState;
 
     private boolean rejoinRequested;
+    private volatile ConnectProtocolCompatibility currentConnectProtocol;
+    private final ConnectAssignor eagerAssignor;
+    private final ConnectAssignor incrementalAssignor;
 
     /**
      * Initialize the coordination manager.
@@ -75,7 +82,9 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
                              long retryBackoffMs,
                              String restUrl,
                              ConfigBackingStore configStorage,
-                             WorkerRebalanceListener listener) {
+                             WorkerRebalanceListener listener,
+                             ConnectProtocolCompatibility protocolCompatibility,
+                             int maxDelay) {
         super(logContext,
               client,
               groupId,
@@ -94,6 +103,10 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
         new WorkerCoordinatorMetrics(metrics, metricGrpPrefix);
         this.listener = listener;
         this.rejoinRequested = false;
+        this.protocolCompatibility = protocolCompatibility;
+        this.incrementalAssignor = new IncrementalCooperativeAssignor(logContext, time, maxDelay);
+        this.eagerAssignor = new EagerAssignor(logContext);
+        this.currentConnectProtocol = protocolCompatibility;
     }
 
     @Override
@@ -146,154 +159,66 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
     }
 
     @Override
-    public JoinGroupRequestData.JoinGroupRequestProtocolCollection metadata() {
+    public JoinGroupRequestProtocolCollection metadata() {
         configSnapshot = configStorage.snapshot();
-        ConnectProtocol.WorkerState workerState = new ConnectProtocol.WorkerState(restUrl, configSnapshot.offset());
-        ByteBuffer metadata = ConnectProtocol.serializeMetadata(workerState);
-        return new JoinGroupRequestData.JoinGroupRequestProtocolCollection(
-                Collections.singleton(new JoinGroupRequestData.JoinGroupRequestProtocol()
-                        .setName(DEFAULT_SUBPROTOCOL)
-                        .setMetadata(metadata.array()))
-                        .iterator()
-        );
+        ExtendedWorkerState workerState = new ExtendedWorkerState(restUrl, configSnapshot.offset(), assignmentSnapshot);
+        switch (protocolCompatibility) {
+            case EAGER:
+                return ConnectProtocol.metadataRequest(workerState);
+            case COMPATIBLE:
+                return IncrementalCooperativeConnectProtocol.metadataRequest(workerState);
+            default:
+                throw new IllegalStateException("Unknown Connect protocol compatibility mode " + protocolCompatibility);
+        }
     }
 
     @Override
     protected void onJoinComplete(int generation, String memberId, String protocol, ByteBuffer memberAssignment) {
-        assignmentSnapshot = ConnectProtocol.deserializeAssignment(memberAssignment);
+        ExtendedAssignment newAssignment = IncrementalCooperativeConnectProtocol.deserializeAssignment(memberAssignment);
+        log.debug("Deserialized new assignment: {}", newAssignment);
+        currentConnectProtocol = ConnectProtocolCompatibility.fromProtocol(protocol);
         // At this point we always consider ourselves to be a member of the cluster, even if there was an assignment
         // error (the leader couldn't make the assignment) or we are behind the config and cannot yet work on our assigned
         // tasks. It's the responsibility of the code driving this process to decide how to react (e.g. trying to get
         // up to date, try to rejoin again, leaving the group and backing off, etc.).
         rejoinRequested = false;
-        listener.onAssigned(assignmentSnapshot, generation);
-    }
-
-    @Override
-    protected Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, List<JoinGroupResponseData.JoinGroupResponseMember> allMemberMetadata) {
-        log.debug("Performing task assignment");
-
-        Map<String, ConnectProtocol.WorkerState> memberConfigs = new HashMap<>();
-        for (JoinGroupResponseData.JoinGroupResponseMember memberMetadata : allMemberMetadata)
-            memberConfigs.put(memberMetadata.memberId(), ConnectProtocol.deserializeMetadata(ByteBuffer.wrap(memberMetadata.metadata())));
-
-        long maxOffset = findMaxMemberConfigOffset(memberConfigs);
-        Long leaderOffset = ensureLeaderConfig(maxOffset);
-        if (leaderOffset == null)
-            return fillAssignmentsAndSerialize(memberConfigs.keySet(), ConnectProtocol.Assignment.CONFIG_MISMATCH,
-                    leaderId, memberConfigs.get(leaderId).url(), maxOffset,
-                    new HashMap<String, List<String>>(), new HashMap<String, List<ConnectorTaskId>>());
-        return performTaskAssignment(leaderId, leaderOffset, memberConfigs);
-    }
-
-    private long findMaxMemberConfigOffset(Map<String, ConnectProtocol.WorkerState> memberConfigs) {
-        // The new config offset is the maximum seen by any member. We always perform assignment using this offset,
-        // even if some members have fallen behind. The config offset used to generate the assignment is included in
-        // the response so members that have fallen behind will not use the assignment until they have caught up.
-        Long maxOffset = null;
-        for (Map.Entry<String, ConnectProtocol.WorkerState> stateEntry : memberConfigs.entrySet()) {
-            long memberRootOffset = stateEntry.getValue().offset();
-            if (maxOffset == null)
-                maxOffset = memberRootOffset;
-            else
-                maxOffset = Math.max(maxOffset, memberRootOffset);
-        }
-
-        log.debug("Max config offset root: {}, local snapshot config offsets root: {}",
-                maxOffset, configSnapshot.offset());
-        return maxOffset;
-    }
-
-    private Long ensureLeaderConfig(long maxOffset) {
-        // If this leader is behind some other members, we can't do assignment
-        if (configSnapshot.offset() < maxOffset) {
-            // We might be able to take a new snapshot to catch up immediately and avoid another round of syncing here.
-            // Alternatively, if this node has already passed the maximum reported by any other member of the group, it
-            // is also safe to use this newer state.
-            ClusterConfigState updatedSnapshot = configStorage.snapshot();
-            if (updatedSnapshot.offset() < maxOffset) {
-                log.info("Was selected to perform assignments, but do not have latest config found in sync request. " +
-                        "Returning an empty configuration to trigger re-sync.");
-                return null;
-            } else {
-                configSnapshot = updatedSnapshot;
-                return configSnapshot.offset();
+        if (currentConnectProtocol != EAGER) {
+            if (!newAssignment.revokedConnectors().isEmpty() || !newAssignment.revokedTasks().isEmpty()) {
+                listener.onRevoked(newAssignment.leader(), newAssignment.revokedConnectors(), newAssignment.revokedTasks());
             }
-        }
-
-        return maxOffset;
-    }
 
-    private Map<String, ByteBuffer> performTaskAssignment(String leaderId, long maxOffset, Map<String, ConnectProtocol.WorkerState> memberConfigs) {
-        Map<String, List<String>> connectorAssignments = new HashMap<>();
-        Map<String, List<ConnectorTaskId>> taskAssignments = new HashMap<>();
-
-        // Perform round-robin task assignment. Assign all connectors and then all tasks because assigning both the
-        // connector and its tasks can lead to very uneven distribution of work in some common cases (e.g. for connectors
-        // that generate only 1 task each; in a cluster of 2 or an even # of nodes, only even nodes will be assigned
-        // connectors and only odd nodes will be assigned tasks, but tasks are, on average, actually more resource
-        // intensive than connectors).
-        List<String> connectorsSorted = sorted(configSnapshot.connectors());
-        CircularIterator<String> memberIt = new CircularIterator<>(sorted(memberConfigs.keySet()));
-        for (String connectorId : connectorsSorted) {
-            String connectorAssignedTo = memberIt.next();
-            log.trace("Assigning connector {} to {}", connectorId, connectorAssignedTo);
-            List<String> memberConnectors = connectorAssignments.get(connectorAssignedTo);
-            if (memberConnectors == null) {
-                memberConnectors = new ArrayList<>();
-                connectorAssignments.put(connectorAssignedTo, memberConnectors);
+            if (assignmentSnapshot != null) {
+                assignmentSnapshot.connectors().removeAll(newAssignment.revokedConnectors());
+                assignmentSnapshot.tasks().removeAll(newAssignment.revokedTasks());
+                log.debug("After revocations snapshot of assignment: {}", assignmentSnapshot);
+                newAssignment.connectors().addAll(assignmentSnapshot.connectors());
+                newAssignment.tasks().addAll(assignmentSnapshot.tasks());
             }
-            memberConnectors.add(connectorId);
+            log.debug("Augmented new assignment: {}", newAssignment);
         }
-        for (String connectorId : connectorsSorted) {
-            for (ConnectorTaskId taskId : sorted(configSnapshot.tasks(connectorId))) {
-                String taskAssignedTo = memberIt.next();
-                log.trace("Assigning task {} to {}", taskId, taskAssignedTo);
-                List<ConnectorTaskId> memberTasks = taskAssignments.get(taskAssignedTo);
-                if (memberTasks == null) {
-                    memberTasks = new ArrayList<>();
-                    taskAssignments.put(taskAssignedTo, memberTasks);
-                }
-                memberTasks.add(taskId);
-            }
-        }
-
-        this.leaderState = new LeaderState(memberConfigs, connectorAssignments, taskAssignments);
-
-        return fillAssignmentsAndSerialize(memberConfigs.keySet(), ConnectProtocol.Assignment.NO_ERROR,
-                leaderId, memberConfigs.get(leaderId).url(), maxOffset, connectorAssignments, taskAssignments);
+        assignmentSnapshot = newAssignment;
+        listener.onAssigned(assignmentSnapshot, generation);
     }
 
-    private Map<String, ByteBuffer> fillAssignmentsAndSerialize(Collection<String> members,
-                                                                short error,
-                                                                String leaderId,
-                                                                String leaderUrl,
-                                                                long maxOffset,
-                                                                Map<String, List<String>> connectorAssignments,
-                                                                Map<String, List<ConnectorTaskId>> taskAssignments) {
-
-        Map<String, ByteBuffer> groupAssignment = new HashMap<>();
-        for (String member : members) {
-            List<String> connectors = connectorAssignments.get(member);
-            if (connectors == null)
-                connectors = Collections.emptyList();
-            List<ConnectorTaskId> tasks = taskAssignments.get(member);
-            if (tasks == null)
-                tasks = Collections.emptyList();
-            ConnectProtocol.Assignment assignment = new ConnectProtocol.Assignment(error, leaderId, leaderUrl, maxOffset, connectors, tasks);
-            log.debug("Assignment: {} -> {}", member, assignment);
-            groupAssignment.put(member, ConnectProtocol.serializeAssignment(assignment));
-        }
-        log.debug("Finished assignment");
-        return groupAssignment;
+    @Override
+    protected Map<String, ByteBuffer> performAssignment(String leaderId, String protocol, List<JoinGroupResponseMember> allMemberMetadata) {
+        return ConnectProtocolCompatibility.fromProtocol(protocol) == EAGER
+               ? eagerAssignor.performAssignment(leaderId, protocol, allMemberMetadata, this)
+               : incrementalAssignor.performAssignment(leaderId, protocol, allMemberMetadata, this);
     }
 
     @Override
     protected void onJoinPrepare(int generation, String memberId) {
-        this.leaderState = null;
-        log.debug("Revoking previous assignment {}", assignmentSnapshot);
-        if (assignmentSnapshot != null && !assignmentSnapshot.failed())
-            listener.onRevoked(assignmentSnapshot.leader(), assignmentSnapshot.connectors(), assignmentSnapshot.tasks());
+        log.info("Rebalance started");
+        leaderState(null);
+        if (currentConnectProtocol == EAGER) {
+            log.debug("Revoking previous assignment {}", assignmentSnapshot);
+            if (assignmentSnapshot != null && !assignmentSnapshot.failed())
+                listener.onRevoked(assignmentSnapshot.leader(), assignmentSnapshot.connectors(), assignmentSnapshot.tasks());
+        } else {
+            log.debug("Cooperative rebalance triggered. Keeping assignment {} until it's "
+                      + "explicitly revoked.", assignmentSnapshot);
+        }
     }
 
     @Override
@@ -315,13 +240,67 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
     public String ownerUrl(String connector) {
         if (rejoinNeededOrPending() || !isLeader())
             return null;
-        return leaderState.ownerUrl(connector);
+        return leaderState().ownerUrl(connector);
     }
 
     public String ownerUrl(ConnectorTaskId task) {
         if (rejoinNeededOrPending() || !isLeader())
             return null;
-        return leaderState.ownerUrl(task);
+        return leaderState().ownerUrl(task);
+    }
+
+    /**
+     * Get an up-to-date snapshot of the cluster configuration.
+     *
+     * @return the state of the cluster configuration; the result is not locally cached
+     */
+    public ClusterConfigState configFreshSnapshot() {
+        return configStorage.snapshot();
+    }
+
+    /**
+     * Get a snapshot of the cluster configuration.
+     *
+     * @return the state of the cluster configuration
+     */
+    public ClusterConfigState configSnapshot() {
+        return configSnapshot;
+    }
+
+    /**
+     * Set the state of the cluster configuration to this worker coordinator.
+     *
+     * @param update the updated state of the cluster configuration
+     */
+    public void configSnapshot(ClusterConfigState update) {
+        configSnapshot = update;
+    }
+
+    /**
+     * Get the leader state stored in this worker coordinator.
+     *
+     * @return the leader state
+     */
+    private LeaderState leaderState() {
+        return leaderState;
+    }
+
+    /**
+     * Store the leader state to this worker coordinator.
+     *
+     * @param update the updated leader state
+     */
+    public void leaderState(LeaderState update) {
+        leaderState = update;
+    }
+
+    /**
+     * Get the version of the connect protocol that is currently active in the group of workers.
+     *
+     * @return the current connect protocol version
+     */
+    public short currentProtocolVersion() {
+        return currentConnectProtocol == EAGER ? (short) 0 : (short) 1;
     }
 
     private class WorkerCoordinatorMetrics {
@@ -353,15 +332,9 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
         }
     }
 
-    private static <T extends Comparable<T>> List<T> sorted(Collection<T> members) {
-        List<T> res = new ArrayList<>(members);
-        Collections.sort(res);
-        return res;
-    }
-
-    private static <K, V> Map<V, K> invertAssignment(Map<K, List<V>> assignment) {
+    public static <K, V> Map<V, K> invertAssignment(Map<K, Collection<V>> assignment) {
         Map<V, K> inverted = new HashMap<>();
-        for (Map.Entry<K, List<V>> assignmentEntry : assignment.entrySet()) {
+        for (Map.Entry<K, Collection<V>> assignmentEntry : assignment.entrySet()) {
             K key = assignmentEntry.getKey();
             for (V value : assignmentEntry.getValue())
                 inverted.put(value, key);
@@ -369,14 +342,14 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
         return inverted;
     }
 
-    private static class LeaderState {
-        private final Map<String, ConnectProtocol.WorkerState> allMembers;
+    public static class LeaderState {
+        private final Map<String, ExtendedWorkerState> allMembers;
         private final Map<String, String> connectorOwners;
         private final Map<ConnectorTaskId, String> taskOwners;
 
-        public LeaderState(Map<String, ConnectProtocol.WorkerState> allMembers,
-                           Map<String, List<String>> connectorAssignment,
-                           Map<String, List<ConnectorTaskId>> taskAssignment) {
+        public LeaderState(Map<String, ExtendedWorkerState> allMembers,
+                           Map<String, Collection<String>> connectorAssignment,
+                           Map<String, Collection<ConnectorTaskId>> taskAssignment) {
             this.allMembers = allMembers;
             this.connectorOwners = invertAssignment(connectorAssignment);
             this.taskOwners = invertAssignment(taskAssignment);
@@ -398,4 +371,194 @@ public final class WorkerCoordinator extends AbstractCoordinator implements Clos
 
     }
 
+    public static class ConnectorsAndTasks {
+        public static final ConnectorsAndTasks EMPTY =
+                new ConnectorsAndTasks(Collections.emptyList(), Collections.emptyList());
+
+        private final Collection<String> connectors;
+        private final Collection<ConnectorTaskId> tasks;
+
+        private ConnectorsAndTasks(Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
+            this.connectors = connectors;
+            this.tasks = tasks;
+        }
+
+        public static class Builder {
+            private Collection<String> withConnectors;
+            private Collection<ConnectorTaskId> withTasks;
+
+            public Builder() {
+            }
+
+            public ConnectorsAndTasks.Builder withCopies(Collection<String> connectors,
+                                                         Collection<ConnectorTaskId> tasks) {
+                withConnectors = new ArrayList<>(connectors);
+                withTasks = new ArrayList<>(tasks);
+                return this;
+            }
+
+            public ConnectorsAndTasks.Builder with(Collection<String> connectors,
+                                                   Collection<ConnectorTaskId> tasks) {
+                withConnectors = new ArrayList<>(connectors);
+                withTasks = new ArrayList<>(tasks);
+                return this;
+            }
+
+            public ConnectorsAndTasks build() {
+                return new ConnectorsAndTasks(
+                        withConnectors != null ? withConnectors : new ArrayList<>(),
+                        withTasks != null ? withTasks : new ArrayList<>());
+            }
+        }
+
+        public Collection<String> connectors() {
+            return connectors;
+        }
+
+        public Collection<ConnectorTaskId> tasks() {
+            return tasks;
+        }
+
+        public int size() {
+            return connectors.size() + tasks.size();
+        }
+
+        public boolean isEmpty() {
+            return connectors.isEmpty() && tasks.isEmpty();
+        }
+
+        @Override
+        public String toString() {
+            return "{ connectorIds=" + connectors + ", taskIds=" + tasks + '}';
+        }
+    }
+
+    public static class WorkerLoad {
+        private final String worker;
+        private final Collection<String> connectors;
+        private final Collection<ConnectorTaskId> tasks;
+
+        private WorkerLoad(
+                String worker,
+                Collection<String> connectors,
+                Collection<ConnectorTaskId> tasks
+        ) {
+            this.worker = worker;
+            this.connectors = connectors;
+            this.tasks = tasks;
+        }
+
+        public static class Builder {
+            private String withWorker;
+            private Collection<String> withConnectors;
+            private Collection<ConnectorTaskId> withTasks;
+
+            public Builder(String worker) {
+                this.withWorker = Objects.requireNonNull(worker, "worker cannot be null");
+            }
+
+            public WorkerLoad.Builder withCopies(Collection<String> connectors,
+                                                 Collection<ConnectorTaskId> tasks) {
+                withConnectors = new ArrayList<>(
+                        Objects.requireNonNull(connectors, "connectors may be empty but not null"));
+                withTasks = new ArrayList<>(
+                        Objects.requireNonNull(tasks, "tasks may be empty but not null"));
+                return this;
+            }
+
+            public WorkerLoad.Builder with(Collection<String> connectors,
+                                           Collection<ConnectorTaskId> tasks) {
+                withConnectors = Objects.requireNonNull(connectors,
+                        "connectors may be empty but not null");
+                withTasks = Objects.requireNonNull(tasks, "tasks may be empty but not null");
+                return this;
+            }
+
+            public WorkerLoad build() {
+                return new WorkerLoad(
+                        withWorker,
+                        withConnectors != null ? withConnectors : new ArrayList<>(),
+                        withTasks != null ? withTasks : new ArrayList<>());
+            }
+        }
+
+        public String worker() {
+            return worker;
+        }
+
+        public Collection<String> connectors() {
+            return connectors;
+        }
+
+        public Collection<ConnectorTaskId> tasks() {
+            return tasks;
+        }
+
+        public int connectorsSize() {
+            return connectors.size();
+        }
+
+        public int tasksSize() {
+            return tasks.size();
+        }
+
+        public void assign(String connector) {
+            connectors.add(connector);
+        }
+
+        public void assign(ConnectorTaskId task) {
+            tasks.add(task);
+        }
+
+        public int size() {
+            return connectors.size() + tasks.size();
+        }
+
+        public boolean isEmpty() {
+            return connectors.isEmpty() && tasks.isEmpty();
+        }
+
+        public static Comparator<WorkerLoad> connectorComparator() {
+            return (left, right) -> {
+                int res = left.connectors.size() - right.connectors.size();
+                return res != 0 ? res : left.worker == null
+                                        ? right.worker == null ? 0 : -1
+                                        : left.worker.compareTo(right.worker);
+            };
+        }
+
+        public static Comparator<WorkerLoad> taskComparator() {
+            return (left, right) -> {
+                int res = left.tasks.size() - right.tasks.size();
+                return res != 0 ? res : left.worker == null
+                                        ? right.worker == null ? 0 : -1
+                                        : left.worker.compareTo(right.worker);
+            };
+        }
+
+        @Override
+        public String toString() {
+            return "{ worker=" + worker + ", connectorIds=" + connectors + ", taskIds=" + tasks + '}';
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof WorkerLoad)) {
+                return false;
+            }
+            WorkerLoad that = (WorkerLoad) o;
+            return worker.equals(that.worker) &&
+                    connectors.equals(that.connectors) &&
+                    tasks.equals(that.tasks);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(worker, connectors, tasks);
+        }
+    }
+
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 5d18915..99ea3a4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -45,7 +45,6 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -55,8 +54,6 @@ import java.util.concurrent.atomic.AtomicReference;
  * higher level operations in response to group membership events being handled by the herder.
  */
 public class WorkerGroupMember {
-
-    private static final AtomicInteger CONNECT_CLIENT_ID_SEQUENCE = new AtomicInteger(1);
     private static final String JMX_PREFIX = "kafka.connect";
 
     private final Logger log;
@@ -74,17 +71,16 @@ public class WorkerGroupMember {
                              String restUrl,
                              ConfigBackingStore configStorage,
                              WorkerRebalanceListener listener,
-                             Time time) {
+                             Time time,
+                             String clientId,
+                             LogContext logContext) {
         try {
             this.time = time;
+            this.clientId = clientId;
+            this.log = logContext.logger(WorkerGroupMember.class);
 
-            String clientIdConfig = config.getString(CommonClientConfigs.CLIENT_ID_CONFIG);
-            clientId = clientIdConfig.length() <= 0 ? "connect-" + CONNECT_CLIENT_ID_SEQUENCE.getAndIncrement() : clientIdConfig;
             String groupId = config.getString(DistributedConfig.GROUP_ID_CONFIG);
 
-            LogContext logContext = new LogContext("[Worker clientId=" + clientId + ", groupId=" + groupId + "] ");
-            this.log = logContext.logger(WorkerGroupMember.class);
-
             Map<String, String> metricsTags = new LinkedHashMap<>();
             metricsTags.put("client-id", clientId);
             MetricConfig metricConfig = new MetricConfig().samples(config.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
@@ -140,7 +136,9 @@ public class WorkerGroupMember {
                     retryBackoffMs,
                     restUrl,
                     configStorage,
-                    listener);
+                    listener,
+                    ConnectProtocolCompatibility.compatibility(config.getString(DistributedConfig.CONNECT_PROTOCOL_CONFIG)),
+                    config.getInt(DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG));
 
             AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
             log.debug("Connect group member created");
@@ -202,6 +200,15 @@ public class WorkerGroupMember {
         return coordinator.ownerUrl(task);
     }
 
+    /**
+     * Get the version of the connect protocol that is currently active in the group of workers.
+     *
+     * @return the current connect protocol version
+     */
+    public short currentProtocolVersion() {
+        return coordinator.currentProtocolVersion();
+    }
+
     private void stop(boolean swallowException) {
         log.trace("Stopping the Connect group member.");
         AtomicReference<Throwable> firstException = new AtomicReference<>();
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java
index 6ff5ce4..93d0327 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerRebalanceListener.java
@@ -25,13 +25,15 @@ import java.util.Collection;
  */
 public interface WorkerRebalanceListener {
     /**
-     * Invoked when a new assignment is created by joining the Connect worker group. This is invoked for both successful
-     * and unsuccessful assignments.
+     * Invoked when a new assignment is created by joining the Connect worker group. This is
+     * invoked for both successful and unsuccessful assignments.
      */
-    void onAssigned(ConnectProtocol.Assignment assignment, int generation);
+    void onAssigned(ExtendedAssignment assignment, int generation);
 
     /**
-     * Invoked when a rebalance operation starts, revoking ownership for the set of connectors and tasks.
+     * Invoked when a rebalance operation starts, revoking ownership for the set of connectors
+     * and tasks. Depending on the Connect protocol version, the collection of revoked connectors
+     * or tasks might refer to all or some of the connectors and tasks running on the worker.
      */
     void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks);
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
new file mode 100644
index 0000000..b0125b2
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.AbstractStatus;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility.COMPATIBLE;
+import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONNECT_PROTOCOL_CONFIG;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+
+/**
+ * Integration tests for incremental cooperative rebalancing between Connect workers
+ */
+@Category(IntegrationTest.class)
+public class RebalanceSourceConnectorsIntegrationTest {
+
+    private static final Logger log = LoggerFactory.getLogger(RebalanceSourceConnectorsIntegrationTest.class);
+
+    private static final int NUM_TOPIC_PARTITIONS = 3;
+    private static final int CONNECTOR_SETUP_DURATION_MS = 30_000;
+    private static final int WORKER_SETUP_DURATION_MS = 30_000;
+    private static final int NUM_TASKS = 4;
+    private static final String CONNECTOR_NAME = "seq-source1";
+    private static final String TOPIC_NAME = "sequential-topic";
+
+    private EmbeddedConnectCluster connect;
+
+    @Before
+    public void setup() throws IOException {
+        // setup Connect worker properties
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put(CONNECT_PROTOCOL_CONFIG, COMPATIBLE.toString());
+        workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, "30000");
+
+        // setup Kafka broker properties
+        Properties brokerProps = new Properties();
+        brokerProps.put("auto.create.topics.enable", "false");
+
+        // build a Connect cluster backed by Kafka and Zk
+        connect = new EmbeddedConnectCluster.Builder()
+                .name("connect-cluster")
+                .numWorkers(3)
+                .numBrokers(1)
+                .workerProps(workerProps)
+                .brokerProps(brokerProps)
+                .build();
+
+        // start the clusters
+        connect.start();
+    }
+
+    @After
+    public void close() {
+        // stop all Connect, Kafka and Zk threads.
+        connect.stop();
+    }
+
+    @Test
+    public void testStartTwoConnectors() throws Exception {
+        // create test topic
+        connect.kafka().createTopic(TOPIC_NAME, NUM_TOPIC_PARTITIONS);
+
+        // setup up props for the source connector
+        Map<String, String> props = new HashMap<>();
+        props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
+        props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
+        props.put("throughput", String.valueOf(1));
+        props.put("messages.per.poll", String.valueOf(10));
+        props.put(TOPICS_CONFIG, TOPIC_NAME);
+        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+
+        // start a source connector
+        connect.configureConnector(CONNECTOR_NAME, props);
+
+        waitForCondition(() -> this.assertConnectorAndTasksRunning(CONNECTOR_NAME, NUM_TASKS).orElse(false),
+                CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+
+        // start a source connector
+        connect.configureConnector("another-source", props);
+
+        waitForCondition(() -> this.assertConnectorAndTasksRunning(CONNECTOR_NAME, NUM_TASKS).orElse(false),
+                CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+
+        waitForCondition(() -> this.assertConnectorAndTasksRunning("another-source", 4).orElse(false),
+                CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+    }
+
+    @Test
+    public void testDeleteConnector() throws Exception {
+        // create test topic
+        connect.kafka().createTopic(TOPIC_NAME, NUM_TOPIC_PARTITIONS);
+
+        // setup up props for the source connector
+        Map<String, String> props = new HashMap<>();
+        props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
+        props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
+        props.put("throughput", String.valueOf(1));
+        props.put("messages.per.poll", String.valueOf(10));
+        props.put(TOPICS_CONFIG, TOPIC_NAME);
+        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+
+        waitForCondition(() -> this.assertWorkersUp(3),
+                WORKER_SETUP_DURATION_MS, "Connect workers did not start in time.");
+
+        // start a source connector
+        IntStream.range(0, 4).forEachOrdered(
+            i -> {
+                try {
+                    connect.configureConnector(CONNECTOR_NAME + i, props);
+                } catch (IOException e) {
+                    throw new ConnectException(e);
+                }
+            });
+
+        waitForCondition(() -> this.assertConnectorAndTasksRunning(CONNECTOR_NAME + 3, NUM_TASKS).orElse(true),
+                CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+
+        // delete connector
+        connect.deleteConnector(CONNECTOR_NAME + 3);
+
+        waitForCondition(() -> !this.assertConnectorAndTasksRunning(CONNECTOR_NAME + 3, NUM_TASKS).orElse(true),
+                CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not stop in time.");
+
+        waitForCondition(this::assertConnectorAndTasksAreUnique,
+                WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
+    }
+
+    @Test
+    public void testAddingWorker() throws Exception {
+        // create test topic
+        connect.kafka().createTopic(TOPIC_NAME, NUM_TOPIC_PARTITIONS);
+
+        // setup up props for the source connector
+        Map<String, String> props = new HashMap<>();
+        props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
+        props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
+        props.put("throughput", String.valueOf(1));
+        props.put("messages.per.poll", String.valueOf(10));
+        props.put(TOPICS_CONFIG, TOPIC_NAME);
+        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+
+        waitForCondition(() -> this.assertWorkersUp(3),
+                WORKER_SETUP_DURATION_MS, "Connect workers did not start in time.");
+
+        // start a source connector
+        IntStream.range(0, 4).forEachOrdered(
+            i -> {
+                try {
+                    connect.configureConnector(CONNECTOR_NAME + i, props);
+                } catch (IOException e) {
+                    throw new ConnectException(e);
+                }
+            });
+
+        waitForCondition(() -> this.assertConnectorAndTasksRunning(CONNECTOR_NAME + 3, NUM_TASKS).orElse(false),
+                CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+
+        connect.addWorker();
+
+        waitForCondition(() -> this.assertWorkersUp(4),
+                WORKER_SETUP_DURATION_MS, "Connect workers did not start in time.");
+
+        waitForCondition(() -> this.assertConnectorAndTasksRunning(CONNECTOR_NAME + 3, NUM_TASKS).orElse(false),
+                CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+
+        waitForCondition(this::assertConnectorAndTasksAreUnique,
+                WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
+    }
+
+    @Test
+    public void testRemovingWorker() throws Exception {
+        // create test topic
+        connect.kafka().createTopic(TOPIC_NAME, NUM_TOPIC_PARTITIONS);
+
+        // setup up props for the source connector
+        Map<String, String> props = new HashMap<>();
+        props.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getSimpleName());
+        props.put(TASKS_MAX_CONFIG, String.valueOf(NUM_TASKS));
+        props.put("throughput", String.valueOf(1));
+        props.put("messages.per.poll", String.valueOf(10));
+        props.put(TOPICS_CONFIG, TOPIC_NAME);
+        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+
+        waitForCondition(() -> this.assertWorkersUp(3),
+                WORKER_SETUP_DURATION_MS, "Connect workers did not start in time.");
+
+        // start a source connector
+        IntStream.range(0, 4).forEachOrdered(
+            i -> {
+                try {
+                    connect.configureConnector(CONNECTOR_NAME + i, props);
+                } catch (IOException e) {
+                    throw new ConnectException(e);
+                }
+            });
+
+        waitForCondition(() -> this.assertConnectorAndTasksRunning(CONNECTOR_NAME + 3, NUM_TASKS).orElse(false),
+                CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+
+        connect.removeWorker();
+
+        waitForCondition(() -> this.assertWorkersUp(2),
+                WORKER_SETUP_DURATION_MS, "Connect workers did not start in time.");
+
+        waitForCondition(this::assertConnectorAndTasksAreUnique,
+                WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
+    }
+
+    /**
+     * Confirm that a connector with an exact number of tasks is running.
+     *
+     * @param connectorName the connector
+     * @param numTasks the expected number of tasks
+     * @return true if the connector and tasks are in RUNNING state; false otherwise
+     */
+    private Optional<Boolean> assertConnectorAndTasksRunning(String connectorName, int numTasks) {
+        try {
+            ConnectorStateInfo info = connect.connectorStatus(connectorName);
+            boolean result = info != null
+                    && info.tasks().size() == numTasks
+                    && info.connector().state().equals(AbstractStatus.State.RUNNING.toString())
+                    && info.tasks().stream().allMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
+            return Optional.of(result);
+        } catch (Exception e) {
+            log.error("Could not check connector state info.", e);
+            return Optional.empty();
+        }
+    }
+
+    /**
+     * Verifies whether the supplied number of workers matches the number of workers
+     * currently running.
+     * @param numWorkers the expected number of active workers
+     * @return true if exactly numWorkers are active; false if more or fewer workers are running
+     */
+    private boolean assertWorkersUp(int numWorkers) {
+        try {
+            int numUp = connect.activeWorkers().size();
+            return numUp == numWorkers;
+        } catch (Exception e) {
+            log.error("Could not check active workers.", e);
+            return false;
+        }
+    }
+
+    private boolean assertConnectorAndTasksAreUnique() {
+        try {
+            Map<String, Collection<String>> connectors = new HashMap<>();
+            Map<String, Collection<String>> tasks = new HashMap<>();
+            for (String connector : connect.connectors()) {
+                ConnectorStateInfo info = connect.connectorStatus(connector);
+                connectors.computeIfAbsent(info.connector().workerId(), k -> new ArrayList<>())
+                        .add(connector);
+                info.tasks().forEach(
+                    t -> tasks.computeIfAbsent(t.workerId(), k -> new ArrayList<>())
+                           .add(connector + "-" + t.id()));
+            }
+
+            int maxConnectors = connectors.values().stream().mapToInt(Collection::size).max().orElse(0);
+            int maxTasks = tasks.values().stream().mapToInt(Collection::size).max().orElse(0);
+
+            assertNotEquals("Found no connectors running!", maxConnectors, 0);
+            assertNotEquals("Found no tasks running!", maxTasks, 0);
+            assertEquals("Connector assignments are not unique: " + connectors,
+                    connectors.values().size(),
+                    connectors.values().stream().distinct().collect(Collectors.toList()).size());
+            assertEquals("Task assignments are not unique: " + tasks,
+                    tasks.values().size(),
+                    tasks.values().stream().distinct().collect(Collectors.toList()).size());
+            return true;
+        } catch (Exception e) {
+            log.error("Could not check connector state info.", e);
+            return false;
+        }
+    }
+
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
new file mode 100644
index 0000000..a790574
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTestUtils.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
+import org.apache.kafka.connect.runtime.distributed.ExtendedAssignment;
+import org.apache.kafka.connect.runtime.distributed.ExtendedWorkerState;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+import java.util.AbstractMap.SimpleEntry;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.WorkerLoad;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class WorkerTestUtils {
+
+    public static WorkerLoad emptyWorkerLoad(String worker) {
+        return new WorkerLoad.Builder(worker).build();
+    }
+
+    public WorkerLoad workerLoad(String worker, int connectorStart, int connectorNum,
+                                  int taskStart, int taskNum) {
+        return new WorkerLoad.Builder(worker).with(
+                newConnectors(connectorStart, connectorStart + connectorNum),
+                newTasks(taskStart, taskStart + taskNum)).build();
+    }
+
+    public static List<String> newConnectors(int start, int end) {
+        return IntStream.range(start, end)
+                .mapToObj(i -> "connector" + i)
+                .collect(Collectors.toList());
+    }
+
+    public static List<ConnectorTaskId> newTasks(int start, int end) {
+        return IntStream.range(start, end)
+                .mapToObj(i -> new ConnectorTaskId("task", i))
+                .collect(Collectors.toList());
+    }
+
+    public static ClusterConfigState clusterConfigState(long offset,
+                                                        int connectorNum,
+                                                        int taskNum) {
+        return new ClusterConfigState(
+                offset,
+                connectorTaskCounts(1, connectorNum, taskNum),
+                connectorConfigs(1, connectorNum),
+                connectorTargetStates(1, connectorNum, TargetState.STARTED),
+                taskConfigs(0, connectorNum, connectorNum * taskNum),
+                Collections.emptySet());
+    }
+
+    public static Map<String, ExtendedWorkerState> memberConfigs(String givenLeader,
+                                                                 long givenOffset,
+                                                                 Map<String, ExtendedAssignment> givenAssignments) {
+        return givenAssignments.entrySet().stream()
+                .collect(Collectors.toMap(
+                    Map.Entry::getKey,
+                    e -> new ExtendedWorkerState(expectedLeaderUrl(givenLeader), givenOffset, e.getValue())));
+    }
+
+    public static Map<String, ExtendedWorkerState> memberConfigs(String givenLeader,
+                                                                 long givenOffset,
+                                                                 int start,
+                                                                 int connectorNum) {
+        return IntStream.range(start, connectorNum + 1)
+                .mapToObj(i -> new SimpleEntry<>("worker" + i, new ExtendedWorkerState(expectedLeaderUrl(givenLeader), givenOffset, null)))
+                .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue));
+    }
+
+    public static Map<String, Integer> connectorTaskCounts(int start,
+                                                           int connectorNum,
+                                                           int taskCounts) {
+        return IntStream.range(start, connectorNum + 1)
+                .mapToObj(i -> new SimpleEntry<>("connector" + i, taskCounts))
+                .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue));
+    }
+
+    public static Map<String, Map<String, String>> connectorConfigs(int start, int connectorNum) {
+        return IntStream.range(start, connectorNum + 1)
+                .mapToObj(i -> new SimpleEntry<>("connector" + i, new HashMap<String, String>()))
+                .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue));
+    }
+
+    public static Map<String, TargetState> connectorTargetStates(int start,
+                                                                 int connectorNum,
+                                                                 TargetState state) {
+        return IntStream.range(start, connectorNum + 1)
+                .mapToObj(i -> new SimpleEntry<>("connector" + i, state))
+                .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue));
+    }
+
+    public static Map<ConnectorTaskId, Map<String, String>> taskConfigs(int start,
+                                                                        int connectorNum,
+                                                                        int taskNum) {
+        return IntStream.range(start, taskNum + 1)
+                .mapToObj(i -> new SimpleEntry<>(
+                        new ConnectorTaskId("connector" + i / connectorNum + 1, i),
+                        new HashMap<String, String>())
+                ).collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue));
+    }
+
+    public static String expectedLeaderUrl(String givenLeader) {
+        return "http://" + givenLeader + ":8083";
+    }
+
+    public static void assertAssignment(String expectedLeader,
+                                        long expectedOffset,
+                                        List<String> expectedAssignedConnectors,
+                                        int expectedAssignedTaskNum,
+                                        List<String> expectedRevokedConnectors,
+                                        int expectedRevokedTaskNum,
+                                        ExtendedAssignment assignment) {
+        assertAssignment(false, expectedLeader, expectedOffset,
+                expectedAssignedConnectors, expectedAssignedTaskNum,
+                expectedRevokedConnectors, expectedRevokedTaskNum,
+                0,
+                assignment);
+    }
+
+    public static void assertAssignment(String expectedLeader,
+                                        long expectedOffset,
+                                        List<String> expectedAssignedConnectors,
+                                        int expectedAssignedTaskNum,
+                                        List<String> expectedRevokedConnectors,
+                                        int expectedRevokedTaskNum,
+                                        int expectedDelay,
+                                        ExtendedAssignment assignment) {
+        assertAssignment(false, expectedLeader, expectedOffset,
+                expectedAssignedConnectors, expectedAssignedTaskNum,
+                expectedRevokedConnectors, expectedRevokedTaskNum,
+                expectedDelay,
+                assignment);
+    }
+
+    public static void assertAssignment(boolean expectFailed,
+                                        String expectedLeader,
+                                        long expectedOffset,
+                                        List<String> expectedAssignedConnectors,
+                                        int expectedAssignedTaskNum,
+                                        List<String> expectedRevokedConnectors,
+                                        int expectedRevokedTaskNum,
+                                        int expectedDelay,
+                                        ExtendedAssignment assignment) {
+        assertNotNull("Assignment can't be null", assignment);
+
+        assertEquals("Wrong status in " + assignment, expectFailed, assignment.failed());
+
+        assertEquals("Wrong leader in " + assignment, expectedLeader, assignment.leader());
+
+        assertEquals("Wrong leaderUrl in " + assignment, expectedLeaderUrl(expectedLeader),
+                assignment.leaderUrl());
+
+        assertEquals("Wrong offset in " + assignment, expectedOffset, assignment.offset());
+
+        assertThat("Wrong set of assigned connectors in " + assignment,
+                assignment.connectors(), is(expectedAssignedConnectors));
+
+        assertEquals("Wrong number of assigned tasks in " + assignment,
+                expectedAssignedTaskNum, assignment.tasks().size());
+
+        assertThat("Wrong set of revoked connectors in " + assignment,
+                assignment.revokedConnectors(), is(expectedRevokedConnectors));
+
+        assertEquals("Wrong number of revoked tasks in " + assignment,
+                expectedRevokedTaskNum, assignment.revokedTasks().size());
+
+        assertEquals("Wrong rebalance delay in " + assignment, expectedDelay, assignment.delay());
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocolCompatibilityTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocolCompatibilityTest.java
new file mode 100644
index 0000000..24e7cb9
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/ConnectProtocolCompatibilityTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.connect.runtime.TargetState;
+import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+
+import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+public class ConnectProtocolCompatibilityTest {
+    private static final String LEADER_URL = "leaderUrl:8083";
+
+    private String connectorId1 = "connector1";
+    private String connectorId2 = "connector2";
+    private String connectorId3 = "connector3";
+    private ConnectorTaskId taskId1x0 = new ConnectorTaskId(connectorId1, 0);
+    private ConnectorTaskId taskId1x1 = new ConnectorTaskId(connectorId1, 1);
+    private ConnectorTaskId taskId2x0 = new ConnectorTaskId(connectorId2, 0);
+    private ConnectorTaskId taskId3x0 = new ConnectorTaskId(connectorId3, 0);
+
+    @Rule
+    public MockitoRule rule = MockitoJUnit.rule();
+
+    @Mock
+    private KafkaConfigBackingStore configStorage;
+    private ClusterConfigState configState;
+
+    @Before
+    public void setup() {
+        configStorage = mock(KafkaConfigBackingStore.class);
+        configState = new ClusterConfigState(
+                1L,
+                Collections.singletonMap(connectorId1, 1),
+                Collections.singletonMap(connectorId1, new HashMap<>()),
+                Collections.singletonMap(connectorId1, TargetState.STARTED),
+                Collections.singletonMap(taskId1x0, new HashMap<>()),
+                Collections.emptySet());
+    }
+
+    @After
+    public void teardown() {
+        verifyNoMoreInteractions(configStorage);
+    }
+
+    @Test
+    public void testEagerToEagerMetadata() {
+        when(configStorage.snapshot()).thenReturn(configState);
+        ExtendedWorkerState workerState = new ExtendedWorkerState(LEADER_URL, configStorage.snapshot().offset(), null);
+        ByteBuffer metadata = ConnectProtocol.serializeMetadata(workerState);
+        ConnectProtocol.WorkerState state = ConnectProtocol.deserializeMetadata(metadata);
+        assertEquals(LEADER_URL, state.url());
+        assertEquals(1, state.offset());
+        verify(configStorage).snapshot();
+    }
+
+    @Test
+    public void testCoopToCoopMetadata() {
+        when(configStorage.snapshot()).thenReturn(configState);
+        ExtendedWorkerState workerState = new ExtendedWorkerState(LEADER_URL, configStorage.snapshot().offset(), null);
+        ByteBuffer metadata = IncrementalCooperativeConnectProtocol.serializeMetadata(workerState);
+        ExtendedWorkerState state = IncrementalCooperativeConnectProtocol.deserializeMetadata(metadata);
+        assertEquals(LEADER_URL, state.url());
+        assertEquals(1, state.offset());
+        verify(configStorage).snapshot();
+    }
+
+    @Test
+    public void testCoopToEagerMetadata() {
+        when(configStorage.snapshot()).thenReturn(configState);
+        ExtendedWorkerState workerState = new ExtendedWorkerState(LEADER_URL, configStorage.snapshot().offset(), null);
+        ByteBuffer metadata = IncrementalCooperativeConnectProtocol.serializeMetadata(workerState);
+        ConnectProtocol.WorkerState state = ConnectProtocol.deserializeMetadata(metadata);
+        assertEquals(LEADER_URL, state.url());
+        assertEquals(1, state.offset());
+        verify(configStorage).snapshot();
+    }
+
+    @Test
+    public void testEagerToCoopMetadata() {
+        when(configStorage.snapshot()).thenReturn(configState);
+        ConnectProtocol.WorkerState workerState = new ConnectProtocol.WorkerState(LEADER_URL, configStorage.snapshot().offset());
+        ByteBuffer metadata = ConnectProtocol.serializeMetadata(workerState);
+        ConnectProtocol.WorkerState state = IncrementalCooperativeConnectProtocol.deserializeMetadata(metadata);
+        assertEquals(LEADER_URL, state.url());
+        assertEquals(1, state.offset());
+        verify(configStorage).snapshot();
+    }
+
+    @Test
+    public void testEagerToEagerAssignment() {
+        ConnectProtocol.Assignment assignment = new ConnectProtocol.Assignment(
+                ConnectProtocol.Assignment.NO_ERROR, "leader", LEADER_URL, 1L,
+                Arrays.asList(connectorId1, connectorId3), Arrays.asList(taskId2x0));
+
+        ByteBuffer leaderBuf = ConnectProtocol.serializeAssignment(assignment);
+        ConnectProtocol.Assignment leaderAssignment = ConnectProtocol.deserializeAssignment(leaderBuf);
+        assertEquals(false, leaderAssignment.failed());
+        assertEquals("leader", leaderAssignment.leader());
+        assertEquals(1, leaderAssignment.offset());
+        assertEquals(Arrays.asList(connectorId1, connectorId3), leaderAssignment.connectors());
+        assertEquals(Collections.singletonList(taskId2x0), leaderAssignment.tasks());
+
+        ConnectProtocol.Assignment assignment2 = new ConnectProtocol.Assignment(
+                ConnectProtocol.Assignment.NO_ERROR, "member", LEADER_URL, 1L,
+                Arrays.asList(connectorId2), Arrays.asList(taskId1x0, taskId3x0));
+
+        ByteBuffer memberBuf = ConnectProtocol.serializeAssignment(assignment2);
+        ConnectProtocol.Assignment memberAssignment = ConnectProtocol.deserializeAssignment(memberBuf);
+        assertEquals(false, memberAssignment.failed());
+        assertEquals("member", memberAssignment.leader());
+        assertEquals(1, memberAssignment.offset());
+        assertEquals(Collections.singletonList(connectorId2), memberAssignment.connectors());
+        assertEquals(Arrays.asList(taskId1x0, taskId3x0), memberAssignment.tasks());
+    }
+
+    @Test
+    public void testCoopToCoopAssignment() {
+        ExtendedAssignment assignment = new ExtendedAssignment(
+                CONNECT_PROTOCOL_V1, ConnectProtocol.Assignment.NO_ERROR, "leader", LEADER_URL, 1L,
+                Arrays.asList(connectorId1, connectorId3), Arrays.asList(taskId2x0),
+                Collections.emptyList(), Collections.emptyList(), 0);
+
+        ByteBuffer leaderBuf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment);
+        ConnectProtocol.Assignment leaderAssignment = ConnectProtocol.deserializeAssignment(leaderBuf);
+        assertEquals(false, leaderAssignment.failed());
+        assertEquals("leader", leaderAssignment.leader());
+        assertEquals(1, leaderAssignment.offset());
+        assertEquals(Arrays.asList(connectorId1, connectorId3), leaderAssignment.connectors());
+        assertEquals(Collections.singletonList(taskId2x0), leaderAssignment.tasks());
+
+        ExtendedAssignment assignment2 = new ExtendedAssignment(
+                CONNECT_PROTOCOL_V1, ConnectProtocol.Assignment.NO_ERROR, "member", LEADER_URL, 1L,
+                Arrays.asList(connectorId2), Arrays.asList(taskId1x0, taskId3x0),
+                Collections.emptyList(), Collections.emptyList(), 0);
+
+        ByteBuffer memberBuf = ConnectProtocol.serializeAssignment(assignment2);
+        ConnectProtocol.Assignment memberAssignment =
+                IncrementalCooperativeConnectProtocol.deserializeAssignment(memberBuf);
+        assertEquals(false, memberAssignment.failed());
+        assertEquals("member", memberAssignment.leader());
+        assertEquals(1, memberAssignment.offset());
+        assertEquals(Collections.singletonList(connectorId2), memberAssignment.connectors());
+        assertEquals(Arrays.asList(taskId1x0, taskId3x0), memberAssignment.tasks());
+    }
+
+    @Test
+    public void testEagerToCoopAssignment() {
+        ConnectProtocol.Assignment assignment = new ConnectProtocol.Assignment(
+                ConnectProtocol.Assignment.NO_ERROR, "leader", LEADER_URL, 1L,
+                Arrays.asList(connectorId1, connectorId3), Arrays.asList(taskId2x0));
+
+        ByteBuffer leaderBuf = ConnectProtocol.serializeAssignment(assignment);
+        ConnectProtocol.Assignment leaderAssignment =
+                IncrementalCooperativeConnectProtocol.deserializeAssignment(leaderBuf);
+        assertEquals(false, leaderAssignment.failed());
+        assertEquals("leader", leaderAssignment.leader());
+        assertEquals(1, leaderAssignment.offset());
+        assertEquals(Arrays.asList(connectorId1, connectorId3), leaderAssignment.connectors());
+        assertEquals(Collections.singletonList(taskId2x0), leaderAssignment.tasks());
+
+        ConnectProtocol.Assignment assignment2 = new ConnectProtocol.Assignment(
+                ConnectProtocol.Assignment.NO_ERROR, "member", LEADER_URL, 1L,
+                Arrays.asList(connectorId2), Arrays.asList(taskId1x0, taskId3x0));
+
+        ByteBuffer memberBuf = ConnectProtocol.serializeAssignment(assignment2);
+        ConnectProtocol.Assignment memberAssignment =
+                IncrementalCooperativeConnectProtocol.deserializeAssignment(memberBuf);
+        assertEquals(false, memberAssignment.failed());
+        assertEquals("member", memberAssignment.leader());
+        assertEquals(1, memberAssignment.offset());
+        assertEquals(Collections.singletonList(connectorId2), memberAssignment.connectors());
+        assertEquals(Arrays.asList(taskId1x0, taskId3x0), memberAssignment.tasks());
+    }
+
+    @Test
+    public void testCoopToEagerAssignment() {
+        ExtendedAssignment assignment = new ExtendedAssignment(
+                CONNECT_PROTOCOL_V1, ConnectProtocol.Assignment.NO_ERROR, "leader", LEADER_URL, 1L,
+                Arrays.asList(connectorId1, connectorId3), Arrays.asList(taskId2x0),
+                Collections.emptyList(), Collections.emptyList(), 0);
+
+        ByteBuffer leaderBuf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment);
+        ConnectProtocol.Assignment leaderAssignment = ConnectProtocol.deserializeAssignment(leaderBuf);
+        assertEquals(false, leaderAssignment.failed());
+        assertEquals("leader", leaderAssignment.leader());
+        assertEquals(1, leaderAssignment.offset());
+        assertEquals(Arrays.asList(connectorId1, connectorId3), leaderAssignment.connectors());
+        assertEquals(Collections.singletonList(taskId2x0), leaderAssignment.tasks());
+
+        ExtendedAssignment assignment2 = new ExtendedAssignment(
+                CONNECT_PROTOCOL_V1, ConnectProtocol.Assignment.NO_ERROR, "member", LEADER_URL, 1L,
+                Arrays.asList(connectorId2), Arrays.asList(taskId1x0, taskId3x0),
+                Collections.emptyList(), Collections.emptyList(), 0);
+
+        ByteBuffer memberBuf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment2);
+        ConnectProtocol.Assignment memberAssignment = ConnectProtocol.deserializeAssignment(memberBuf);
+        assertEquals(false, memberAssignment.failed());
+        assertEquals("member", memberAssignment.leader());
+        assertEquals(1, memberAssignment.offset());
+        assertEquals(Collections.singletonList(connectorId2), memberAssignment.connectors());
+        assertEquals(Arrays.asList(taskId1x0, taskId3x0), memberAssignment.tasks());
+    }
+
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index fcac1fb..b381263 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -76,6 +76,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static java.util.Collections.singletonList;
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0;
+import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -174,6 +176,7 @@ public class DistributedHerderTest {
 
     private SinkConnectorConfig conn1SinkConfig;
     private SinkConnectorConfig conn1SinkConfigUpdated;
+    private short connectProtocolVersion;
 
     @Before
     public void setUp() throws Exception {
@@ -182,13 +185,16 @@ public class DistributedHerderTest {
         worker = PowerMock.createMock(Worker.class);
         EasyMock.expect(worker.isSinkConnector(CONN1)).andStubReturn(Boolean.TRUE);
 
+        // Default to the old protocol unless specified otherwise
+        connectProtocolVersion = CONNECT_PROTOCOL_V0;
+
         herder = PowerMock.createPartialMock(DistributedHerder.class,
                 new String[]{"backoff", "connectorTypeForClass", "updateDeletedConnectorStatus"},
                 new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, KAFKA_CLUSTER_ID,
                 statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time);
 
         configUpdateListener = herder.new ConfigUpdateListener();
-        rebalanceListener = herder.new RebalanceListener();
+        rebalanceListener = herder.new RebalanceListener(time);
         plugins = PowerMock.createMock(Plugins.class);
         conn1SinkConfig = new SinkConnectorConfig(plugins, CONN1_CONFIG);
         conn1SinkConfigUpdated = new SinkConnectorConfig(plugins, CONN1_CONFIG_UPDATED);
@@ -208,6 +214,7 @@ public class DistributedHerderTest {
     public void testJoinAssignment() throws Exception {
         // Join group and get assignment
         EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
         expectPostRebalanceCatchup(SNAPSHOT);
@@ -236,6 +243,7 @@ public class DistributedHerderTest {
     public void testRebalance() throws Exception {
         // Join group and get assignment
         EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
         expectPostRebalanceCatchup(SNAPSHOT);
@@ -280,9 +288,156 @@ public class DistributedHerderTest {
     }
 
     @Test
+    public void testIncrementalCooperativeRebalanceForNewMember() throws Exception {
+        connectProtocolVersion = CONNECT_PROTOCOL_V1;
+        // Join group. First rebalance contains revocations from other members. For the new
+        // member the assignment should be empty
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V1);
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList());
+        expectPostRebalanceCatchup(SNAPSHOT);
+
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // The new member got its assignment
+        expectRebalance(Collections.emptyList(), Collections.emptyList(),
+                ConnectProtocol.Assignment.NO_ERROR,
+                1, Arrays.asList(CONN1), Arrays.asList(TASK1), 0);
+
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
+        // and the new assignment started
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(), EasyMock.anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
+
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        time.sleep(1000L);
+        assertStatistics(0, 0, 0, Double.POSITIVE_INFINITY);
+        herder.tick();
+
+        time.sleep(2000L);
+        assertStatistics(3, 1, 100, 2000);
+        herder.tick();
+
+        time.sleep(3000L);
+        assertStatistics(3, 2, 100, 3000);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testIncrementalCooperativeRebalanceForExistingMember() throws Exception {
+        connectProtocolVersion = CONNECT_PROTOCOL_V1;
+        // Join group. First rebalance contains revocations because a new member joined.
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V1);
+        expectRebalance(Arrays.asList(CONN1), Arrays.asList(TASK1),
+                ConnectProtocol.Assignment.NO_ERROR, 1,
+                Collections.emptyList(), Collections.emptyList(), 0);
+        member.requestRejoin();
+        PowerMock.expectLastCall();
+
+        expectPostRebalanceCatchup(SNAPSHOT);
+
+        // In the second rebalance the new member gets its assignment and this member has no
+        // assignments or revocations
+        expectRebalance(1, Collections.emptyList(), Collections.emptyList());
+
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        time.sleep(1000L);
+        assertStatistics(0, 0, 0, Double.POSITIVE_INFINITY);
+        herder.tick();
+
+        time.sleep(2000L);
+        assertStatistics(3, 1, 100, 2000);
+        herder.tick();
+
+        time.sleep(3000L);
+        assertStatistics(3, 2, 100, 3000);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testIncrementalCooperativeRebalanceWithDelay() throws Exception {
+        connectProtocolVersion = CONNECT_PROTOCOL_V1;
+        // Join group. First rebalance contains some assignments but also a delay, because a
+        // member was detected missing
+        int rebalanceDelay = 10_000;
+
+        EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V1);
+        expectRebalance(Collections.emptyList(), Collections.emptyList(),
+                ConnectProtocol.Assignment.NO_ERROR, 1,
+                Collections.emptyList(), Arrays.asList(TASK2),
+                rebalanceDelay);
+        expectPostRebalanceCatchup(SNAPSHOT);
+
+        worker.startTask(EasyMock.eq(TASK2), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall().andAnswer(() -> {
+            time.sleep(9900L);
+            return null;
+        });
+
+        // Request to re-join because the scheduled rebalance delay has been reached
+        member.requestRejoin();
+        PowerMock.expectLastCall();
+
+        // The member got its assignment and revocation
+        expectRebalance(Collections.emptyList(), Collections.emptyList(),
+                ConnectProtocol.Assignment.NO_ERROR,
+                1, Arrays.asList(CONN1), Arrays.asList(TASK1), 0);
+
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
+        // and the new assignment started
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(), EasyMock.anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
+
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        time.sleep(1000L);
+        assertStatistics(0, 0, 0, Double.POSITIVE_INFINITY);
+        herder.tick();
+
+        herder.tick();
+
+        time.sleep(2000L);
+        assertStatistics(3, 2, 100, 2000);
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
     public void testRebalanceFailedConnector() throws Exception {
         // Join group and get assignment
         EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
         expectPostRebalanceCatchup(SNAPSHOT);
@@ -351,6 +506,7 @@ public class DistributedHerderTest {
     @Test
     public void testCreateConnector() throws Exception {
         EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
 
@@ -394,6 +550,7 @@ public class DistributedHerderTest {
     @Test
     public void testCreateConnectorFailedBasicValidation() throws Exception {
         EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
 
@@ -445,6 +602,7 @@ public class DistributedHerderTest {
     @Test
     public void testCreateConnectorFailedCustomValidation() throws Exception {
         EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
 
@@ -497,6 +655,7 @@ public class DistributedHerderTest {
     @Test
     public void testConnectorNameConflictsWithWorkerGroupId() throws Exception {
         EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
 
@@ -546,6 +705,7 @@ public class DistributedHerderTest {
     @Test
     public void testCreateConnectorAlreadyExists() throws Exception {
         EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
         final Capture<Map<String, String>> configCapture = EasyMock.newCapture();
         EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
@@ -577,6 +737,7 @@ public class DistributedHerderTest {
     @Test
     public void testDestroyConnector() throws Exception {
         EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         // Start with one connector
         EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
@@ -615,6 +776,7 @@ public class DistributedHerderTest {
 
         // get the initial assignment
         EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         expectRebalance(1, singletonList(CONN1), Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
@@ -656,6 +818,7 @@ public class DistributedHerderTest {
     public void testRestartUnknownConnector() throws Exception {
         // get the initial assignment
         EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
         member.poll(EasyMock.anyInt());
@@ -690,6 +853,7 @@ public class DistributedHerderTest {
     public void testRestartConnectorRedirectToLeader() throws Exception {
         // get the initial assignment
         EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
         member.poll(EasyMock.anyInt());
@@ -724,6 +888,7 @@ public class DistributedHerderTest {
     public void testRestartConnectorRedirectToOwner() throws Exception {
         // get the initial assignment
         EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
         member.poll(EasyMock.anyInt());
@@ -771,6 +936,7 @@ public class DistributedHerderTest {
 
         // get the initial assignment
         EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         expectRebalance(1, Collections.<String>emptyList(), singletonList(TASK0));
         expectPostRebalanceCatchup(SNAPSHOT);
         member.poll(EasyMock.anyInt());
@@ -808,6 +974,7 @@ public class DistributedHerderTest {
     public void testRestartUnknownTask() throws Exception {
         // get the initial assignment
         EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
         member.poll(EasyMock.anyInt());
@@ -854,6 +1021,7 @@ public class DistributedHerderTest {
     public void testRestartTaskRedirectToLeader() throws Exception {
         // get the initial assignment
         EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
         member.poll(EasyMock.anyInt());
@@ -888,6 +1056,7 @@ public class DistributedHerderTest {
     public void testRestartTaskRedirectToOwner() throws Exception {
         // get the initial assignment
         EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
         member.poll(EasyMock.anyInt());
@@ -926,6 +1095,7 @@ public class DistributedHerderTest {
     public void testConnectorConfigAdded() {
         // If a connector was added, we need to rebalance
         EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
 
         // join, no configs so no need to catch up on config topic
         expectRebalance(-1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
@@ -943,6 +1113,7 @@ public class DistributedHerderTest {
         // Performs rebalance and gets new assignment
         expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
                 ConnectProtocol.Assignment.NO_ERROR, 1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
@@ -967,11 +1138,13 @@ public class DistributedHerderTest {
         // Connector config can be applied without any rebalance
 
         EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
 
         // join
         expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
@@ -988,6 +1161,7 @@ public class DistributedHerderTest {
         EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT); // for this test, it doesn't matter if we use the same config snapshot
         worker.stopConnector(CONN1);
         PowerMock.expectLastCall().andReturn(true);
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
@@ -1011,11 +1185,13 @@ public class DistributedHerderTest {
         // ensure that target state changes are propagated to the worker
 
         EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
 
         // join
         expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
@@ -1051,11 +1227,13 @@ public class DistributedHerderTest {
     @Test
     public void testConnectorResumed() throws Exception {
         EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
 
         // start with the connector paused
         expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT_PAUSED_CONN1);
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.PAUSED));
         PowerMock.expectLastCall().andReturn(true);
@@ -1094,6 +1272,7 @@ public class DistributedHerderTest {
     @Test
     public void testUnknownConnectorPaused() throws Exception {
         EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.singleton(CONN1));
 
         // join
@@ -1131,6 +1310,7 @@ public class DistributedHerderTest {
         // changes to the worker so that tasks will transition correctly
 
         EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.<String>emptySet());
 
         // join
@@ -1171,6 +1351,7 @@ public class DistributedHerderTest {
         // changes to the worker so that tasks will transition correctly
 
         EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         EasyMock.expect(worker.connectorNames()).andStubReturn(Collections.<String>emptySet());
 
         // join
@@ -1211,6 +1392,7 @@ public class DistributedHerderTest {
     public void testTaskConfigAdded() {
         // Task config always requires rebalance
         EasyMock.expect(member.memberId()).andStubReturn("member");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
 
         // join
         expectRebalance(-1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
@@ -1249,6 +1431,7 @@ public class DistributedHerderTest {
     public void testJoinLeaderCatchUpFails() throws Exception {
         // Join group and as leader fail to do assignment
         EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
                 ConnectProtocol.Assignment.CONFIG_MISMATCH, 1, Collections.<String>emptyList(),
                 Collections.<ConnectorTaskId>emptyList());
@@ -1264,6 +1447,7 @@ public class DistributedHerderTest {
         expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
         expectPostRebalanceCatchup(SNAPSHOT);
 
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
@@ -1293,6 +1477,7 @@ public class DistributedHerderTest {
     @Test
     public void testAccessors() throws Exception {
         EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
         expectRebalance(1, Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList());
         EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT).times(2);
@@ -1315,6 +1500,7 @@ public class DistributedHerderTest {
         PowerMock.expectLastCall();
 
 
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         PowerMock.replayAll();
 
         FutureCallback<Collection<String>> listConnectorsCb = new FutureCallback<>();
@@ -1350,6 +1536,7 @@ public class DistributedHerderTest {
         EasyMock.expect(member.memberId()).andStubReturn("leader");
         expectRebalance(1, Arrays.asList(CONN1), Collections.<ConnectorTaskId>emptyList());
         expectPostRebalanceCatchup(SNAPSHOT);
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
@@ -1393,6 +1580,7 @@ public class DistributedHerderTest {
         EasyMock.expect(configBackingStore.snapshot()).andReturn(SNAPSHOT_UPDATED_CONN1_CONFIG).times(2);
         worker.stopConnector(CONN1);
         PowerMock.expectLastCall().andReturn(true);
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         worker.startConnector(EasyMock.eq(CONN1), EasyMock.<Map<String, String>>anyObject(), EasyMock.<ConnectorContext>anyObject(),
                 EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
         PowerMock.expectLastCall().andReturn(true);
@@ -1441,10 +1629,12 @@ public class DistributedHerderTest {
         // This requires inter-worker communication, so needs the REST API
     }
 
+
     private void expectRebalance(final long offset,
                                  final List<String> assignedConnectors,
                                  final List<ConnectorTaskId> assignedTasks) {
-        expectRebalance(null, null, ConnectProtocol.Assignment.NO_ERROR, offset, assignedConnectors, assignedTasks);
+        expectRebalance(Collections.emptyList(), Collections.emptyList(),
+                ConnectProtocol.Assignment.NO_ERROR, offset, assignedConnectors, assignedTasks, 0);
     }
 
     // Handles common initial part of rebalance callback. Does not handle instantiation of connectors and tasks.
@@ -1454,33 +1644,56 @@ public class DistributedHerderTest {
                                  final long offset,
                                  final List<String> assignedConnectors,
                                  final List<ConnectorTaskId> assignedTasks) {
+        expectRebalance(revokedConnectors, revokedTasks, error, offset, assignedConnectors, assignedTasks, 0);
+    }
+
+    // Handles common initial part of rebalance callback. Does not handle instantiation of connectors and tasks.
+    private void expectRebalance(final Collection<String> revokedConnectors,
+                                 final List<ConnectorTaskId> revokedTasks,
+                                 final short error,
+                                 final long offset,
+                                 final List<String> assignedConnectors,
+                                 final List<ConnectorTaskId> assignedTasks,
+                                 int delay) {
         member.ensureActive();
         PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
             @Override
             public Object answer() throws Throwable {
-                if (revokedConnectors != null)
+                ExtendedAssignment assignment;
+                if (!revokedConnectors.isEmpty() || !revokedTasks.isEmpty()) {
                     rebalanceListener.onRevoked("leader", revokedConnectors, revokedTasks);
-                ConnectProtocol.Assignment assignment = new ConnectProtocol.Assignment(
-                        error, "leader", "leaderUrl", offset, assignedConnectors, assignedTasks);
+                }
+
+                if (connectProtocolVersion == CONNECT_PROTOCOL_V0) {
+                    assignment = new ExtendedAssignment(
+                            connectProtocolVersion, error, "leader", "leaderUrl", offset,
+                            assignedConnectors, assignedTasks,
+                            Collections.emptyList(), Collections.emptyList(), 0);
+                } else {
+                    assignment = new ExtendedAssignment(
+                            connectProtocolVersion, error, "leader", "leaderUrl", offset,
+                            assignedConnectors, assignedTasks,
+                            new ArrayList<>(revokedConnectors), new ArrayList<>(revokedTasks), delay);
+                }
                 rebalanceListener.onAssigned(assignment, 3);
                 time.sleep(100L);
                 return null;
             }
         });
 
-        if (revokedConnectors != null) {
+        if (!revokedConnectors.isEmpty()) {
             for (String connector : revokedConnectors) {
                 worker.stopConnector(connector);
                 PowerMock.expectLastCall().andReturn(true);
             }
         }
 
-        if (revokedTasks != null && !revokedTasks.isEmpty()) {
+        if (!revokedTasks.isEmpty()) {
             worker.stopAndAwaitTask(EasyMock.anyObject(ConnectorTaskId.class));
             PowerMock.expectLastCall();
         }
 
-        if (revokedConnectors != null) {
+        if (!revokedConnectors.isEmpty()) {
             statusBackingStore.flush();
             PowerMock.expectLastCall();
         }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
new file mode 100644
index 0000000..71ccefd
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignorTest.java
@@ -0,0 +1,1079 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.clients.consumer.internals.RequestFuture;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.runtime.TargetState;
+import org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.ConnectorsAndTasks;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+import java.util.AbstractMap.SimpleEntry;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
+import static org.apache.kafka.connect.runtime.distributed.WorkerCoordinator.WorkerLoad;
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+public class IncrementalCooperativeAssignorTest {
+    @Rule
+    public MockitoRule rule = MockitoJUnit.rule();
+
+    @Mock
+    private WorkerCoordinator coordinator;
+
+    @Captor
+    ArgumentCaptor<Map<String, ExtendedAssignment>> assignmentsCapture;
+
+    private ClusterConfigState configState;
+    private Map<String, ExtendedWorkerState> memberConfigs;
+    private Map<String, ExtendedWorkerState> expectedMemberConfigs;
+    private long offset;
+    private String leader;
+    private String leaderUrl;
+    private Time time;
+    private int rebalanceDelay;
+    private IncrementalCooperativeAssignor assignor;
+    private int rebalanceNum;
+    Map<String, ExtendedAssignment> assignments;
+    Map<String, ExtendedAssignment> returnedAssignments;
+
+    @Before
+    public void setup() {
+        leader = "worker1";
+        leaderUrl = expectedLeaderUrl(leader);
+        offset = 10;
+        configState = clusterConfigState(offset, 2, 4);
+        memberConfigs = memberConfigs(leader, offset, 1, 1);
+        time = Time.SYSTEM;
+        rebalanceDelay = DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT;
+        assignments = new HashMap<>();
+        initAssignor();
+    }
+
+    @After
+    public void teardown() {
+        verifyNoMoreInteractions(coordinator);
+    }
+
+    public void initAssignor() {
+        assignor = Mockito.spy(new IncrementalCooperativeAssignor(
+                new LogContext(),
+                time,
+                rebalanceDelay));
+    }
+
+    @Test
+    public void testTaskAssignmentWhenWorkerJoins() {
+        when(coordinator.configSnapshot()).thenReturn(configState);
+        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+
+        // First assignment with 1 worker and 2 connectors configured but not yet assigned
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertAssignment(2, 8, 0, 0, "worker1");
+
+        // Second assignment with a second worker joining and all connectors running on previous worker
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertAssignment(0, 0, 1, 4, "worker1", "worker2");
+
+        // Third assignment after revocations
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertAssignment(1, 4, 0, 0, "worker1", "worker2");
+
+        // A fourth rebalance should not change assignments
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertAssignment(0, 0, 0, 0, "worker1", "worker2");
+
+        verify(coordinator, times(rebalanceNum)).configSnapshot();
+        verify(coordinator, times(rebalanceNum)).leaderState(any());
+    }
+
+    @Test
+    public void testTaskAssignmentWhenWorkerLeavesPermanently() {
+        // Customize assignor for this test case
+        time = new MockTime();
+        initAssignor();
+
+        when(coordinator.configSnapshot()).thenReturn(configState);
+        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+
+        // First assignment with 2 workers and 2 connectors configured but not yet assigned
+        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertAssignment(2, 8, 0, 0, "worker1", "worker2");
+
+        // Second assignment with only one worker remaining in the group. The worker that left the
+        // group was a follower. No re-assignments take place immediately and the count
+        // down for the rebalance delay starts
+        applyAssignments(returnedAssignments);
+        assignments.remove("worker2");
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(rebalanceDelay, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertAssignment(0, 0, 0, 0, "worker1");
+
+        time.sleep(rebalanceDelay / 2);
+
+        // Third (incidental) assignment with still only one worker in the group. Max delay has not
+        // been reached yet
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(rebalanceDelay / 2, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertAssignment(0, 0, 0, 0, "worker1");
+
+        time.sleep(rebalanceDelay / 2 + 1);
+
+        // Fourth assignment after delay expired
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertAssignment(1, 4, 0, 0, "worker1");
+
+        verify(coordinator, times(rebalanceNum)).configSnapshot();
+        verify(coordinator, times(rebalanceNum)).leaderState(any());
+    }
+
+    @Test
+    public void testTaskAssignmentWhenWorkerBounces() {
+        // Customize assignor for this test case
+        time = new MockTime();
+        initAssignor();
+
+        when(coordinator.configSnapshot()).thenReturn(configState);
+        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+
+        // First assignment with 2 workers and 2 connectors configured but not yet assigned
+        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertAssignment(2, 8, 0, 0, "worker1", "worker2");
+
+        // Second assignment with only one worker remaining in the group. The worker that left the
+        // group was a follower. No re-assignments take place immediately and the count
+        // down for the rebalance delay starts
+        applyAssignments(returnedAssignments);
+        assignments.remove("worker2");
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(rebalanceDelay, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertAssignment(0, 0, 0, 0, "worker1");
+
+        time.sleep(rebalanceDelay / 2);
+
+        // Third (incidental) assignment with still only one worker in the group. Max delay has not
+        // been reached yet
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(rebalanceDelay / 2, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertAssignment(0, 0, 0, 0, "worker1");
+
+        time.sleep(rebalanceDelay / 4);
+
+        // Fourth assignment with the second worker returning before the delay expires
+        // Since the delay is still active, lost assignments are not reassigned yet
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(rebalanceDelay / 4, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertAssignment(0, 0, 0, 0, "worker1", "worker2");
+
+        time.sleep(rebalanceDelay / 4);
+
+        // Fifth assignment with the same two workers. The delay has expired, so the lost
+        // assignments ought to be assigned to the worker that has appeared as returned.
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertAssignment(1, 4, 0, 0, "worker1", "worker2");
+
+        verify(coordinator, times(rebalanceNum)).configSnapshot();
+        verify(coordinator, times(rebalanceNum)).leaderState(any());
+    }
+
+    @Test
+    public void testTaskAssignmentWhenLeaderLeavesPermanently() {
+        // Customize assignor for this test case
+        time = new MockTime();
+        initAssignor();
+
+        when(coordinator.configSnapshot()).thenReturn(configState);
+        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+
+        // First assignment with 2 workers and 2 connectors configured but not yet assigned
+        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertAssignment(2, 8, 0, 0, "worker1", "worker2");
+
+        // Second assignment with only one worker remaining in the group. The worker that left the
+        // group was the leader. The new leader has no previous assignments and is not tracking a
+        // delay upon a leader's exit
+        applyAssignments(returnedAssignments);
+        assignments.remove("worker1");
+        leader = "worker2";
+        leaderUrl = expectedLeaderUrl(leader);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        // The fact that the leader bounces means that the assignor starts from a clean slate
+        initAssignor();
+
+        // Capture needs to be reset to point to the new assignor
+        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertAssignment(1, 4, 0, 0, "worker2");
+
+        // Third (incidental) assignment with still only one worker in the group.
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertAssignment(0, 0, 0, 0, "worker2");
+
+        verify(coordinator, times(rebalanceNum)).configSnapshot();
+        verify(coordinator, times(rebalanceNum)).leaderState(any());
+    }
+
+    @Test
+    public void testTaskAssignmentWhenLeaderBounces() {
+        // Customize assignor for this test case
+        time = new MockTime();
+        initAssignor();
+
+        when(coordinator.configSnapshot()).thenReturn(configState);
+        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+
+        // First assignment with 2 workers and 2 connectors configured but not yet assigned
+        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertAssignment(2, 8, 0, 0, "worker1", "worker2");
+
+        // Second assignment with only one worker remaining in the group. The worker that left the
+        // group was the leader. The new leader has no previous assignments and is not tracking a
+        // delay upon a leader's exit
+        applyAssignments(returnedAssignments);
+        assignments.remove("worker1");
+        leader = "worker2";
+        leaderUrl = expectedLeaderUrl(leader);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        // The fact that the leader bounces means that the assignor starts from a clean slate
+        initAssignor();
+
+        // Capture needs to be reset to point to the new assignor
+        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertAssignment(1, 4, 0, 0, "worker2");
+
+        // Third assignment with the previous leader returning as a follower. In this case, the
+        // arrival of the previous leader is treated as an arrival of a new worker. Reassignment
+        // happens immediately, first with a revocation
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        memberConfigs.put("worker1", new ExtendedWorkerState(leaderUrl, offset, null));
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertAssignment(0, 0, 1, 4, "worker1", "worker2");
+
+        // Fourth assignment after revocations
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertAssignment(1, 4, 0, 0, "worker1", "worker2");
+
+        verify(coordinator, times(rebalanceNum)).configSnapshot();
+        verify(coordinator, times(rebalanceNum)).leaderState(any());
+    }
+
+    @Test
+    public void testTaskAssignmentWhenFirstAssignmentAttemptFails() {
+        // Customize assignor for this test case
+        time = new MockTime();
+        initAssignor();
+
+        when(coordinator.configSnapshot()).thenReturn(configState);
+        doThrow(new RuntimeException("Unable to send computed assignment with SyncGroupRequest"))
+                .when(assignor).serializeAssignments(assignmentsCapture.capture());
+
+        // First assignment with 2 workers and 2 connectors configured but not yet assigned
+        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
+        try {
+            assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        } catch (RuntimeException e) {
+            RequestFuture.failure(e);
+        }
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        // This was the assignment that should have been sent, but didn't make it after all the way
+        assertDelay(0, returnedAssignments);
+        assertAssignment(2, 8, 0, 0, "worker1", "worker2");
+
+        // Second assignment happens with members returning the same assignments (memberConfigs)
+        // as the first time. The assignor can not tell whether it was the assignment that failed
+        // or the workers that were bounced. Therefore it goes into assignment freeze for
+        // the new assignments for a rebalance delay period
+        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertDelay(rebalanceDelay, returnedAssignments);
+        assertAssignment(0, 0, 0, 0, "worker1", "worker2");
+
+        time.sleep(rebalanceDelay / 2);
+
+        // Third (incidental) assignment with still only one worker in the group. Max delay has not
+        // been reached yet
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(rebalanceDelay / 2, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertAssignment(0, 0, 0, 0, "worker1", "worker2");
+
+        time.sleep(rebalanceDelay / 2 + 1);
+
+        // Fourth assignment after delay expired. Finally all the new assignments are assigned
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertAssignment(2, 8, 0, 0, "worker1", "worker2");
+
+        verify(coordinator, times(rebalanceNum)).configSnapshot();
+        verify(coordinator, times(rebalanceNum)).leaderState(any());
+    }
+
+    @Test
+    public void testTaskAssignmentWhenSubsequentAssignmentAttemptFails() {
+        // Customize assignor for this test case
+        time = new MockTime();
+        initAssignor();
+
+        when(coordinator.configSnapshot()).thenReturn(configState);
+        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+
+        // First assignment with 2 workers and 2 connectors configured but not yet assigned
+        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertAssignment(2, 8, 0, 0, "worker1", "worker2");
+
+        when(coordinator.configSnapshot()).thenReturn(configState);
+        doThrow(new RuntimeException("Unable to send computed assignment with SyncGroupRequest"))
+                .when(assignor).serializeAssignments(assignmentsCapture.capture());
+
+        // Second assignment triggered by a third worker joining. The computed assignment should
+        // revoke tasks from the existing group. But the assignment won't be correctly delivered.
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        memberConfigs.put("worker3", new ExtendedWorkerState(leaderUrl, offset, null));
+        try {
+            assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        } catch (RuntimeException e) {
+            RequestFuture.failure(e);
+        }
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        // This was the assignment that should have been sent, but didn't make it after all the way
+        assertDelay(0, returnedAssignments);
+        assertAssignment(0, 0, 0, 2, "worker1", "worker2", "worker3");
+
+        // Third assignment happens with members returning the same assignments (memberConfigs)
+        // as the first time.
+        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertDelay(0, returnedAssignments);
+        assertAssignment(0, 0, 0, 2, "worker1", "worker2", "worker3");
+
+        verify(coordinator, times(rebalanceNum)).configSnapshot();
+        verify(coordinator, times(rebalanceNum)).leaderState(any());
+    }
+
+    @Test
+    public void testTaskAssignmentWhenConnectorsAreDeleted() {
+        configState = clusterConfigState(offset, 3, 4);
+        when(coordinator.configSnapshot()).thenReturn(configState);
+        doReturn(Collections.EMPTY_MAP).when(assignor).serializeAssignments(assignmentsCapture.capture());
+
+        // First assignment with 1 worker and 2 connectors configured but not yet assigned
+        memberConfigs.put("worker2", new ExtendedWorkerState(leaderUrl, offset, null));
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertAssignment(3, 12, 0, 0, "worker1", "worker2");
+
+        // Second assignment with an updated config state that reflects removal of a connector
+        configState = clusterConfigState(offset + 1, 2, 4);
+        when(coordinator.configSnapshot()).thenReturn(configState);
+        applyAssignments(returnedAssignments);
+        memberConfigs = memberConfigs(leader, offset, assignments);
+        assignor.performTaskAssignment(leader, offset, memberConfigs, coordinator);
+        ++rebalanceNum;
+        returnedAssignments = assignmentsCapture.getValue();
+        assertDelay(0, returnedAssignments);
+        expectedMemberConfigs = memberConfigs(leader, offset, returnedAssignments);
+        assertAssignment(0, 0, 1, 4, "worker1", "worker2");
+
+        verify(coordinator, times(rebalanceNum)).configSnapshot();
+        verify(coordinator, times(rebalanceNum)).leaderState(any());
+    }
+
+    @Test
+    public void testAssignConnectorsWhenBalanced() {
+        int num = 2;
+        List<WorkerLoad> existingAssignment = IntStream.range(0, 3)
+                .mapToObj(i -> workerLoad("worker" + i, i * num, num, i * num, num))
+                .collect(Collectors.toList());
+
+        List<WorkerLoad> expectedAssignment = existingAssignment.stream()
+                .map(wl -> new WorkerLoad.Builder(wl.worker()).withCopies(wl.connectors(), wl.tasks()).build())
+                .collect(Collectors.toList());
+        expectedAssignment.get(0).connectors().addAll(Arrays.asList("connector6", "connector9"));
+        expectedAssignment.get(1).connectors().addAll(Arrays.asList("connector7", "connector10"));
+        expectedAssignment.get(2).connectors().addAll(Arrays.asList("connector8"));
+
+        List<String> newConnectors = newConnectors(6, 11);
+        assignor.assignConnectors(existingAssignment, newConnectors);
+        assertEquals(expectedAssignment, existingAssignment);
+    }
+
+    @Test
+    public void testAssignTasksWhenBalanced() {
+        int num = 2;
+        List<WorkerLoad> existingAssignment = IntStream.range(0, 3)
+                .mapToObj(i -> workerLoad("worker" + i, i * num, num, i * num, num))
+                .collect(Collectors.toList());
+
+        List<WorkerLoad> expectedAssignment = existingAssignment.stream()
+                .map(wl -> new WorkerLoad.Builder(wl.worker()).withCopies(wl.connectors(), wl.tasks()).build())
+                .collect(Collectors.toList());
+
+        expectedAssignment.get(0).connectors().addAll(Arrays.asList("connector6", "connector9"));
+        expectedAssignment.get(1).connectors().addAll(Arrays.asList("connector7", "connector10"));
+        expectedAssignment.get(2).connectors().addAll(Arrays.asList("connector8"));
+
+        expectedAssignment.get(0).tasks().addAll(Arrays.asList(new ConnectorTaskId("task", 6), new ConnectorTaskId("task", 9)));
+        expectedAssignment.get(1).tasks().addAll(Arrays.asList(new ConnectorTaskId("task", 7), new ConnectorTaskId("task", 10)));
+        expectedAssignment.get(2).tasks().addAll(Arrays.asList(new ConnectorTaskId("task", 8)));
+
+        List<String> newConnectors = newConnectors(6, 11);
+        assignor.assignConnectors(existingAssignment, newConnectors);
+        List<ConnectorTaskId> newTasks = newTasks(6, 11);
+        assignor.assignTasks(existingAssignment, newTasks);
+        assertEquals(expectedAssignment, existingAssignment);
+    }
+
+    @Test
+    public void testAssignConnectorsWhenImbalanced() {
+        List<WorkerLoad> existingAssignment = new ArrayList<>();
+        existingAssignment.add(workerLoad("worker0", 0, 2, 0, 2));
+        existingAssignment.add(workerLoad("worker1", 2, 3, 2, 3));
+        existingAssignment.add(workerLoad("worker2", 5, 4, 5, 4));
+        existingAssignment.add(emptyWorkerLoad("worker3"));
+
+        List<String> newConnectors = newConnectors(9, 24);
+        List<ConnectorTaskId> newTasks = newTasks(9, 24);
+        assignor.assignConnectors(existingAssignment, newConnectors);
+        assignor.assignTasks(existingAssignment, newTasks);
+        for (WorkerLoad worker : existingAssignment) {
+            assertEquals(6, worker.connectorsSize());
+            assertEquals(6, worker.tasksSize());
+        }
+    }
+
+    @Test
+    public void testLostAssignmentHandlingWhenWorkerBounces() {
+        // Customize assignor for this test case
+        time = new MockTime();
+        initAssignor();
+
+        assertTrue(assignor.candidateWorkersForReassignment.isEmpty());
+        assertEquals(0, assignor.scheduledRebalance);
+        assertEquals(0, assignor.delay);
+
+        Map<String, WorkerLoad> configuredAssignment = new HashMap<>();
+        configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
+        configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));
+
+        ConnectorsAndTasks newSubmissions = new ConnectorsAndTasks.Builder().build();
+
+        // No lost assignments
+        assignor.handleLostAssignments(new ConnectorsAndTasks.Builder().build(),
+                newSubmissions,
+                new ArrayList<>(configuredAssignment.values()));
+
+        assertThat("Wrong set of workers for reassignments",
+                Collections.emptySet(),
+                is(assignor.candidateWorkersForReassignment));
+        assertEquals(0, assignor.scheduledRebalance);
+        assertEquals(0, assignor.delay);
+
+        String flakyWorker = "worker1";
+        WorkerLoad lostLoad = workerLoad(flakyWorker, 2, 2, 4, 4);
+
+        ConnectorsAndTasks lostAssignments = new ConnectorsAndTasks.Builder()
+                .withCopies(lostLoad.connectors(), lostLoad.tasks()).build();
+
+        // Lost assignments detected - No candidate worker has appeared yet (worker with no assignments)
+        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+
+        assertThat("Wrong set of workers for reassignments",
+                Collections.emptySet(),
+                is(assignor.candidateWorkersForReassignment));
+        assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
+        assertEquals(rebalanceDelay, assignor.delay);
+
+        time.sleep(rebalanceDelay / 2);
+        rebalanceDelay /= 2;
+
+        // A new worker (probably returning worker) has joined
+        configuredAssignment.put(flakyWorker, new WorkerLoad.Builder(flakyWorker).build());
+        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+
+        assertThat("Wrong set of workers for reassignments",
+                Collections.singleton(flakyWorker),
+                is(assignor.candidateWorkersForReassignment));
+        assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
+        assertEquals(rebalanceDelay, assignor.delay);
+
+        time.sleep(rebalanceDelay);
+
+        // The new worker has still no assignments
+        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+
+        assertTrue("Wrong assignment of lost connectors",
+                configuredAssignment.getOrDefault(flakyWorker, new WorkerLoad.Builder(flakyWorker).build())
+                        .connectors()
+                        .containsAll(lostAssignments.connectors()));
+        assertTrue("Wrong assignment of lost tasks",
+                configuredAssignment.getOrDefault(flakyWorker, new WorkerLoad.Builder(flakyWorker).build())
+                        .tasks()
+                        .containsAll(lostAssignments.tasks()));
+        assertThat("Wrong set of workers for reassignments",
+                Collections.emptySet(),
+                is(assignor.candidateWorkersForReassignment));
+        assertEquals(0, assignor.scheduledRebalance);
+        assertEquals(0, assignor.delay);
+    }
+
+    @Test
+    public void testLostAssignmentHandlingWhenWorkerLeavesPermanently() {
+        // Customize assignor for this test case
+        time = new MockTime();
+        initAssignor();
+
+        assertTrue(assignor.candidateWorkersForReassignment.isEmpty());
+        assertEquals(0, assignor.scheduledRebalance);
+        assertEquals(0, assignor.delay);
+
+        Map<String, WorkerLoad> configuredAssignment = new HashMap<>();
+        configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
+        configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));
+
+        ConnectorsAndTasks newSubmissions = new ConnectorsAndTasks.Builder().build();
+
+        // No lost assignments
+        assignor.handleLostAssignments(new ConnectorsAndTasks.Builder().build(),
+                newSubmissions,
+                new ArrayList<>(configuredAssignment.values()));
+
+        assertThat("Wrong set of workers for reassignments",
+                Collections.emptySet(),
+                is(assignor.candidateWorkersForReassignment));
+        assertEquals(0, assignor.scheduledRebalance);
+        assertEquals(0, assignor.delay);
+
+        String removedWorker = "worker1";
+        WorkerLoad lostLoad = workerLoad(removedWorker, 2, 2, 4, 4);
+
+        ConnectorsAndTasks lostAssignments = new ConnectorsAndTasks.Builder()
+                .withCopies(lostLoad.connectors(), lostLoad.tasks()).build();
+
+        // Lost assignments detected - No candidate worker has appeared yet (worker with no assignments)
+        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+
+        assertThat("Wrong set of workers for reassignments",
+                Collections.emptySet(),
+                is(assignor.candidateWorkersForReassignment));
+        assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
+        assertEquals(rebalanceDelay, assignor.delay);
+
+        time.sleep(rebalanceDelay / 2);
+        rebalanceDelay /= 2;
+
+        // No new worker has joined
+        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+
+        assertThat("Wrong set of workers for reassignments",
+                Collections.emptySet(),
+                is(assignor.candidateWorkersForReassignment));
+        assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
+        assertEquals(rebalanceDelay, assignor.delay);
+
+        time.sleep(rebalanceDelay);
+
+        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+
+        assertTrue("Wrong assignment of lost connectors",
+                newSubmissions.connectors().containsAll(lostAssignments.connectors()));
+        assertTrue("Wrong assignment of lost tasks",
+                newSubmissions.tasks().containsAll(lostAssignments.tasks()));
+        assertThat("Wrong set of workers for reassignments",
+                Collections.emptySet(),
+                is(assignor.candidateWorkersForReassignment));
+        assertEquals(0, assignor.scheduledRebalance);
+        assertEquals(0, assignor.delay);
+    }
+
+    @Test
+    public void testLostAssignmentHandlingWithMoreThanOneCandidates() {
+        // Customize assignor for this test case
+        time = new MockTime();
+        initAssignor();
+
+        assertTrue(assignor.candidateWorkersForReassignment.isEmpty());
+        assertEquals(0, assignor.scheduledRebalance);
+        assertEquals(0, assignor.delay);
+
+        Map<String, WorkerLoad> configuredAssignment = new HashMap<>();
+        configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
+        configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));
+
+        ConnectorsAndTasks newSubmissions = new ConnectorsAndTasks.Builder().build();
+
+        // No lost assignments
+        assignor.handleLostAssignments(new ConnectorsAndTasks.Builder().build(),
+                newSubmissions,
+                new ArrayList<>(configuredAssignment.values()));
+
+        assertThat("Wrong set of workers for reassignments",
+                Collections.emptySet(),
+                is(assignor.candidateWorkersForReassignment));
+        assertEquals(0, assignor.scheduledRebalance);
+        assertEquals(0, assignor.delay);
+
+        String flakyWorker = "worker1";
+        WorkerLoad lostLoad = workerLoad(flakyWorker, 2, 2, 4, 4);
+        String newWorker = "worker3";
+
+        ConnectorsAndTasks lostAssignments = new ConnectorsAndTasks.Builder()
+                .withCopies(lostLoad.connectors(), lostLoad.tasks()).build();
+
+        // Lost assignments detected - A new worker also has joined that is not the returning worker
+        configuredAssignment.put(newWorker, new WorkerLoad.Builder(newWorker).build());
+        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+
+        assertThat("Wrong set of workers for reassignments",
+                Collections.singleton(newWorker),
+                is(assignor.candidateWorkersForReassignment));
+        assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
+        assertEquals(rebalanceDelay, assignor.delay);
+
+        time.sleep(rebalanceDelay / 2);
+        rebalanceDelay /= 2;
+
+        // Now two new workers have joined
+        configuredAssignment.put(flakyWorker, new WorkerLoad.Builder(flakyWorker).build());
+        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+
+        Set<String> expectedWorkers = new HashSet<>();
+        expectedWorkers.addAll(Arrays.asList(newWorker, flakyWorker));
+        assertThat("Wrong set of workers for reassignments",
+                expectedWorkers,
+                is(assignor.candidateWorkersForReassignment));
+        assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
+        assertEquals(rebalanceDelay, assignor.delay);
+
+        time.sleep(rebalanceDelay);
+
+        // The new workers have new assignments, other than the lost ones
+        configuredAssignment.put(flakyWorker, workerLoad(flakyWorker, 6, 2, 8, 4));
+        configuredAssignment.put(newWorker, workerLoad(newWorker, 8, 2, 12, 4));
+        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+
+        // newWorker joined first, so should be picked up first as a candidate for reassignment
+        assertTrue("Wrong assignment of lost connectors",
+                configuredAssignment.getOrDefault(newWorker, new WorkerLoad.Builder(flakyWorker).build())
+                        .connectors()
+                        .containsAll(lostAssignments.connectors()));
+        assertTrue("Wrong assignment of lost tasks",
+                configuredAssignment.getOrDefault(newWorker, new WorkerLoad.Builder(flakyWorker).build())
+                        .tasks()
+                        .containsAll(lostAssignments.tasks()));
+        assertThat("Wrong set of workers for reassignments",
+                Collections.emptySet(),
+                is(assignor.candidateWorkersForReassignment));
+        assertEquals(0, assignor.scheduledRebalance);
+        assertEquals(0, assignor.delay);
+    }
+
+    @Test
+    public void testLostAssignmentHandlingWhenWorkerBouncesBackButFinallyLeaves() {
+        // Customize assignor for this test case
+        time = new MockTime();
+        initAssignor();
+
+        assertTrue(assignor.candidateWorkersForReassignment.isEmpty());
+        assertEquals(0, assignor.scheduledRebalance);
+        assertEquals(0, assignor.delay);
+
+        Map<String, WorkerLoad> configuredAssignment = new HashMap<>();
+        configuredAssignment.put("worker0", workerLoad("worker0", 0, 2, 0, 4));
+        configuredAssignment.put("worker2", workerLoad("worker2", 4, 2, 8, 4));
+
+        ConnectorsAndTasks newSubmissions = new ConnectorsAndTasks.Builder().build();
+
+        // No lost assignments
+        assignor.handleLostAssignments(new ConnectorsAndTasks.Builder().build(),
+                newSubmissions,
+                new ArrayList<>(configuredAssignment.values()));
+
+        assertThat("Wrong set of workers for reassignments",
+                Collections.emptySet(),
+                is(assignor.candidateWorkersForReassignment));
+        assertEquals(0, assignor.scheduledRebalance);
+        assertEquals(0, assignor.delay);
+
+        String veryFlakyWorker = "worker1";
+        WorkerLoad lostLoad = workerLoad(veryFlakyWorker, 2, 2, 4, 4);
+
+        ConnectorsAndTasks lostAssignments = new ConnectorsAndTasks.Builder()
+                .withCopies(lostLoad.connectors(), lostLoad.tasks()).build();
+
+        // Lost assignments detected - No candidate worker has appeared yet (worker with no assignments)
+        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+
+        assertThat("Wrong set of workers for reassignments",
+                Collections.emptySet(),
+                is(assignor.candidateWorkersForReassignment));
+        assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
+        assertEquals(rebalanceDelay, assignor.delay);
+
+        time.sleep(rebalanceDelay / 2);
+        rebalanceDelay /= 2;
+
+        // A new worker (probably returning worker) has joined
+        configuredAssignment.put(veryFlakyWorker, new WorkerLoad.Builder(veryFlakyWorker).build());
+        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+
+        assertThat("Wrong set of workers for reassignments",
+                Collections.singleton(veryFlakyWorker),
+                is(assignor.candidateWorkersForReassignment));
+        assertEquals(time.milliseconds() + rebalanceDelay, assignor.scheduledRebalance);
+        assertEquals(rebalanceDelay, assignor.delay);
+
+        time.sleep(rebalanceDelay);
+
+        // The returning worker leaves permanently after joining briefly during the delay
+        configuredAssignment.remove(veryFlakyWorker);
+        assignor.handleLostAssignments(lostAssignments, newSubmissions, new ArrayList<>(configuredAssignment.values()));
+
+        assertTrue("Wrong assignment of lost connectors",
+                newSubmissions.connectors().containsAll(lostAssignments.connectors()));
+        assertTrue("Wrong assignment of lost tasks",
+                newSubmissions.tasks().containsAll(lostAssignments.tasks()));
+        assertThat("Wrong set of workers for reassignments",
+                Collections.emptySet(),
+                is(assignor.candidateWorkersForReassignment));
+        assertEquals(0, assignor.scheduledRebalance);
+        assertEquals(0, assignor.delay);
+    }
+
+    private WorkerLoad emptyWorkerLoad(String worker) {
+        return new WorkerLoad.Builder(worker).build();
+    }
+
+    private WorkerLoad workerLoad(String worker, int connectorStart, int connectorNum,
+                                  int taskStart, int taskNum) {
+        return new WorkerLoad.Builder(worker).with(
+                newConnectors(connectorStart, connectorStart + connectorNum),
+                newTasks(taskStart, taskStart + taskNum)).build();
+    }
+
+    private static List<String> newConnectors(int start, int end) {
+        return IntStream.range(start, end)
+                .mapToObj(i -> "connector" + i)
+                .collect(Collectors.toList());
+    }
+
+    private static List<ConnectorTaskId> newTasks(int start, int end) {
+        return IntStream.range(start, end)
+                .mapToObj(i -> new ConnectorTaskId("task", i))
+                .collect(Collectors.toList());
+    }
+
+    private static ClusterConfigState clusterConfigState(long offset,
+                                                         int connectorNum,
+                                                         int taskNum) {
+        return new ClusterConfigState(
+                offset,
+                connectorTaskCounts(1, connectorNum, taskNum),
+                connectorConfigs(1, connectorNum),
+                connectorTargetStates(1, connectorNum, TargetState.STARTED),
+                taskConfigs(0, connectorNum, connectorNum * taskNum),
+                Collections.emptySet());
+    }
+
+    private static Map<String, ExtendedWorkerState> memberConfigs(String givenLeader,
+                                                                  long givenOffset,
+                                                                  Map<String, ExtendedAssignment> givenAssignments) {
+        return givenAssignments.entrySet().stream()
+                .collect(Collectors.toMap(
+                    Map.Entry::getKey,
+                    e -> new ExtendedWorkerState(expectedLeaderUrl(givenLeader), givenOffset, e.getValue())));
+    }
+
+    private static Map<String, ExtendedWorkerState> memberConfigs(String givenLeader,
+                                                                  long givenOffset,
+                                                                  int start,
+                                                                  int connectorNum) {
+        return IntStream.range(start, connectorNum + 1)
+                .mapToObj(i -> new SimpleEntry<>("worker" + i, new ExtendedWorkerState(expectedLeaderUrl(givenLeader), givenOffset, null)))
+                .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue));
+    }
+
+    private static Map<String, Integer> connectorTaskCounts(int start,
+                                                            int connectorNum,
+                                                            int taskCounts) {
+        return IntStream.range(start, connectorNum + 1)
+                .mapToObj(i -> new SimpleEntry<>("connector" + i, taskCounts))
+                .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue));
+    }
+
+    private static Map<String, Map<String, String>> connectorConfigs(int start, int connectorNum) {
+        return IntStream.range(start, connectorNum + 1)
+                .mapToObj(i -> new SimpleEntry<>("connector" + i, new HashMap<String, String>()))
+                .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue));
+    }
+
+    private static Map<String, TargetState> connectorTargetStates(int start,
+                                                                  int connectorNum,
+                                                                  TargetState state) {
+        return IntStream.range(start, connectorNum + 1)
+                .mapToObj(i -> new SimpleEntry<>("connector" + i, state))
+                .collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue));
+    }
+
+    private static Map<ConnectorTaskId, Map<String, String>> taskConfigs(int start,
+                                                                         int connectorNum,
+                                                                         int taskNum) {
+        return IntStream.range(start, taskNum + 1)
+                .mapToObj(i -> new SimpleEntry<>(
+                        new ConnectorTaskId("connector" + i / connectorNum + 1, i),
+                        new HashMap<String, String>())
+                ).collect(Collectors.toMap(SimpleEntry::getKey, SimpleEntry::getValue));
+    }
+
+    private void applyAssignments(Map<String, ExtendedAssignment> newAssignments) {
+        newAssignments.forEach((k, v) -> {
+            assignments.computeIfAbsent(k, noop -> newExpandableAssignment())
+                    .connectors()
+                    .removeAll(v.revokedConnectors());
+            assignments.computeIfAbsent(k, noop -> newExpandableAssignment())
+                    .connectors()
+                    .addAll(v.connectors());
+            assignments.computeIfAbsent(k, noop -> newExpandableAssignment())
+                    .tasks()
+                    .removeAll(v.revokedTasks());
+            assignments.computeIfAbsent(k, noop -> newExpandableAssignment())
+                    .tasks()
+                    .addAll(v.tasks());
+        });
+    }
+
+    private ExtendedAssignment newExpandableAssignment() {
+        return new ExtendedAssignment(
+                CONNECT_PROTOCOL_V1,
+                ConnectProtocol.Assignment.NO_ERROR,
+                leader,
+                leaderUrl,
+                offset,
+                new ArrayList<>(),
+                new ArrayList<>(),
+                new ArrayList<>(),
+                new ArrayList<>(),
+                0);
+    }
+
+    private static String expectedLeaderUrl(String givenLeader) {
+        return "http://" + givenLeader + ":8083";
+    }
+
+    private void assertAssignment(int connectorNum, int taskNum,
+                                  int revokedConnectorNum, int revokedTaskNum,
+                                  String... workers) {
+        assertAssignment(leader, connectorNum, taskNum, revokedConnectorNum, revokedTaskNum, workers);
+    }
+
+    private void assertAssignment(String expectedLeader, int connectorNum, int taskNum,
+                                  int revokedConnectorNum, int revokedTaskNum,
+                                  String... workers) {
+        assertThat("Wrong number of workers",
+                expectedMemberConfigs.keySet().size(),
+                is(workers.length));
+        assertThat("Wrong set of workers",
+                new ArrayList<>(expectedMemberConfigs.keySet()), hasItems(workers));
+        assertThat("Wrong number of assigned connectors",
+                expectedMemberConfigs.values().stream().map(v -> v.assignment().connectors().size()).reduce(0, Integer::sum),
+                is(connectorNum));
+        assertThat("Wrong number of assigned tasks",
+                expectedMemberConfigs.values().stream().map(v -> v.assignment().tasks().size()).reduce(0, Integer::sum),
+                is(taskNum));
+        assertThat("Wrong number of revoked connectors",
+                expectedMemberConfigs.values().stream().map(v -> v.assignment().revokedConnectors().size()).reduce(0, Integer::sum),
+                is(revokedConnectorNum));
+        assertThat("Wrong number of revoked tasks",
+                expectedMemberConfigs.values().stream().map(v -> v.assignment().revokedTasks().size()).reduce(0, Integer::sum),
+                is(revokedTaskNum));
+        assertThat("Wrong leader in assignments",
+                expectedMemberConfigs.values().stream().map(v -> v.assignment().leader()).distinct().collect(Collectors.joining(", ")),
+                is(expectedLeader));
+        assertThat("Wrong leaderUrl in assignments",
+                expectedMemberConfigs.values().stream().map(v -> v.assignment().leaderUrl()).distinct().collect(Collectors.joining(", ")),
+                is(expectedLeaderUrl(expectedLeader)));
+    }
+
+    private void assertDelay(int expectedDelay, Map<String, ExtendedAssignment> newAssignments) {
+        newAssignments.values().stream()
+                .forEach(a -> assertEquals(
+                        "Wrong rebalance delay in " + a, expectedDelay, a.delay()));
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
new file mode 100644
index 0000000..f06976a
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorIncrementalTest.java
@@ -0,0 +1,573 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime.distributed;
+
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MockClient;
+import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.internals.ClusterResourceListeners;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocol;
+import static org.apache.kafka.common.message.JoinGroupRequestData.JoinGroupRequestProtocolCollection;
+import static org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember;
+import static org.apache.kafka.connect.runtime.WorkerTestUtils.assertAssignment;
+import static org.apache.kafka.connect.runtime.WorkerTestUtils.clusterConfigState;
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.WorkerState;
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility.COMPATIBLE;
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility.EAGER;
+import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.runners.Parameterized.Parameter;
+import static org.junit.runners.Parameterized.Parameters;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+import static org.mockito.Mockito.when;
+
+@RunWith(value = Parameterized.class)
+public class WorkerCoordinatorIncrementalTest {
+    @Rule
+    public MockitoRule rule = MockitoJUnit.rule();
+
+    private String connectorId1 = "connector1";
+    private String connectorId2 = "connector2";
+    private String connectorId3 = "connector3";
+    private ConnectorTaskId taskId1x0 = new ConnectorTaskId(connectorId1, 0);
+    private ConnectorTaskId taskId1x1 = new ConnectorTaskId(connectorId1, 1);
+    private ConnectorTaskId taskId2x0 = new ConnectorTaskId(connectorId2, 0);
+    private ConnectorTaskId taskId3x0 = new ConnectorTaskId(connectorId3, 0);
+
+    private String groupId = "test-group";
+    private int sessionTimeoutMs = 10;
+    private int rebalanceTimeoutMs = 60;
+    private int heartbeatIntervalMs = 2;
+    private long retryBackoffMs = 100;
+    private int requestTimeoutMs = 1000;
+    private MockTime time;
+    private MockClient client;
+    private Node node;
+    private Metadata metadata;
+    private Metrics metrics;
+    private ConsumerNetworkClient consumerClient;
+    private MockRebalanceListener rebalanceListener;
+    @Mock
+    private KafkaConfigBackingStore configStorage;
+    private WorkerCoordinator coordinator;
+    private int rebalanceDelay = DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT;
+
+    private String leaderId;
+    private String memberId;
+    private String anotherMemberId;
+    private String leaderUrl;
+    private String memberUrl;
+    private String anotherMemberUrl;
+    private int generationId;
+    private long offset;
+
+    private int configStorageCalls;
+
+    private ClusterConfigState configState1;
+    private ClusterConfigState configState2;
+    private ClusterConfigState configStateSingleTaskConnectors;
+
+    // Arguments are:
+    // - Protocol type
+    // - Expected metadata size
+    @Parameters
+    public static Iterable<?> mode() {
+        return Arrays.asList(new Object[][]{{COMPATIBLE, 2}});
+    }
+
+    @Parameter
+    public ConnectProtocolCompatibility compatibility;
+
+    @Parameter(1)
+    public int expectedMetadataSize;
+
+    @Before
+    public void setup() {
+        LogContext loggerFactory = new LogContext();
+
+        this.time = new MockTime();
+        this.metadata = new Metadata(0, Long.MAX_VALUE, loggerFactory, new ClusterResourceListeners());
+        this.client = new MockClient(time, metadata);
+        this.client.updateMetadata(TestUtils.metadataUpdateWith(1, Collections.singletonMap("topic", 1)));
+        this.node = metadata.fetch().nodes().get(0);
+        this.consumerClient = new ConsumerNetworkClient(loggerFactory, client, metadata, time,
+                retryBackoffMs, requestTimeoutMs, heartbeatIntervalMs);
+        this.metrics = new Metrics(time);
+        this.rebalanceListener = new MockRebalanceListener();
+
+        this.leaderId = "worker1";
+        this.memberId = "worker2";
+        this.anotherMemberId = "worker3";
+        this.leaderUrl = expectedUrl(leaderId);
+        this.memberUrl = expectedUrl(memberId);
+        this.anotherMemberUrl = expectedUrl(anotherMemberId);
+        this.generationId = 3;
+        this.offset = 10L;
+
+        this.configStorageCalls = 0;
+
+        this.coordinator = new WorkerCoordinator(
+                loggerFactory,
+                consumerClient,
+                groupId,
+                rebalanceTimeoutMs,
+                sessionTimeoutMs,
+                heartbeatIntervalMs,
+                metrics,
+                "worker" + groupId,
+                time,
+                retryBackoffMs,
+                expectedUrl(leaderId),
+                configStorage,
+                rebalanceListener,
+                compatibility,
+                rebalanceDelay);
+
+        configState1 = clusterConfigState(offset, 2, 4);
+    }
+
+    @After
+    public void teardown() {
+        this.metrics.close();
+        verifyNoMoreInteractions(configStorage);
+    }
+
+    private static String expectedUrl(String member) {
+        return "http://" + member + ":8083";
+    }
+
+    // We only test functionality unique to WorkerCoordinator. Other functionality is already
+    // well tested via the tests that cover AbstractCoordinator & ConsumerCoordinator.
+
+    @Test
+    public void testMetadata() {
+        when(configStorage.snapshot()).thenReturn(configState1);
+
+        JoinGroupRequestProtocolCollection serialized = coordinator.metadata();
+        assertEquals(expectedMetadataSize, serialized.size());
+
+        Iterator<JoinGroupRequestProtocol> protocolIterator = serialized.iterator();
+        assertTrue(protocolIterator.hasNext());
+        JoinGroupRequestProtocol defaultMetadata = protocolIterator.next();
+        assertEquals(compatibility.protocol(), defaultMetadata.name());
+        WorkerState state = IncrementalCooperativeConnectProtocol
+                .deserializeMetadata(ByteBuffer.wrap(defaultMetadata.metadata()));
+        assertEquals(offset, state.offset());
+
+        verify(configStorage, times(1)).snapshot();
+    }
+
+    @Test
+    public void testMetadataWithExistingAssignment() {
+        when(configStorage.snapshot()).thenReturn(configState1);
+
+        ExtendedAssignment assignment = new ExtendedAssignment(
+                CONNECT_PROTOCOL_V1, ExtendedAssignment.NO_ERROR, leaderId, leaderUrl, configState1.offset(),
+                Collections.singletonList(connectorId1), Arrays.asList(taskId1x0, taskId2x0),
+                Collections.emptyList(), Collections.emptyList(), 0);
+        ByteBuffer buf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment);
+        // Using onJoinComplete to register the protocol selection decided by the the broker
+        // coordinator as well as an existing previous assignment that the call to metadata will
+        // include with v1 but not with v0
+        coordinator.onJoinComplete(generationId, memberId, compatibility.protocol(), buf);
+
+        JoinGroupRequestProtocolCollection serialized = coordinator.metadata();
+        assertEquals(expectedMetadataSize, serialized.size());
+
+        Iterator<JoinGroupRequestProtocol> protocolIterator = serialized.iterator();
+        assertTrue(protocolIterator.hasNext());
+        JoinGroupRequestProtocol selectedMetadata = protocolIterator.next();
+        assertEquals(compatibility.protocol(), selectedMetadata.name());
+        ExtendedWorkerState state = IncrementalCooperativeConnectProtocol
+                .deserializeMetadata(ByteBuffer.wrap(selectedMetadata.metadata()));
+        assertEquals(offset, state.offset());
+        assertNotEquals(ExtendedAssignment.empty(), state.assignment());
+        assertEquals(Collections.singletonList(connectorId1), state.assignment().connectors());
+        assertEquals(Arrays.asList(taskId1x0, taskId2x0), state.assignment().tasks());
+
+        verify(configStorage, times(1)).snapshot();
+    }
+
+    @Test
+    public void testMetadataWithExistingAssignmentButOlderProtocolSelection() {
+        when(configStorage.snapshot()).thenReturn(configState1);
+
+        ExtendedAssignment assignment = new ExtendedAssignment(
+                CONNECT_PROTOCOL_V1, ExtendedAssignment.NO_ERROR, leaderId, leaderUrl, configState1.offset(),
+                Collections.singletonList(connectorId1), Arrays.asList(taskId1x0, taskId2x0),
+                Collections.emptyList(), Collections.emptyList(), 0);
+        ByteBuffer buf = IncrementalCooperativeConnectProtocol.serializeAssignment(assignment);
+        // Using onJoinComplete to register the protocol selection decided by the the broker
+        // coordinator as well as an existing previous assignment that the call to metadata will
+        // include with v1 but not with v0
+        coordinator.onJoinComplete(generationId, memberId, EAGER.protocol(), buf);
+
+        JoinGroupRequestProtocolCollection serialized = coordinator.metadata();
+        assertEquals(expectedMetadataSize, serialized.size());
+
+        Iterator<JoinGroupRequestProtocol> protocolIterator = serialized.iterator();
+        assertTrue(protocolIterator.hasNext());
+        JoinGroupRequestProtocol selectedMetadata = protocolIterator.next();
+        assertEquals(compatibility.protocol(), selectedMetadata.name());
+        ExtendedWorkerState state = IncrementalCooperativeConnectProtocol
+                .deserializeMetadata(ByteBuffer.wrap(selectedMetadata.metadata()));
+        assertEquals(offset, state.offset());
+        assertNotEquals(ExtendedAssignment.empty(), state.assignment());
+
+        verify(configStorage, times(1)).snapshot();
+    }
+
+    @Test
+    public void testTaskAssignmentWhenWorkerJoins() {
+        when(configStorage.snapshot()).thenReturn(configState1);
+
+        coordinator.metadata();
+        ++configStorageCalls;
+
+        List<JoinGroupResponseMember> responseMembers = new ArrayList<>();
+        addJoinGroupResponseMember(responseMembers, leaderId, offset, null);
+        addJoinGroupResponseMember(responseMembers, memberId, offset, null);
+
+        Map<String, ByteBuffer> result = coordinator.performAssignment(leaderId, compatibility.protocol(), responseMembers);
+
+        ExtendedAssignment leaderAssignment = deserializeAssignment(result, leaderId);
+        assertAssignment(leaderId, offset,
+                Collections.singletonList(connectorId1), 4,
+                Collections.emptyList(), 0,
+                leaderAssignment);
+
+        ExtendedAssignment memberAssignment = deserializeAssignment(result, memberId);
+        assertAssignment(leaderId, offset,
+                Collections.singletonList(connectorId2), 4,
+                Collections.emptyList(), 0,
+                memberAssignment);
+
+        coordinator.metadata();
+        ++configStorageCalls;
+
+        responseMembers = new ArrayList<>();
+        addJoinGroupResponseMember(responseMembers, leaderId, offset, leaderAssignment);
+        addJoinGroupResponseMember(responseMembers, memberId, offset, memberAssignment);
+        addJoinGroupResponseMember(responseMembers, anotherMemberId, offset, null);
+
+        result = coordinator.performAssignment(leaderId, compatibility.protocol(), responseMembers);
+
+        leaderAssignment = deserializeAssignment(result, leaderId);
+        assertAssignment(leaderId, offset,
+                Collections.emptyList(), 0,
+                Collections.emptyList(), 2,
+                leaderAssignment);
+
+        memberAssignment = deserializeAssignment(result, memberId);
+        assertAssignment(leaderId, offset,
+                Collections.emptyList(), 0,
+                Collections.emptyList(), 0,
+                memberAssignment);
+
+        ExtendedAssignment anotherMemberAssignment = deserializeAssignment(result, anotherMemberId);
+        assertAssignment(leaderId, offset,
+                Collections.emptyList(), 0,
+                Collections.emptyList(), 0,
+                anotherMemberAssignment);
+
+        verify(configStorage, times(configStorageCalls)).snapshot();
+    }
+
+    @Test
+    public void testTaskAssignmentWhenWorkerLeavesPermanently() {
+        when(configStorage.snapshot()).thenReturn(configState1);
+
+        // First assignment distributes configured connectors and tasks
+        coordinator.metadata();
+        ++configStorageCalls;
+
+        List<JoinGroupResponseMember> responseMembers = new ArrayList<>();
+        addJoinGroupResponseMember(responseMembers, leaderId, offset, null);
+        addJoinGroupResponseMember(responseMembers, memberId, offset, null);
+        addJoinGroupResponseMember(responseMembers, anotherMemberId, offset, null);
+
+        Map<String, ByteBuffer> result = coordinator.performAssignment(leaderId, compatibility.protocol(), responseMembers);
+
+        ExtendedAssignment leaderAssignment = deserializeAssignment(result, leaderId);
+        assertAssignment(leaderId, offset,
+                Collections.singletonList(connectorId1), 3,
+                Collections.emptyList(), 0,
+                leaderAssignment);
+
+        ExtendedAssignment memberAssignment = deserializeAssignment(result, memberId);
+        assertAssignment(leaderId, offset,
+                Collections.singletonList(connectorId2), 3,
+                Collections.emptyList(), 0,
+                memberAssignment);
+
+        ExtendedAssignment anotherMemberAssignment = deserializeAssignment(result, anotherMemberId);
+        assertAssignment(leaderId, offset,
+                Collections.emptyList(), 2,
+                Collections.emptyList(), 0,
+                anotherMemberAssignment);
+
+        // Second rebalance detects a worker is missing
+        coordinator.metadata();
+        ++configStorageCalls;
+
+        // Mark everyone as in sync with configState1
+        responseMembers = new ArrayList<>();
+        addJoinGroupResponseMember(responseMembers, leaderId, offset, leaderAssignment);
+        addJoinGroupResponseMember(responseMembers, memberId, offset, memberAssignment);
+
+        result = coordinator.performAssignment(leaderId, compatibility.protocol(), responseMembers);
+
+        leaderAssignment = deserializeAssignment(result, leaderId);
+        assertAssignment(leaderId, offset,
+                Collections.emptyList(), 0,
+                Collections.emptyList(), 0,
+                rebalanceDelay,
+                leaderAssignment);
+
+        memberAssignment = deserializeAssignment(result, memberId);
+        assertAssignment(leaderId, offset,
+                Collections.emptyList(), 0,
+                Collections.emptyList(), 0,
+                rebalanceDelay,
+                memberAssignment);
+
+        rebalanceDelay /= 2;
+        time.sleep(rebalanceDelay);
+
+        // A third rebalance before the delay expires won't change the assignments
+        result = coordinator.performAssignment(leaderId, compatibility.protocol(), responseMembers);
+
+        leaderAssignment = deserializeAssignment(result, leaderId);
+        assertAssignment(leaderId, offset,
+                Collections.emptyList(), 0,
+                Collections.emptyList(), 0,
+                rebalanceDelay,
+                leaderAssignment);
+
+        memberAssignment = deserializeAssignment(result, memberId);
+        assertAssignment(leaderId, offset,
+                Collections.emptyList(), 0,
+                Collections.emptyList(), 0,
+                rebalanceDelay,
+                memberAssignment);
+
+        time.sleep(rebalanceDelay + 1);
+
+        // A rebalance after the delay expires re-assigns the lost tasks
+        result = coordinator.performAssignment(leaderId, compatibility.protocol(), responseMembers);
+
+        leaderAssignment = deserializeAssignment(result, leaderId);
+        assertAssignment(leaderId, offset,
+                Collections.emptyList(), 1,
+                Collections.emptyList(), 0,
+                leaderAssignment);
+
+        memberAssignment = deserializeAssignment(result, memberId);
+        assertAssignment(leaderId, offset,
+                Collections.emptyList(), 1,
+                Collections.emptyList(), 0,
+                memberAssignment);
+
+        verify(configStorage, times(configStorageCalls)).snapshot();
+    }
+
+    @Test
+    public void testTaskAssignmentWhenWorkerBounces() {
+        when(configStorage.snapshot()).thenReturn(configState1);
+
+        // First assignment distributes configured connectors and tasks
+        coordinator.metadata();
+        ++configStorageCalls;
+
+        List<JoinGroupResponseMember> responseMembers = new ArrayList<>();
+        addJoinGroupResponseMember(responseMembers, leaderId, offset, null);
+        addJoinGroupResponseMember(responseMembers, memberId, offset, null);
+        addJoinGroupResponseMember(responseMembers, anotherMemberId, offset, null);
+
+        Map<String, ByteBuffer> result = coordinator.performAssignment(leaderId, compatibility.protocol(), responseMembers);
+
+        ExtendedAssignment leaderAssignment = deserializeAssignment(result, leaderId);
+        assertAssignment(leaderId, offset,
+                Collections.singletonList(connectorId1), 3,
+                Collections.emptyList(), 0,
+                leaderAssignment);
+
+        ExtendedAssignment memberAssignment = deserializeAssignment(result, memberId);
+        assertAssignment(leaderId, offset,
+                Collections.singletonList(connectorId2), 3,
+                Collections.emptyList(), 0,
+                memberAssignment);
+
+        ExtendedAssignment anotherMemberAssignment = deserializeAssignment(result, anotherMemberId);
+        assertAssignment(leaderId, offset,
+                Collections.emptyList(), 2,
+                Collections.emptyList(), 0,
+                anotherMemberAssignment);
+
+        // Second rebalance detects a worker is missing
+        coordinator.metadata();
+        ++configStorageCalls;
+
+        responseMembers = new ArrayList<>();
+        addJoinGroupResponseMember(responseMembers, leaderId, offset, leaderAssignment);
+        addJoinGroupResponseMember(responseMembers, memberId, offset, memberAssignment);
+
+        result = coordinator.performAssignment(leaderId, compatibility.protocol(), responseMembers);
+
+        leaderAssignment = deserializeAssignment(result, leaderId);
+        assertAssignment(leaderId, offset,
+                Collections.emptyList(), 0,
+                Collections.emptyList(), 0,
+                rebalanceDelay,
+                leaderAssignment);
+
+        memberAssignment = deserializeAssignment(result, memberId);
+        assertAssignment(leaderId, offset,
+                Collections.emptyList(), 0,
+                Collections.emptyList(), 0,
+                rebalanceDelay,
+                memberAssignment);
+
+        rebalanceDelay /= 2;
+        time.sleep(rebalanceDelay);
+
+        // A third rebalance before the delay expires won't change the assignments even if the
+        // member returns in the meantime
+        addJoinGroupResponseMember(responseMembers, anotherMemberId, offset, null);
+        result = coordinator.performAssignment(leaderId, compatibility.protocol(), responseMembers);
+
+        leaderAssignment = deserializeAssignment(result, leaderId);
+        assertAssignment(leaderId, offset,
+                Collections.emptyList(), 0,
+                Collections.emptyList(), 0,
+                rebalanceDelay,
+                leaderAssignment);
+
+        memberAssignment = deserializeAssignment(result, memberId);
+        assertAssignment(leaderId, offset,
+                Collections.emptyList(), 0,
+                Collections.emptyList(), 0,
+                rebalanceDelay,
+                memberAssignment);
+
+        anotherMemberAssignment = deserializeAssignment(result, anotherMemberId);
+        assertAssignment(leaderId, offset,
+                Collections.emptyList(), 0,
+                Collections.emptyList(), 0,
+                rebalanceDelay,
+                anotherMemberAssignment);
+
+        time.sleep(rebalanceDelay + 1);
+
+        result = coordinator.performAssignment(leaderId, compatibility.protocol(), responseMembers);
+
+        // A rebalance after the delay expires re-assigns the lost tasks the the returning member
+        leaderAssignment = deserializeAssignment(result, leaderId);
+        assertAssignment(leaderId, offset,
+                Collections.emptyList(), 0,
+                Collections.emptyList(), 0,
+                leaderAssignment);
+
+        memberAssignment = deserializeAssignment(result, memberId);
+        assertAssignment(leaderId, offset,
+                Collections.emptyList(), 0,
+                Collections.emptyList(), 0,
+                memberAssignment);
+
+        anotherMemberAssignment = deserializeAssignment(result, anotherMemberId);
+        assertAssignment(leaderId, offset,
+                Collections.emptyList(), 2,
+                Collections.emptyList(), 0,
+                anotherMemberAssignment);
+
+        verify(configStorage, times(configStorageCalls)).snapshot();
+    }
+
+    private static class MockRebalanceListener implements WorkerRebalanceListener {
+        public ExtendedAssignment assignment = null;
+
+        public String revokedLeader;
+        public Collection<String> revokedConnectors = Collections.emptyList();
+        public Collection<ConnectorTaskId> revokedTasks = Collections.emptyList();
+
+        public int revokedCount = 0;
+        public int assignedCount = 0;
+
+        @Override
+        public void onAssigned(ExtendedAssignment assignment, int generation) {
+            this.assignment = assignment;
+            assignedCount++;
+        }
+
+        @Override
+        public void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
+            if (connectors.isEmpty() && tasks.isEmpty()) {
+                return;
+            }
+            this.revokedLeader = leader;
+            this.revokedConnectors = connectors;
+            this.revokedTasks = tasks;
+            revokedCount++;
+        }
+    }
+
+    private static ExtendedAssignment deserializeAssignment(Map<String, ByteBuffer> assignment,
+                                                           String member) {
+        return IncrementalCooperativeConnectProtocol.deserializeAssignment(assignment.get(member));
+    }
+
+    private static void addJoinGroupResponseMember(List<JoinGroupResponseMember> responseMembers,
+                                                   String member,
+                                                   long offset,
+                                                   ExtendedAssignment assignment) {
+        responseMembers.add(new JoinGroupResponseMember()
+                .setMemberId(member)
+                .setMetadata(IncrementalCooperativeConnectProtocol.serializeMetadata(
+                        new ExtendedWorkerState(expectedUrl(member), offset, assignment)).array())
+        );
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
index 9f980a6..182d6bd 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinatorTest.java
@@ -43,8 +43,9 @@ import org.easymock.Mock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 import org.powermock.api.easymock.PowerMock;
-import org.powermock.reflect.Whitebox;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -59,7 +60,10 @@ import java.util.Map;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.runners.Parameterized.Parameter;
+import static org.junit.runners.Parameterized.Parameters;
 
+@RunWith(value = Parameterized.class)
 public class WorkerCoordinatorTest {
 
     private static final String LEADER_URL = "leaderUrl:8083";
@@ -92,6 +96,22 @@ public class WorkerCoordinatorTest {
     private ClusterConfigState configState2;
     private ClusterConfigState configStateSingleTaskConnectors;
 
+    // Arguments are:
+    // - Protocol type
+    // - Expected metadata size
+    @Parameters
+    public static Iterable<?> mode() {
+        return Arrays.asList(new Object[][]{
+                {ConnectProtocolCompatibility.EAGER, 1},
+                {ConnectProtocolCompatibility.COMPATIBLE, 2}});
+    }
+
+    @Parameter
+    public ConnectProtocolCompatibility compatibility;
+
+    @Parameter(1)
+    public int expectedMetadataSize;
+
     @Before
     public void setup() {
         LogContext logContext = new LogContext();
@@ -119,7 +139,9 @@ public class WorkerCoordinatorTest {
                 retryBackoffMs,
                 LEADER_URL,
                 configStorage,
-                rebalanceListener);
+                rebalanceListener,
+                compatibility,
+                0);
 
         configState1 = new ClusterConfigState(
                 1L,
@@ -193,12 +215,12 @@ public class WorkerCoordinatorTest {
         PowerMock.replayAll();
 
         JoinGroupRequestData.JoinGroupRequestProtocolCollection serialized = coordinator.metadata();
-        assertEquals(1, serialized.size());
+        assertEquals(expectedMetadataSize, serialized.size());
 
         Iterator<JoinGroupRequestData.JoinGroupRequestProtocol> protocolIterator = serialized.iterator();
         assertTrue(protocolIterator.hasNext());
         JoinGroupRequestData.JoinGroupRequestProtocol defaultMetadata = protocolIterator.next();
-        assertEquals(WorkerCoordinator.DEFAULT_SUBPROTOCOL, defaultMetadata.name());
+        assertEquals(compatibility.protocol(), defaultMetadata.name());
         ConnectProtocol.WorkerState state = ConnectProtocol.deserializeMetadata(
                 ByteBuffer.wrap(defaultMetadata.metadata()));
         assertEquals(1, state.offset());
@@ -383,7 +405,7 @@ public class WorkerCoordinatorTest {
                 .setMemberId("member")
                 .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L)).array())
         );
-        Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "performAssignment", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, responseMembers);
+        Map<String, ByteBuffer> result = coordinator.performAssignment("leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, responseMembers);
 
         // configState1 has 1 connector, 1 task
         ConnectProtocol.Assignment leaderAssignment = ConnectProtocol.deserializeAssignment(result.get("leader"));
@@ -426,7 +448,7 @@ public class WorkerCoordinatorTest {
                 .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L)).array())
         );
 
-        Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "performAssignment", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, responseMembers);
+        Map<String, ByteBuffer> result = coordinator.performAssignment("leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, responseMembers);
 
         // configState2 has 2 connector, 3 tasks and should trigger round robin assignment
         ConnectProtocol.Assignment leaderAssignment = ConnectProtocol.deserializeAssignment(result.get("leader"));
@@ -469,7 +491,7 @@ public class WorkerCoordinatorTest {
                 .setMetadata(ConnectProtocol.serializeMetadata(new ConnectProtocol.WorkerState(MEMBER_URL, 1L)).array())
         );
 
-        Map<String, ByteBuffer> result = Whitebox.invokeMethod(coordinator, "performAssignment", "leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, responseMembers);
+        Map<String, ByteBuffer> result = coordinator.performAssignment("leader", WorkerCoordinator.DEFAULT_SUBPROTOCOL, responseMembers);
 
         // Round robin assignment when there are the same number of connectors and tasks should result in each being
         // evenly distributed across the workers, i.e. round robin assignment of connectors first, then followed by tasks
@@ -536,7 +558,7 @@ public class WorkerCoordinatorTest {
     }
 
     private static class MockRebalanceListener implements WorkerRebalanceListener {
-        public ConnectProtocol.Assignment assignment = null;
+        public ExtendedAssignment assignment = null;
 
         public String revokedLeader;
         public Collection<String> revokedConnectors;
@@ -546,13 +568,16 @@ public class WorkerCoordinatorTest {
         public int assignedCount = 0;
 
         @Override
-        public void onAssigned(ConnectProtocol.Assignment assignment, int generation) {
+        public void onAssigned(ExtendedAssignment assignment, int generation) {
             this.assignment = assignment;
             assignedCount++;
         }
 
         @Override
         public void onRevoked(String leader, Collection<String> connectors, Collection<ConnectorTaskId> tasks) {
+            if (connectors.isEmpty() && tasks.isEmpty()) {
+                return;
+            }
             this.revokedLeader = leader;
             this.revokedConnectors = connectors;
             this.revokedTasks = tasks;