Posted to commits@kafka.apache.org by kk...@apache.org on 2020/06/09 21:20:27 UTC

[kafka] branch 2.4 updated: KAFKA-9849: Fix issue with worker.unsync.backoff.ms creating zombie workers when incremental cooperative rebalancing is used (#8827)

This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new c4626b8  KAFKA-9849: Fix issue with worker.unsync.backoff.ms creating zombie workers when incremental cooperative rebalancing is used (#8827)
c4626b8 is described below

commit c4626b8d72b1cea934063aac699cf845fa3e9cbc
Author: Konstantine Karantasis <ko...@confluent.io>
AuthorDate: Tue Jun 9 13:02:35 2020 -0700

    KAFKA-9849: Fix issue with worker.unsync.backoff.ms creating zombie workers when incremental cooperative rebalancing is used (#8827)
    
    When Incremental Cooperative Rebalancing is enabled and a worker fails to read to the end of the config topic, it needs to voluntarily revoke its locally running tasks in time, before these tasks get assigned to another worker; otherwise redundant (zombie) tasks end up running in the Connect cluster.
    
    Additionally, instead of using the worker.unsync.backoff.ms delay, which was defined for the eager rebalancing protocol and has a long default value (coincidentally equal to the default rebalance delay of the incremental cooperative protocol), the worker should quickly attempt to re-read the config topic and back off for only a fraction of the rebalance delay. After this fix, the worker will retry a maximum of 5 times before it revokes its running assignment a [...]
    
    Unit tests are added to cover the backoff logic with incremental cooperative rebalancing. A simplified sketch of the new backoff flow is included below, right after the list of changed files.
    
    Reviewers: Randall Hauch <rh...@gmail.com>
---
 .../runtime/distributed/DistributedHerder.java     |  29 ++-
 .../runtime/distributed/ExtendedAssignment.java    |  15 ++
 .../runtime/distributed/WorkerCoordinator.java     |   4 +
 .../runtime/distributed/WorkerGroupMember.java     |   4 +
 .../integration/ConnectIntegrationTestUtils.java   |   1 +
 .../runtime/distributed/DistributedHerderTest.java | 207 +++++++++++++++++++--
 6 files changed, 247 insertions(+), 13 deletions(-)
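
As a reader's aid before the diff, here is a simplified, self-contained sketch of the
catch-up backoff described in the commit message. The class and method names
(CatchUpBackoffSketch, backoff(), revokeRunningAssignment()) are invented for
illustration and are not the DistributedHerder API; only the retry count of 5 and the
"rebalance delay / 10 / remaining retries" fraction mirror the actual patch below.

    import java.util.concurrent.TimeUnit;

    // Simplified sketch of the new catch-up behavior; NOT the real DistributedHerder code.
    class CatchUpBackoffSketch {
        private static final int MAX_RETRIES = 5;      // mirrors BACKOFF_RETRIES in the patch
        private final int rebalanceDelayMs;            // scheduled.rebalance.max.delay.ms
        private final boolean incrementalCooperative;  // false when the eager protocol is in use
        private int retriesLeft = MAX_RETRIES;

        CatchUpBackoffSketch(int rebalanceDelayMs, boolean incrementalCooperative) {
            this.rebalanceDelayMs = rebalanceDelayMs;
            this.incrementalCooperative = incrementalCooperative;
        }

        // Called after a failed attempt to read the config topic to its end.
        void backoff(long eagerBackoffMs) throws InterruptedException {
            if (!incrementalCooperative) {
                // Eager protocol keeps the old behavior: one long worker.unsync.backoff.ms sleep.
                TimeUnit.MILLISECONDS.sleep(eagerBackoffMs);
                return;
            }
            if (retriesLeft > 0) {
                // Sleep only for a fraction of the rebalance delay, then let the caller retry.
                TimeUnit.MILLISECONDS.sleep(rebalanceDelayMs / 10 / retriesLeft--);
                return;
            }
            // Retries exhausted: give up the locally running assignment so no zombie
            // connectors or tasks keep running while another worker takes them over.
            revokeRunningAssignment();
            retriesLeft = MAX_RETRIES;
        }

        void revokeRunningAssignment() {
            // Placeholder for member.revokeAssignment(runningAssignmentSnapshot).
        }
    }

The key point is that under incremental cooperative rebalancing the worker no longer
sleeps for the full worker.unsync.backoff.ms; it retries quickly and, only if it still
cannot catch up, voluntarily gives up its assignment so each task runs in exactly one place.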

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 703386e..91139c2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -27,7 +27,6 @@ import org.apache.kafka.common.metrics.stats.Max;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
-import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.connector.policy.ConnectorClientConfigOverridePolicy;
@@ -92,6 +91,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 
 import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0;
+import static org.apache.kafka.connect.runtime.distributed.ConnectProtocolCompatibility.EAGER;
 import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
 
 /**
@@ -131,6 +131,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     private static final long START_AND_STOP_SHUTDOWN_TIMEOUT_MS = TimeUnit.SECONDS.toMillis(1);
     private static final long RECONFIGURE_CONNECTOR_TASKS_BACKOFF_MS = 250;
     private static final int START_STOP_THREAD_POOL_SIZE = 8;
+    private static final short BACKOFF_RETRIES = 5;
 
     private final AtomicLong requestSeqNum = new AtomicLong();
 
@@ -177,6 +178,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     private SecretKey sessionKey;
     private volatile long keyExpiration;
     private short currentProtocolVersion;
+    private short backoffRetries;
 
     private final DistributedConfig config;
 
@@ -247,6 +249,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         scheduledRebalance = Long.MAX_VALUE;
         keyExpiration = Long.MAX_VALUE;
         sessionKey = null;
+        backoffRetries = BACKOFF_RETRIES;
 
         currentProtocolVersion = ConnectProtocolCompatibility.compatibility(
             config.getString(DistributedConfig.CONNECT_PROTOCOL_CONFIG)
@@ -1077,6 +1080,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
             configBackingStore.refresh(timeoutMs, TimeUnit.MILLISECONDS);
             configState = configBackingStore.snapshot();
             log.info("Finished reading to end of log and updated config snapshot, new config log offset: {}", configState.offset());
+            backoffRetries = BACKOFF_RETRIES;
             return true;
         } catch (TimeoutException e) {
             // in case reading the log takes too long, leave the group to ensure a quick rebalance (although by default we should be out of the group already)
@@ -1089,7 +1093,28 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
     }
 
     private void backoff(long ms) {
-        Utils.sleep(ms);
+        if (ConnectProtocolCompatibility.fromProtocolVersion(currentProtocolVersion) == EAGER) {
+            time.sleep(ms);
+            return;
+        }
+
+        if (backoffRetries > 0) {
+            int rebalanceDelayFraction =
+                    config.getInt(DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG) / 10 / backoffRetries;
+            time.sleep(rebalanceDelayFraction);
+            --backoffRetries;
+            return;
+        }
+
+        ExtendedAssignment runningAssignmentSnapshot;
+        synchronized (this) {
+            runningAssignmentSnapshot = ExtendedAssignment.duplicate(runningAssignment);
+        }
+        log.info("Revoking current running assignment {} because after {} retries the worker "
+                + "has not caught up with the latest Connect cluster updates",
+                runningAssignmentSnapshot, BACKOFF_RETRIES);
+        member.revokeAssignment(runningAssignmentSnapshot);
+        backoffRetries = BACKOFF_RETRIES;
     }
 
     private void startAndStop(Collection<Callable<Void>> callables) {
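
For a concrete feel of the delays computed by the hunk above, assume the stock
scheduled.rebalance.max.delay.ms default of 300000 ms (5 minutes; treat the exact
figure as an assumption). The worker then sleeps 6 s, 7.5 s, 10 s, 15 s and 30 s across
its five retries, roughly 68.5 s in total, before revoking its running assignment:

    // Hypothetical worked example of the fractional backoff above.
    public class BackoffMath {
        public static void main(String[] args) {
            int rebalanceDelayMs = 300_000;   // scheduled.rebalance.max.delay.ms default
            long total = 0;
            for (int remaining = 5; remaining > 0; remaining--) {
                int sleepMs = rebalanceDelayMs / 10 / remaining; // 6000, 7500, 10000, 15000, 30000
                total += sleepMs;
            }
            System.out.println("total backoff before revocation: " + total + " ms"); // 68500 ms
        }
    }
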
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ExtendedAssignment.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ExtendedAssignment.java
index 7518e06..e544407 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ExtendedAssignment.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ExtendedAssignment.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -85,6 +86,20 @@ public class ExtendedAssignment extends ConnectProtocol.Assignment {
         this.delay = delay;
     }
 
+    public static ExtendedAssignment duplicate(ExtendedAssignment assignment) {
+        return new ExtendedAssignment(
+                assignment.version(),
+                assignment.error(),
+                assignment.leader(),
+                assignment.leaderUrl(),
+                assignment.offset(),
+                new LinkedHashSet<>(assignment.connectors()),
+                new LinkedHashSet<>(assignment.tasks()),
+                new LinkedHashSet<>(assignment.revokedConnectors()),
+                new LinkedHashSet<>(assignment.revokedTasks()),
+                assignment.delay());
+    }
+
     /**
      * Return the version of the connect protocol that this assignment belongs to.
      *
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
index 17d6690..800c1d3 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerCoordinator.java
@@ -269,6 +269,10 @@ public class WorkerCoordinator extends AbstractCoordinator implements Closeable
         return lastCompletedGenerationId;
     }
 
+    public void revokeAssignment(ExtendedAssignment assignment) {
+        listener.onRevoked(assignment.leader(), assignment.connectors(), assignment.tasks());
+    }
+
     private boolean isLeader() {
         final ExtendedAssignment localAssignmentSnapshot = assignmentSnapshot;
         return localAssignmentSnapshot != null && memberId().equals(localAssignmentSnapshot.leader());
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 13a52d4..739575e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -208,6 +208,10 @@ public class WorkerGroupMember {
         return coordinator.currentProtocolVersion();
     }
 
+    public void revokeAssignment(ExtendedAssignment assignment) {
+        coordinator.revokeAssignment(assignment);
+    }
+
     private void stop(boolean swallowException) {
         log.trace("Stopping the Connect group member.");
         AtomicReference<Throwable> firstException = new AtomicReference<>();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectIntegrationTestUtils.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectIntegrationTestUtils.java
index 1aeaa07..058dbe2 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectIntegrationTestUtils.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectIntegrationTestUtils.java
@@ -22,6 +22,7 @@ import org.junit.runner.Description;
 import org.slf4j.Logger;
 
 /**
+ * A utility class for Connect's integration tests
  */
 public class ConnectIntegrationTestUtils {
     public static TestRule newTestWatcher(Logger log) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
index cfc08c4..81208d3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/distributed/DistributedHerderTest.java
@@ -84,6 +84,8 @@ import static javax.ws.rs.core.Response.Status.FORBIDDEN;
 import static org.apache.kafka.connect.runtime.distributed.ConnectProtocol.CONNECT_PROTOCOL_V0;
 import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V1;
 import static org.apache.kafka.connect.runtime.distributed.IncrementalCooperativeConnectProtocol.CONNECT_PROTOCOL_V2;
+import static org.easymock.EasyMock.capture;
+import static org.easymock.EasyMock.newCapture;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
@@ -198,7 +200,7 @@ public class DistributedHerderTest {
         connectProtocolVersion = CONNECT_PROTOCOL_V0;
 
         herder = PowerMock.createPartialMock(DistributedHerder.class,
-                new String[]{"backoff", "connectorTypeForClass", "updateDeletedConnectorStatus", "updateDeletedTaskStatus"},
+                new String[]{"connectorTypeForClass", "updateDeletedConnectorStatus", "updateDeletedTaskStatus"},
                 new DistributedConfig(HERDER_CONFIG), worker, WORKER_ID, KAFKA_CLUSTER_ID,
                 statusBackingStore, configBackingStore, member, MEMBER_URL, metrics, time, noneConnectorClientConfigOverridePolicy);
 
@@ -526,7 +528,7 @@ public class DistributedHerderTest {
         // config validation
         Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = EasyMock.newCapture();
+        final Capture<Map<String, String>> configCapture = newCapture();
         EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
@@ -573,7 +575,7 @@ public class DistributedHerderTest {
         // config validation
         Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = EasyMock.newCapture();
+        final Capture<Map<String, String>> configCapture = newCapture();
         EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
@@ -587,7 +589,7 @@ public class DistributedHerderTest {
 
         // CONN2 creation should fail
 
-        Capture<Throwable> error = EasyMock.newCapture();
+        Capture<Throwable> error = newCapture();
         putConnectorCallback.onCompletion(EasyMock.capture(error), EasyMock.<Herder.Created<ConnectorInfo>>isNull());
         PowerMock.expectLastCall();
 
@@ -622,7 +624,7 @@ public class DistributedHerderTest {
         // config validation
         Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = EasyMock.newCapture();
+        final Capture<Map<String, String>> configCapture = newCapture();
         EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
@@ -639,7 +641,7 @@ public class DistributedHerderTest {
 
         // CONN2 creation should fail
 
-        Capture<Throwable> error = EasyMock.newCapture();
+        Capture<Throwable> error = newCapture();
         putConnectorCallback.onCompletion(EasyMock.capture(error), EasyMock.<Herder.Created<ConnectorInfo>>isNull());
         PowerMock.expectLastCall();
 
@@ -678,7 +680,7 @@ public class DistributedHerderTest {
         // config validation
         Connector connectorMock = PowerMock.createMock(SinkConnector.class);
         EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = EasyMock.newCapture();
+        final Capture<Map<String, String>> configCapture = newCapture();
         EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).times(3);
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);
@@ -690,7 +692,7 @@ public class DistributedHerderTest {
         // CONN2 creation should fail because the worker group id (connect-test-group) conflicts with
         // the consumer group id we would use for this sink
 
-        Capture<Throwable> error = EasyMock.newCapture();
+        Capture<Throwable> error = newCapture();
         putConnectorCallback.onCompletion(EasyMock.capture(error), EasyMock.isNull(Herder.Created.class));
         PowerMock.expectLastCall();
 
@@ -717,7 +719,7 @@ public class DistributedHerderTest {
         EasyMock.expect(member.memberId()).andStubReturn("leader");
         EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V0);
         EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = EasyMock.newCapture();
+        final Capture<Map<String, String>> configCapture = newCapture();
         EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins);
         EasyMock.expect(plugins.newConnector(EasyMock.anyString())).andReturn(null);
@@ -1450,7 +1452,6 @@ public class DistributedHerderTest {
         EasyMock.expectLastCall().andThrow(new TimeoutException());
         member.maybeLeaveGroup(EasyMock.eq("taking too long to read the log"));
         EasyMock.expectLastCall();
-        PowerMock.expectPrivate(herder, "backoff", DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_DEFAULT);
         member.requestRejoin();
 
         // After backoff, restart the process and this time succeed
@@ -1472,14 +1473,198 @@ public class DistributedHerderTest {
 
         PowerMock.replayAll();
 
+        long before = time.milliseconds();
+        int workerUnsyncBackoffMs = DistributedConfig.WORKER_UNSYNC_BACKOFF_MS_DEFAULT;
+        int coordinatorDiscoveryTimeoutMs = 100;
         herder.tick();
+        assertEquals(before + coordinatorDiscoveryTimeoutMs + workerUnsyncBackoffMs, time.milliseconds());
+
         time.sleep(1000L);
         assertStatistics("leaderUrl", true, 3, 0, Double.NEGATIVE_INFINITY, Double.POSITIVE_INFINITY);
 
+        before = time.milliseconds();
         herder.tick();
+        assertEquals(before + coordinatorDiscoveryTimeoutMs, time.milliseconds());
         time.sleep(2000L);
         assertStatistics("leaderUrl", false, 3, 1, 100, 2000L);
 
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testJoinLeaderCatchUpRetriesForIncrementalCooperative() throws Exception {
+        connectProtocolVersion = CONNECT_PROTOCOL_V1;
+
+        // Join group and as leader fail to do assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V1);
+        expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
+        expectPostRebalanceCatchup(SNAPSHOT);
+
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // The leader got its assignment
+        expectRebalance(Collections.emptyList(), Collections.emptyList(),
+                ConnectProtocol.Assignment.NO_ERROR,
+                1, Arrays.asList(CONN1), Arrays.asList(TASK1), 0);
+
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
+        // and the new assignment started
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(), EasyMock.anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
+
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Another rebalance is triggered but this time it fails to read to the max offset and
+        // triggers a re-sync
+        expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
+                ConnectProtocol.Assignment.CONFIG_MISMATCH, 1, Collections.<String>emptyList(),
+                Collections.<ConnectorTaskId>emptyList());
+
+        // The leader will retry a few times to read to the end of the config log
+        int retries = 2;
+        member.requestRejoin();
+        for (int i = retries; i >= 0; --i) {
+            // Reading to end of log times out
+            configBackingStore.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
+            EasyMock.expectLastCall().andThrow(new TimeoutException());
+            member.maybeLeaveGroup(EasyMock.eq("taking too long to read the log"));
+            EasyMock.expectLastCall();
+        }
+
+        // After a few retries, reading the log to the end succeeds
+        expectRebalance(Collections.emptyList(), Collections.emptyList(),
+                ConnectProtocol.Assignment.NO_ERROR,
+                1, Arrays.asList(CONN1), Arrays.asList(TASK1), 0);
+        expectPostRebalanceCatchup(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        assertStatistics(0, 0, 0, Double.POSITIVE_INFINITY);
+        herder.tick();
+
+        time.sleep(2000L);
+        assertStatistics(3, 1, 100, 2000);
+        herder.tick();
+
+        long before;
+        int coordinatorDiscoveryTimeoutMs = 100;
+        int maxRetries = 5;
+        for (int i = maxRetries; i >= maxRetries - retries; --i) {
+            before = time.milliseconds();
+            int workerUnsyncBackoffMs =
+                    DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT / 10 / i;
+            herder.tick();
+            assertEquals(before + coordinatorDiscoveryTimeoutMs + workerUnsyncBackoffMs, time.milliseconds());
+            coordinatorDiscoveryTimeoutMs = 0;
+        }
+
+        before = time.milliseconds();
+        coordinatorDiscoveryTimeoutMs = 100;
+        herder.tick();
+        assertEquals(before + coordinatorDiscoveryTimeoutMs, time.milliseconds());
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testJoinLeaderCatchUpFailsForIncrementalCooperative() throws Exception {
+        connectProtocolVersion = CONNECT_PROTOCOL_V1;
+
+        // Join group and as leader fail to do assignment
+        EasyMock.expect(member.memberId()).andStubReturn("leader");
+        EasyMock.expect(member.currentProtocolVersion()).andStubReturn(CONNECT_PROTOCOL_V1);
+        expectRebalance(1, Arrays.asList(CONN1), Arrays.asList(TASK1));
+        expectPostRebalanceCatchup(SNAPSHOT);
+
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // The leader got its assignment
+        expectRebalance(Collections.emptyList(), Collections.emptyList(),
+                ConnectProtocol.Assignment.NO_ERROR,
+                1, Arrays.asList(CONN1), Arrays.asList(TASK1), 0);
+
+        EasyMock.expect(worker.getPlugins()).andReturn(plugins);
+        // and the new assignment started
+        worker.startConnector(EasyMock.eq(CONN1), EasyMock.anyObject(), EasyMock.anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
+        EasyMock.expect(worker.isRunning(CONN1)).andReturn(true);
+        EasyMock.expect(worker.connectorTaskConfigs(CONN1, conn1SinkConfig)).andReturn(TASK_CONFIGS);
+
+        worker.startTask(EasyMock.eq(TASK1), EasyMock.anyObject(), EasyMock.anyObject(), EasyMock.anyObject(),
+                EasyMock.eq(herder), EasyMock.eq(TargetState.STARTED));
+        PowerMock.expectLastCall().andReturn(true);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        // Another rebalance is triggered but this time it fails to read to the max offset and
+        // triggers a re-sync
+        expectRebalance(Collections.<String>emptyList(), Collections.<ConnectorTaskId>emptyList(),
+                ConnectProtocol.Assignment.CONFIG_MISMATCH, 1, Collections.<String>emptyList(),
+                Collections.<ConnectorTaskId>emptyList());
+
+        // The leader will exhaust the retries while trying to read to the end of the config log
+        int maxRetries = 5;
+        member.requestRejoin();
+        for (int i = maxRetries; i >= 0; --i) {
+            // Reading to end of log times out
+            configBackingStore.refresh(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
+            EasyMock.expectLastCall().andThrow(new TimeoutException());
+            member.maybeLeaveGroup(EasyMock.eq("taking too long to read the log"));
+            EasyMock.expectLastCall();
+        }
+
+        Capture<ExtendedAssignment> assignmentCapture = newCapture();
+        member.revokeAssignment(capture(assignmentCapture));
+        PowerMock.expectLastCall();
+
+        // After a complete backoff and a revocation of running tasks, rejoin and this time succeed
+        // The worker gets back the assignment that it had given up
+        expectRebalance(Collections.emptyList(), Collections.emptyList(),
+                ConnectProtocol.Assignment.NO_ERROR,
+                1, Arrays.asList(CONN1), Arrays.asList(TASK1), 0);
+        expectPostRebalanceCatchup(SNAPSHOT);
+        member.poll(EasyMock.anyInt());
+        PowerMock.expectLastCall();
+
+        PowerMock.replayAll();
+
+        assertStatistics(0, 0, 0, Double.POSITIVE_INFINITY);
+        herder.tick();
+
+        time.sleep(2000L);
+        assertStatistics(3, 1, 100, 2000);
+        herder.tick();
+
+        long before;
+        int coordinatorDiscoveryTimeoutMs = 100;
+        for (int i = maxRetries; i > 0; --i) {
+            before = time.milliseconds();
+            int workerUnsyncBackoffMs =
+                    DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_DEFAULT / 10 / i;
+            herder.tick();
+            assertEquals(before + coordinatorDiscoveryTimeoutMs + workerUnsyncBackoffMs, time.milliseconds());
+            coordinatorDiscoveryTimeoutMs = 0;
+        }
+
+        before = time.milliseconds();
+        herder.tick();
+        assertEquals(before, time.milliseconds());
+        assertEquals(Collections.singleton(CONN1), assignmentCapture.getValue().connectors());
+        assertEquals(Collections.singleton(TASK1), assignmentCapture.getValue().tasks());
+        herder.tick();
 
         PowerMock.verifyAll();
     }
@@ -1566,7 +1751,7 @@ public class DistributedHerderTest {
         // config validation
         Connector connectorMock = PowerMock.createMock(SourceConnector.class);
         EasyMock.expect(worker.configTransformer()).andReturn(transformer).times(2);
-        final Capture<Map<String, String>> configCapture = EasyMock.newCapture();
+        final Capture<Map<String, String>> configCapture = newCapture();
         EasyMock.expect(transformer.transform(EasyMock.capture(configCapture))).andAnswer(configCapture::getValue);
         EasyMock.expect(worker.getPlugins()).andReturn(plugins).anyTimes();
         EasyMock.expect(plugins.compareAndSwapLoaders(connectorMock)).andReturn(delegatingLoader);