You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2019/08/13 19:06:45 UTC

[kafka] branch 2.2 updated: KAFKA-8391; Improved the Connect integration tests to make them less flaky

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.2 by this push:
     new 6694636  KAFKA-8391; Improved the Connect integration tests to make them less flaky
6694636 is described below

commit 66946362a6203729937627d697722c120770d774
Author: Randall Hauch <rh...@gmail.com>
AuthorDate: Tue Aug 13 11:14:41 2019 -0700

    KAFKA-8391; Improved the Connect integration tests to make them less flaky
    
    Added the ability for the connector handles and task handles, which are used by the monitorable source and sink connectors used to verify the functionality of the Connect framework, to record the number of times the connector and tasks have each been started, and to allow a test to obtain a `StartAndStopLatch` that can be used to block until the connectors and/or tasks have been restarted a specified number of times.
    
    Typically, a test will get the `ConnectorHandle` for a connector, and call the `ConnectorHandle.expectedStarts(int)` method with the expected number of times that the connector and/or tasks will be restarted, and will hold onto the resulting `StartAndStopLatch`. The test will then change the connector (or otherwise cause the connector to restart) one or more times as desired, and then call `StartAndStopLatch.await(long, TimeUnit)` to block the test up to a specified duration for the connector [...]
    
    This commit also increases several of the maximum wait times used in other integration tests. It doesn’t hurt to potentially wait longer, since most test runs will not need to wait the maximum amount of time anyway. However, in the rare cases that do need that extra time, waiting a bit more is fine if we can reduce the flakiness and minimize test failures that happened to time out too early.
    
    Unit tests were added for the new `StartAndStopLatch` and `StartAndStopCounter` utility classes. This PR only affects the tests and does not affect any runtime code or API.
    
    **This should be merged on `trunk` and backported to the `2.3.x` branch.**
    
    Author: Randall Hauch <rh...@gmail.com>
    
    Reviewers: Konstantine Karantasis, Arjun Satish
    
    Closes #7019 from rhauch/kafka-8391
---
 .../integration/ConnectWorkerIntegrationTest.java  |   7 +-
 .../kafka/connect/integration/ConnectorHandle.java | 135 +++++++++++++++-
 .../integration/ErrorHandlingIntegrationTest.java  |   5 +-
 .../integration/ExampleConnectIntegrationTest.java |   5 +-
 .../integration/MonitorableSinkConnector.java      |   6 +
 .../integration/MonitorableSourceConnector.java    |   4 +
 .../connect/integration/StartAndStopCounter.java   | 179 +++++++++++++++++++++
 .../integration/StartAndStopCounterTest.java       | 116 +++++++++++++
 .../connect/integration/StartAndStopLatch.java     | 118 ++++++++++++++
 .../connect/integration/StartAndStopLatchTest.java | 137 ++++++++++++++++
 .../kafka/connect/integration/TaskHandle.java      |  85 ++++++++--
 11 files changed, 776 insertions(+), 21 deletions(-)

diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index 09363cd..ca63a07 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -35,6 +35,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
@@ -53,9 +54,9 @@ public class ConnectWorkerIntegrationTest {
     private static final Logger log = LoggerFactory.getLogger(ConnectWorkerIntegrationTest.class);
 
     private static final int NUM_TOPIC_PARTITIONS = 3;
-    private static final int CONNECTOR_SETUP_DURATION_MS = 15_000;
-    private static final int WORKER_SETUP_DURATION_MS = 20_000;
-    private static final int OFFSET_COMMIT_INTERVAL_MS = 30_000;
+    private static final long CONNECTOR_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
+    private static final long WORKER_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(60);
+    private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(30);
     private static final int NUM_WORKERS = 3;
     private static final String CONNECTOR_NAME = "simple-source";
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java
index 0df0f8c..a4a4612 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java
@@ -21,10 +21,12 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
 /**
@@ -36,6 +38,7 @@ public class ConnectorHandle {
 
     private final String connectorName;
     private final Map<String, TaskHandle> taskHandles = new ConcurrentHashMap<>();
+    private final StartAndStopCounter startAndStopCounter = new StartAndStopCounter();
 
     private CountDownLatch recordsRemainingLatch;
     private CountDownLatch recordsToCommitLatch;
@@ -152,7 +155,7 @@ public class ConnectorHandle {
      * @param timeout max duration to wait for records
      * @throws InterruptedException if another threads interrupts this one while waiting for records
      */
-    public void awaitRecords(int timeout) throws InterruptedException {
+    public void awaitRecords(long timeout) throws InterruptedException {
         if (recordsRemainingLatch == null || expectedRecords < 0) {
             throw new IllegalStateException("expectedRecords() was not set for this connector?");
         }
@@ -174,7 +177,7 @@ public class ConnectorHandle {
      * @param  timeout duration to wait for commits
      * @throws InterruptedException if another threads interrupts this one while waiting for commits
      */
-    public void awaitCommits(int timeout) throws InterruptedException {
+    public void awaitCommits(long timeout) throws InterruptedException {
         if (recordsToCommitLatch == null || expectedCommits < 0) {
             throw new IllegalStateException("expectedCommits() was not set for this connector?");
         }
@@ -189,6 +192,134 @@ public class ConnectorHandle {
         }
     }
 
+    /**
+     * Record that this connector has been started. This should be called by the connector under
+     * test.
+     *
+     * @see #expectedStarts(int)
+     */
+    public void recordConnectorStart() {
+        startAndStopCounter.recordStart();
+    }
+
+    /**
+     * Record that this connector has been stopped. This should be called by the connector under
+     * test.
+     *
+     * @see #expectedStops(int)
+     */
+    public void recordConnectorStop() {
+        startAndStopCounter.recordStop();
+    }
+
+    /**
+     * Obtain a {@link StartAndStopLatch} that can be used to wait until the connector using this handle
+     * and all tasks using {@link TaskHandle} have completed the expected number of
+     * starts, starting the counts at the time this method is called.
+     *
+     * <p>A test can call this method, specifying the number of times the connector and tasks
+     * will each be stopped and started from that point (typically {@code expectedStarts(1)}).
+     * The test should then change the connector or otherwise cause the connector to restart one or
+     * more times, and then can call {@link StartAndStopLatch#await(long, TimeUnit)} to wait up to a
+     * specified duration for the connector and all tasks to be started at least the specified
+     * number of times.
+     *
+     * <p>This method does not track the number of times the connector and tasks are stopped, and
+     * only tracks the number of times the connector and tasks are <em>started</em>.
+     *
+     * @param expectedStarts the minimum number of starts that are expected once this method is
+     *                       called
+     * @return the latch that can be used to wait for the starts to complete; never null
+     */
+    public StartAndStopLatch expectedStarts(int expectedStarts) {
+        return expectedStarts(expectedStarts, true);
+    }
+
+    /**
+     * Obtain a {@link StartAndStopLatch} that can be used to wait until the connector using this handle
+     * and optionally all tasks using {@link TaskHandle} have completed the expected number of
+     * starts, starting the counts at the time this method is called.
+     *
+     * <p>A test can call this method, specifying the number of times the connector and tasks
+     * will each be stopped and started from that point (typically {@code expectedStarts(1)}).
+     * The test should then change the connector or otherwise cause the connector to restart one or
+     * more times, and then can call {@link StartAndStopLatch#await(long, TimeUnit)} to wait up to a
+     * specified duration for the connector and all tasks to be started at least the specified
+     * number of times.
+     *
+     * <p>This method does not track the number of times the connector and tasks are stopped, and
+     * only tracks the number of times the connector and tasks are <em>started</em>.
+     *
+     * @param expectedStarts the minimum number of starts that are expected once this method is
+     *                       called
+     * @param includeTasks  true if the latch should also wait for the tasks to be started the
+     *                      specified minimum number of times
+     * @return the latch that can be used to wait for the starts to complete; never null
+     */
+    public StartAndStopLatch expectedStarts(int expectedStarts, boolean includeTasks) {
+        List<StartAndStopLatch> taskLatches = null;
+        if (includeTasks) {
+            taskLatches = taskHandles.values().stream()
+                                     .map(task -> task.expectedStarts(expectedStarts))
+                                     .collect(Collectors.toList());
+        }
+        return startAndStopCounter.expectedStarts(expectedStarts, taskLatches);
+    }
+
+    /**
+     * Obtain a {@link StartAndStopLatch} that can be used to wait until the connector using this handle
+     * and optionally all tasks using {@link TaskHandle} have completed the minimum number of
+     * stops, starting the counts at the time this method is called.
+     *
+     * <p>A test can call this method, specifying the number of times the connector and tasks
+     * will each be stopped from that point (typically {@code expectedStops(1)}).
+     * The test should then change the connector or otherwise cause the connector to stop (or
+     * restart) one or more times, and then can call
+     * {@link StartAndStopLatch#await(long, TimeUnit)} to wait up to a specified duration for the
+     * connector and all tasks to be stopped at least the specified number of times.
+     *
+     * <p>This method does not track the number of times the connector and tasks are started, and
+     * only tracks the number of times the connector and tasks are <em>stopped</em>.
+     *
+     * @param expectedStops the minimum number of stops that are expected once this method is
+     *                      called
+     * @return the latch that can be used to wait for the stops to complete; never null
+     */
+    public StartAndStopLatch expectedStops(int expectedStops) {
+        return expectedStops(expectedStops, true);
+    }
+
+    /**
+     * Obtain a {@link StartAndStopLatch} that can be used to wait until the connector using this handle
+     * and optionally all tasks using {@link TaskHandle} have completed the minimum number of
+     * stops, starting the counts at the time this method is called.
+     *
+     * <p>A test can call this method, specifying the number of times the connector and tasks
+     * will each be stopped from that point (typically {@code expectedStops(1)}).
+     * The test should then change the connector or otherwise cause the connector to stop (or
+     * restart) one or more times, and then can call
+     * {@link StartAndStopLatch#await(long, TimeUnit)} to wait up to a specified duration for the
+     * connector and all tasks to be stopped at least the specified number of times.
+     *
+     * <p>This method does not track the number of times the connector and tasks are started, and
+     * only tracks the number of times the connector and tasks are <em>stopped</em>.
+     *
+     * @param expectedStops the minimum number of stops that are expected once this method is
+     *                      called
+     * @param includeTasks  true if the latch should also wait for the tasks to be stopped the
+     *                      specified minimum number of times
+     * @return the latch that can be used to wait for the stops to complete; never null
+     */
+    public StartAndStopLatch expectedStops(int expectedStops, boolean includeTasks) {
+        List<StartAndStopLatch> taskLatches = null;
+        if (includeTasks) {
+            taskLatches = taskHandles.values().stream()
+                                     .map(task -> task.expectedStops(expectedStops))
+                                     .collect(Collectors.toList());
+        }
+        return startAndStopCounter.expectedStops(expectedStops, taskLatches);
+    }
+
     @Override
     public String toString() {
         return "ConnectorHandle{" +
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
index 67bcc74..33e6cf5 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
@@ -37,6 +37,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG;
@@ -75,8 +76,8 @@ public class ErrorHandlingIntegrationTest {
     private static final int EXPECTED_CORRECT_RECORDS = 19;
     private static final int EXPECTED_INCORRECT_RECORDS = 1;
     private static final int NUM_TASKS = 1;
-    private static final int CONNECTOR_SETUP_DURATION_MS = 5000;
-    private static final int CONSUME_MAX_DURATION_MS = 5000;
+    private static final long CONNECTOR_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(60);
+    private static final long CONSUME_MAX_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
 
     private EmbeddedConnectCluster connect;
     private ConnectorHandle connectorHandle;
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
index 224d6ac..bd06291 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
@@ -31,6 +31,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
@@ -55,8 +56,8 @@ public class ExampleConnectIntegrationTest {
 
     private static final int NUM_RECORDS_PRODUCED = 2000;
     private static final int NUM_TOPIC_PARTITIONS = 3;
-    private static final int RECORD_TRANSFER_DURATION_MS = 5000;
-    private static final int CONNECTOR_SETUP_DURATION_MS = 15000;
+    private static final long RECORD_TRANSFER_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
+    private static final long CONNECTOR_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(60);
     private static final int NUM_TASKS = 3;
     private static final int NUM_WORKERS = 3;
     private static final String CONNECTOR_NAME = "simple-conn";
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java
index 06145de..ed6f6b9 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java
@@ -47,12 +47,15 @@ public class MonitorableSinkConnector extends TestSinkConnector {
 
     private String connectorName;
     private Map<String, String> commonConfigs;
+    private ConnectorHandle connectorHandle;
 
     @Override
     public void start(Map<String, String> props) {
+        connectorHandle = RuntimeHandles.get().connectorHandle(props.get("name"));
         connectorName = props.get("name");
         commonConfigs = props;
         log.info("Starting connector {}", props.get("name"));
+        connectorHandle.recordConnectorStart();
     }
 
     @Override
@@ -74,6 +77,7 @@ public class MonitorableSinkConnector extends TestSinkConnector {
 
     @Override
     public void stop() {
+        connectorHandle.recordConnectorStop();
     }
 
     @Override
@@ -107,6 +111,7 @@ public class MonitorableSinkConnector extends TestSinkConnector {
             connectorName = props.get("connector.name");
             taskHandle = RuntimeHandles.get().connectorHandle(connectorName).taskHandle(taskId);
             log.debug("Starting task {}", taskId);
+            taskHandle.recordTaskStart();
         }
 
         @Override
@@ -148,6 +153,7 @@ public class MonitorableSinkConnector extends TestSinkConnector {
 
         @Override
         public void stop() {
+            taskHandle.recordTaskStop();
         }
     }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
index 8bc8953..1d5e1ba 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSourceConnector.java
@@ -54,6 +54,7 @@ public class MonitorableSourceConnector extends TestSourceConnector {
         connectorName = connectorHandle.name();
         commonConfigs = props;
         log.info("Started {} connector {}", this.getClass().getSimpleName(), connectorName);
+        connectorHandle.recordConnectorStart();
     }
 
     @Override
@@ -76,6 +77,7 @@ public class MonitorableSourceConnector extends TestSourceConnector {
     @Override
     public void stop() {
         log.info("Stopped {} connector {}", this.getClass().getSimpleName(), connectorName);
+        connectorHandle.recordConnectorStop();
     }
 
     @Override
@@ -115,6 +117,7 @@ public class MonitorableSourceConnector extends TestSourceConnector {
             startingSeqno = Optional.ofNullable((Long) offset.get("saved")).orElse(0L);
             log.info("Started {} task {}", this.getClass().getSimpleName(), taskId);
             throttler = new ThroughputThrottler(throughput, System.currentTimeMillis());
+            taskHandle.recordTaskStart();
         }
 
         @Override
@@ -155,6 +158,7 @@ public class MonitorableSourceConnector extends TestSourceConnector {
         public void stop() {
             log.info("Stopped {} task {}", this.getClass().getSimpleName(), taskId);
             stopped = true;
+            taskHandle.recordTaskStop();
         }
     }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopCounter.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopCounter.java
new file mode 100644
index 0000000..9ddfa1f
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopCounter.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.integration;
+
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.kafka.common.utils.Time;
+
+public class StartAndStopCounter {
+
+    private final AtomicInteger startCounter = new AtomicInteger(0);
+    private final AtomicInteger stopCounter = new AtomicInteger(0);
+    private final List<StartAndStopLatch> restartLatches = new CopyOnWriteArrayList<>();
+    private final Time clock;
+
+    public StartAndStopCounter() {
+        this(Time.SYSTEM);
+    }
+
+    public StartAndStopCounter(Time clock) {
+        this.clock = clock != null ? clock : Time.SYSTEM;
+    }
+
+    /**
+     * Record a start.
+     */
+    public void recordStart() {
+        startCounter.incrementAndGet();
+        restartLatches.forEach(StartAndStopLatch::recordStart);
+    }
+
+    /**
+     * Record a stop.
+     */
+    public void recordStop() {
+        stopCounter.incrementAndGet();
+        restartLatches.forEach(StartAndStopLatch::recordStop);
+    }
+
+    /**
+     * Get the number of starts.
+     *
+     * @return the number of starts
+     */
+    public int starts() {
+        return startCounter.get();
+    }
+
+    /**
+     * Get the number of stops.
+     *
+     * @return the number of stops
+     */
+    public int stops() {
+        return stopCounter.get();
+    }
+
+    /**
+     * Obtain a {@link StartAndStopLatch} that can be used to wait until the expected number of restarts
+     * has been completed.
+     *
+     * @param expectedStarts   the expected number of starts; may be 0
+     * @param expectedStops    the expected number of stops; may be 0
+     * @return the latch; never null
+     */
+    public StartAndStopLatch expectedRestarts(int expectedStarts, int expectedStops) {
+        return expectedRestarts(expectedStarts, expectedStops, null);
+    }
+
+    /**
+     * Obtain a {@link StartAndStopLatch} that can be used to wait until the expected number of restarts
+     * has been completed.
+     *
+     * @param expectedStarts   the expected number of starts; may be 0
+     * @param expectedStops    the expected number of stops; may be 0
+     * @param dependents       any dependent latches that must also complete in order for the
+     *                         resulting latch to complete
+     * @return the latch; never null
+     */
+    public StartAndStopLatch expectedRestarts(int expectedStarts, int expectedStops, List<StartAndStopLatch> dependents) {
+        StartAndStopLatch latch = new StartAndStopLatch(expectedStarts, expectedStops, this::remove, dependents, clock);
+        restartLatches.add(latch);
+        return latch;
+    }
+
+    /**
+     * Obtain a {@link StartAndStopLatch} that can be used to wait until the expected number of restarts
+     * has been completed.
+     *
+     * @param expectedRestarts the expected number of restarts
+     * @return the latch; never null
+     */
+    public StartAndStopLatch expectedRestarts(int expectedRestarts) {
+        return expectedRestarts(expectedRestarts, expectedRestarts);
+    }
+
+    /**
+     * Obtain a {@link StartAndStopLatch} that can be used to wait until the expected number of restarts
+     * has been completed.
+     *
+     * @param expectedRestarts the expected number of restarts
+     * @param dependents       any dependent latches that must also complete in order for the
+     *                         resulting latch to complete
+     * @return the latch; never null
+     */
+    public StartAndStopLatch expectedRestarts(int expectedRestarts, List<StartAndStopLatch> dependents) {
+        return expectedRestarts(expectedRestarts, expectedRestarts, dependents);
+    }
+
+    /**
+     * Obtain a {@link StartAndStopLatch} that can be used to wait until the expected number of starts
+     * has been completed.
+     *
+     * @param expectedStarts the expected number of starts
+     * @return the latch; never null
+     */
+    public StartAndStopLatch expectedStarts(int expectedStarts) {
+        return expectedRestarts(expectedStarts, 0);
+    }
+
+    /**
+     * Obtain a {@link StartAndStopLatch} that can be used to wait until the expected number of starts
+     * has been completed.
+     *
+     * @param expectedStarts the expected number of starts
+     * @param dependents     any dependent latches that must also complete in order for the
+     *                       resulting latch to complete
+     * @return the latch; never null
+     */
+    public StartAndStopLatch expectedStarts(int expectedStarts, List<StartAndStopLatch> dependents) {
+        return expectedRestarts(expectedStarts, 0, dependents);
+    }
+
+
+    /**
+     * Obtain a {@link StartAndStopLatch} that can be used to wait until the expected number of
+     * stops has been completed.
+     *
+     * @param expectedStops the expected number of stops
+     * @return the latch; never null
+     */
+    public StartAndStopLatch expectedStops(int expectedStops) {
+        return expectedRestarts(0, expectedStops);
+    }
+
+    /**
+     * Obtain a {@link StartAndStopLatch} that can be used to wait until the expected number of
+     * stops has been completed.
+     *
+     * @param expectedStops the expected number of stops
+     * @param dependents    any dependent latches that must also complete in order for the
+     *                      resulting latch to complete
+     * @return the latch; never null
+     */
+    public StartAndStopLatch expectedStops(int expectedStops, List<StartAndStopLatch> dependents) {
+        return expectedRestarts(0, expectedStops, dependents);
+    }
+
+    protected void remove(StartAndStopLatch restartLatch) {
+        restartLatches.remove(restartLatch);
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopCounterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopCounterTest.java
new file mode 100644
index 0000000..7820a6d
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopCounterTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.integration;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class StartAndStopCounterTest {
+
+    private StartAndStopCounter counter;
+    private Time clock;
+    private ExecutorService waiters;
+    private StartAndStopLatch latch;
+
+    @Before
+    public void setup() {
+        clock = new MockTime();
+        counter = new StartAndStopCounter(clock);
+    }
+
+    @After
+    public void teardown() {
+        if (waiters != null) {
+            try {
+                waiters.shutdownNow();
+            } finally {
+                waiters = null;
+            }
+        }
+    }
+
+    @Test
+    public void shouldRecordStarts() {
+        assertEquals(0, counter.starts());
+        counter.recordStart();
+        assertEquals(1, counter.starts());
+        counter.recordStart();
+        assertEquals(2, counter.starts());
+        assertEquals(2, counter.starts());
+    }
+
+    @Test
+    public void shouldRecordStops() {
+        assertEquals(0, counter.stops());
+        counter.recordStop();
+        assertEquals(1, counter.stops());
+        counter.recordStop();
+        assertEquals(2, counter.stops());
+        assertEquals(2, counter.stops());
+    }
+
+    @Test
+    public void shouldExpectRestarts() throws Exception {
+        waiters = Executors.newSingleThreadExecutor();
+
+        latch = counter.expectedRestarts(1);
+        Future<Boolean> future = asyncAwait(100, TimeUnit.MILLISECONDS);
+
+        clock.sleep(1000);
+        counter.recordStop();
+        counter.recordStart();
+        assertTrue(future.get(200, TimeUnit.MILLISECONDS));
+        assertTrue(future.isDone());
+    }
+    @Test
+    public void shouldFailToWaitForRestartThatNeverHappens() throws Exception {
+        waiters = Executors.newSingleThreadExecutor();
+
+        latch = counter.expectedRestarts(1);
+        Future<Boolean> future = asyncAwait(100, TimeUnit.MILLISECONDS);
+
+        clock.sleep(1000);
+        // Record a stop but NOT a start
+        counter.recordStop();
+        assertFalse(future.get(200, TimeUnit.MILLISECONDS));
+        assertTrue(future.isDone());
+    }
+
+    private Future<Boolean> asyncAwait(long duration, TimeUnit unit) {
+        return waiters.submit(() -> {
+            try {
+                return latch.await(duration, unit);
+            } catch (InterruptedException e) {
+                Thread.interrupted();
+                return false;
+            }
+        });
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopLatch.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopLatch.java
new file mode 100644
index 0000000..b77007c
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopLatch.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.integration;
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import org.apache.kafka.common.utils.Time;
+
+/**
+ * A latch that can be used to count down the number of times a connector and/or tasks have
+ * been started and stopped.
+ */
+public class StartAndStopLatch {
+    private final CountDownLatch startLatch;
+    private final CountDownLatch stopLatch;
+    private final List<StartAndStopLatch> dependents;
+    private final Consumer<StartAndStopLatch> uponCompletion;
+    private final Time clock;
+
+    StartAndStopLatch(int expectedStarts, int expectedStops, Consumer<StartAndStopLatch> uponCompletion,
+                 List<StartAndStopLatch> dependents, Time clock) {
+        this.startLatch = new CountDownLatch(expectedStarts < 0 ? 0 : expectedStarts);
+        this.stopLatch = new CountDownLatch(expectedStops < 0 ? 0 : expectedStops);
+        this.dependents = dependents;
+        this.uponCompletion = uponCompletion;
+        this.clock = clock;
+    }
+
+    protected void recordStart() {
+        startLatch.countDown();
+    }
+
+    protected void recordStop() {
+        stopLatch.countDown();
+    }
+
+    /**
+     * Causes the current thread to wait until the latch has counted down the starts and
+     * stops to zero, unless the thread is {@linkplain Thread#interrupt interrupted},
+     * or the specified waiting time elapses.
+     *
+     * <p>If the current counts are zero then this method returns immediately
+     * with the value {@code true}.
+     *
+     * <p>If either count is greater than zero then the current
+     * thread becomes disabled for thread scheduling purposes and lies
+     * dormant until one of three things happens:
+     * <ul>
+     * <li>The counts reach zero due to invocations of the {@link #recordStart()} and
+     * {@link #recordStop()} methods; or
+     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
+     * the current thread; or
+     * <li>The specified waiting time elapses.
+     * </ul>
+     *
+     * <p>If the counts (and those of any dependent latches) reach zero then the
+     * method returns with the value {@code true}.
+     *
+     * <p>If the current thread:
+     * <ul>
+     * <li>has its interrupted status set on entry to this method; or
+     * <li>is {@linkplain Thread#interrupt interrupted} while waiting,
+     * </ul>
+     * then {@link InterruptedException} is thrown and the current thread's
+     * interrupted status is cleared.
+     *
+     * <p>If the specified waiting time elapses then the value {@code false}
+     * is returned.  If the time is less than or equal to zero, the method
+     * will not wait at all.
+     *
+     * @param timeout the maximum time to wait
+     * @param unit    the time unit of the {@code timeout} argument
+     * @return {@code true} if the counts reached zero and {@code false}
+     *         if the waiting time elapsed before the counts reached zero
+     * @throws InterruptedException if the current thread is interrupted
+     *         while waiting
+     */
+    public boolean await(long timeout, TimeUnit unit) throws InterruptedException {
+        final long start = clock.milliseconds();
+        final long end = start + unit.toMillis(timeout);
+        if (!startLatch.await(end - start, TimeUnit.MILLISECONDS)) {
+            return false;
+        }
+        if (!stopLatch.await(end - clock.milliseconds(), TimeUnit.MILLISECONDS)) {
+            return false;
+        }
+
+        if (dependents != null) {
+            for (StartAndStopLatch dependent : dependents) {
+                if (!dependent.await(end - clock.milliseconds(), TimeUnit.MILLISECONDS)) {
+                    return false;
+                }
+            }
+        }
+        if (uponCompletion != null) {
+            uponCompletion.accept(this);
+        }
+        return true;
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopLatchTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopLatchTest.java
new file mode 100644
index 0000000..d2732ea
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/StartAndStopLatchTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.integration;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class StartAndStopLatchTest {
+
+    private Time clock;
+    private StartAndStopLatch latch;
+    private List<StartAndStopLatch> dependents;
+    private AtomicBoolean completed = new AtomicBoolean();
+    private ExecutorService waiters;
+    private Future<Boolean> future;
+
+    @Before
+    public void setup() {
+        clock = new MockTime();
+        waiters = Executors.newSingleThreadExecutor();
+    }
+
+    @After
+    public void teardown() {
+        if (waiters != null) {
+            waiters.shutdownNow();
+        }
+    }
+
+    @Test
+    public void shouldReturnFalseWhenAwaitingForStartToNeverComplete() throws Throwable {
+        latch = new StartAndStopLatch(1, 1, this::complete, dependents, clock);
+        future = asyncAwait(100);
+        clock.sleep(10);
+        assertFalse(future.get(200, TimeUnit.MILLISECONDS));
+        assertTrue(future.isDone());
+    }
+
+    @Test
+    public void shouldReturnFalseWhenAwaitingForStopToNeverComplete() throws Throwable {
+        latch = new StartAndStopLatch(1, 1, this::complete, dependents, clock);
+        future = asyncAwait(100);
+        latch.recordStart();
+        clock.sleep(10);
+        assertFalse(future.get(200, TimeUnit.MILLISECONDS));
+        assertTrue(future.isDone());
+    }
+
+    @Test
+    public void shouldReturnTrueWhenAwaitingForStartAndStopToComplete() throws Throwable {
+        latch = new StartAndStopLatch(1, 1, this::complete, dependents, clock);
+        future = asyncAwait(100);
+        latch.recordStart();
+        latch.recordStop();
+        clock.sleep(10);
+        assertTrue(future.get(200, TimeUnit.MILLISECONDS));
+        assertTrue(future.isDone());
+    }
+
+    @Test
+    public void shouldReturnFalseWhenAwaitingForDependentLatchToComplete() throws Throwable {
+        StartAndStopLatch depLatch = new StartAndStopLatch(1, 1, this::complete, null, clock);
+        dependents = Collections.singletonList(depLatch);
+        latch = new StartAndStopLatch(1, 1, this::complete, dependents, clock);
+
+        future = asyncAwait(100);
+        latch.recordStart();
+        latch.recordStop();
+        clock.sleep(10);
+        assertFalse(future.get(200, TimeUnit.MILLISECONDS));
+        assertTrue(future.isDone());
+    }
+
+    @Test
+    public void shouldReturnTrueWhenAwaitingForStartAndStopAndDependentLatch() throws Throwable {
+        StartAndStopLatch depLatch = new StartAndStopLatch(1, 1, this::complete, null, clock);
+        dependents = Collections.singletonList(depLatch);
+        latch = new StartAndStopLatch(1, 1, this::complete, dependents, clock);
+
+        future = asyncAwait(100);
+        latch.recordStart();
+        latch.recordStop();
+        depLatch.recordStart();
+        depLatch.recordStop();
+        clock.sleep(10);
+        assertTrue(future.get(200, TimeUnit.MILLISECONDS));
+        assertTrue(future.isDone());
+    }
+
+    private Future<Boolean> asyncAwait(long duration) {
+        return asyncAwait(duration, TimeUnit.MILLISECONDS);
+    }
+
+    private Future<Boolean> asyncAwait(long duration, TimeUnit unit) {
+        return waiters.submit(() -> {
+            try {
+                return latch.await(duration, unit);
+            } catch (InterruptedException e) {
+                Thread.interrupted();
+                return false;
+            }
+        });
+    }
+
+    private void complete(StartAndStopLatch latch) {
+        completed.set(true);
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TaskHandle.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TaskHandle.java
index 6081ea3..1159cb8 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TaskHandle.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TaskHandle.java
@@ -36,6 +36,7 @@ public class TaskHandle {
     private final String taskId;
     private final ConnectorHandle connectorHandle;
     private final AtomicInteger partitionsAssigned = new AtomicInteger(0);
+    private final StartAndStopCounter startAndStopCounter = new StartAndStopCounter();
 
     private CountDownLatch recordsRemainingLatch;
     private CountDownLatch recordsToCommitLatch;
@@ -129,21 +130,33 @@ public class TaskHandle {
     }
 
     /**
-     * Wait for this task to meet the expected number of records as defined by {@code
-     * expectedRecords}.
+     * Wait up to the specified number of milliseconds for this task to meet the expected number of
+     * records as defined by {@code expectedRecords}.
      *
-     * @param  timeout duration to wait for records
+     * @param timeoutMillis number of milliseconds to wait for records
      * @throws InterruptedException if another threads interrupts this one while waiting for records
      */
-    public void awaitRecords(int timeout) throws InterruptedException {
+    public void awaitRecords(long timeoutMillis) throws InterruptedException {
+        awaitRecords(timeoutMillis, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Wait up to the specified timeout for this task to meet the expected number of records as
+     * defined by {@code expectedRecords}.
+     *
+     * @param timeout duration to wait for records
+     * @param unit    the unit of duration; may not be null
+     * @throws InterruptedException if another thread interrupts this one while waiting for records
+     */
+    public void awaitRecords(long timeout, TimeUnit unit) throws InterruptedException {
         if (recordsRemainingLatch == null) {
             throw new IllegalStateException("Illegal state encountered. expectedRecords() was not set for this task?");
         }
-        if (!recordsRemainingLatch.await(timeout, TimeUnit.MILLISECONDS)) {
+        if (!recordsRemainingLatch.await(timeout, unit)) {
             String msg = String.format(
                     "Insufficient records seen by task %s in %d millis. Records expected=%d, actual=%d",
                     taskId,
-                    timeout,
+                    unit.toMillis(timeout),
                     expectedRecords,
                     expectedRecords - recordsRemainingLatch.getCount());
             throw new DataException(msg);
@@ -153,21 +166,33 @@ public class TaskHandle {
     }
 
     /**
-     * Wait for this task to meet the expected number of commits as defined by {@code
-     * expectedCommits}.
+     * Wait up to the specified timeout in milliseconds for this task to meet the expected number
+     * of commits as defined by {@code expectedCommits}.
      *
-     * @param  timeout duration to wait for commits
+     * @param timeoutMillis number of milliseconds to wait for commits
      * @throws InterruptedException if another threads interrupts this one while waiting for commits
      */
-    public void awaitCommits(int timeout) throws InterruptedException {
+    public void awaitCommits(long timeoutMillis) throws InterruptedException {
+        awaitCommits(timeoutMillis, TimeUnit.MILLISECONDS);
+    }
+
+    /**
+     * Wait up to the specified timeout for this task to meet the expected number of commits as
+     * defined by {@code expectedCommits}.
+     *
+     * @param timeout duration to wait for commits
+     * @param unit    the unit of duration; may not be null
+     * @throws InterruptedException if another thread interrupts this one while waiting for commits
+     */
+    public void awaitCommits(long timeout, TimeUnit unit) throws InterruptedException {
         if (recordsToCommitLatch == null) {
             throw new IllegalStateException("Illegal state encountered. expectedRecords() was not set for this task?");
         }
-        if (!recordsToCommitLatch.await(timeout, TimeUnit.MILLISECONDS)) {
+        if (!recordsToCommitLatch.await(timeout, unit)) {
             String msg = String.format(
                     "Insufficient records seen by task %s in %d millis. Records expected=%d, actual=%d",
                     taskId,
-                    timeout,
+                    unit.toMillis(timeout),
                     expectedCommits,
                     expectedCommits - recordsToCommitLatch.getCount());
             throw new DataException(msg);
@@ -176,6 +201,42 @@ public class TaskHandle {
                   taskId, expectedCommits - recordsToCommitLatch.getCount(), expectedCommits);
     }
 
+    /**
+     * Record that this task has been started. This should be called by the task.
+     */
+    public void recordTaskStart() {
+        startAndStopCounter.recordStart();
+    }
+
+    /**
+     * Record that this task has been stopped. This should be called by the task.
+     */
+    public void recordTaskStop() {
+        startAndStopCounter.recordStop();
+    }
+
+    /**
+     * Obtain a {@link StartAndStopLatch} that can be used to wait until this task has completed the
+     * expected number of starts.
+     *
+     * @param expectedStarts    the expected number of starts
+     * @return the latch; never null
+     */
+    public StartAndStopLatch expectedStarts(int expectedStarts) {
+        return startAndStopCounter.expectedStarts(expectedStarts);
+    }
+
+    /**
+     * Obtain a {@link StartAndStopLatch} that can be used to wait until this task has completed the
+     * expected number of stops.
+     *
+     * @param expectedStops    the expected number of stops
+     * @return the latch; never null
+     */
+    public StartAndStopLatch expectedStops(int expectedStops) {
+        return startAndStopCounter.expectedStops(expectedStops);
+    }
+
     @Override
     public String toString() {
         return "Handle{" +