You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/02/11 23:51:03 UTC

[kafka] branch 2.5 updated: MINOR: Start using Response and replace IOException in EmbeddedConnectCluster for failures (#8055)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.5 by this push:
     new 23ef0ae  MINOR: Start using Response and replace IOException in EmbeddedConnectCluster for failures (#8055)
23ef0ae is described below

commit 23ef0aefe866c6812bc9ae5044b6da4d7dec95f2
Author: Konstantine Karantasis <ko...@confluent.io>
AuthorDate: Tue Feb 11 12:49:26 2020 -0800

    MINOR: Start using Response and replace IOException in EmbeddedConnectCluster for failures (#8055)
    
    Changed `EmbeddedConnectCluster` to add utility methods that return `Response`, throw `ConnectException` instead of `IOException` for failures, and deprecate the old methods that returned primitive types rather than `Response`.
    
    Also introduce common assertions for embedded clusters under `EmbeddedConnectClusterAssertions`.
    
    Author: Konstantine Karantasis <ko...@confluent.io>
    Reviewer: Randall Hauch <rh...@gmail.com>, Chia-Ping Tsai <ch...@gmail.com>
---
 .../integration/ConnectWorkerIntegrationTest.java  | 124 ++------
 .../RebalanceSourceConnectorsIntegrationTest.java  | 145 +++-------
 .../integration/RestExtensionIntegrationTest.java  |   9 +-
 .../SessionedProtocolIntegrationTest.java          |   8 +-
 .../util/clusters/EmbeddedConnectCluster.java      | 317 ++++++++++++++-------
 .../clusters/EmbeddedConnectClusterAssertions.java | 246 ++++++++++++++++
 .../util/clusters/EmbeddedKafkaCluster.java        |  25 +-
 7 files changed, 561 insertions(+), 313 deletions(-)

diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index 4a70466..6cc43a4 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -16,9 +16,7 @@
  */
 package org.apache.kafka.connect.integration;
 
-import org.apache.kafka.connect.runtime.AbstractStatus;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
-import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.storage.StringConverter;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.apache.kafka.connect.util.clusters.WorkerHandle;
@@ -35,7 +33,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -48,7 +45,6 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
 import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG;
 import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
-import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
@@ -60,8 +56,6 @@ public class ConnectWorkerIntegrationTest {
     private static final Logger log = LoggerFactory.getLogger(ConnectWorkerIntegrationTest.class);
 
     private static final int NUM_TOPIC_PARTITIONS = 3;
-    private static final long CONNECTOR_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
-    private static final long WORKER_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(60);
     private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(30);
     private static final int NUM_WORKERS = 3;
     private static final String CONNECTOR_NAME = "simple-source";
@@ -118,30 +112,30 @@ public class ConnectWorkerIntegrationTest {
         props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
 
-        waitForCondition(() -> assertWorkersUp(NUM_WORKERS).orElse(false),
-                WORKER_SETUP_DURATION_MS, "Initial group of workers did not start in time.");
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+                "Initial group of workers did not start in time.");
 
         // start a source connector
         connect.configureConnector(CONNECTOR_NAME, props);
 
-        waitForCondition(() -> assertConnectorAndTasksRunning(CONNECTOR_NAME, numTasks).orElse(false),
-                CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, numTasks,
+                "Connector tasks did not start in time.");
 
         WorkerHandle extraWorker = connect.addWorker();
 
-        waitForCondition(() -> assertWorkersUp(NUM_WORKERS + 1).orElse(false),
-                WORKER_SETUP_DURATION_MS, "Expanded group of workers did not start in time.");
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS + 1,
+                "Expanded group of workers did not start in time.");
 
-        waitForCondition(() -> assertConnectorAndTasksRunning(CONNECTOR_NAME, numTasks).orElse(false),
-                CONNECTOR_SETUP_DURATION_MS, "Connector tasks are not all in running state.");
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, numTasks,
+                "Connector tasks are not all in running state.");
 
         Set<WorkerHandle> workers = connect.activeWorkers();
         assertTrue(workers.contains(extraWorker));
 
         connect.removeWorker(extraWorker);
 
-        waitForCondition(() -> assertWorkersUp(NUM_WORKERS).orElse(false) && !assertWorkersUp(NUM_WORKERS + 1).orElse(false),
-                WORKER_SETUP_DURATION_MS, "Group of workers did not shrink in time.");
+        connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
+                "Group of workers did not shrink in time.");
 
         workers = connect.activeWorkers();
         assertFalse(workers.contains(extraWorker));
@@ -164,14 +158,14 @@ public class ConnectWorkerIntegrationTest {
         connectorProps.put(TASKS_MAX_CONFIG, Objects.toString(numTasks));
         connectorProps.put(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + BOOTSTRAP_SERVERS_CONFIG, "nobrokerrunningatthisaddress");
 
-        waitForCondition(() -> assertWorkersUp(NUM_WORKERS).orElse(false),
-                WORKER_SETUP_DURATION_MS, "Initial group of workers did not start in time.");
+        connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
+                "Initial group of workers did not start in time.");
 
         // Try to start the connector and its single task.
         connect.configureConnector(CONNECTOR_NAME, connectorProps);
 
-        waitForCondition(() -> assertConnectorTasksFailed(CONNECTOR_NAME, numTasks).orElse(false),
-                CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not fail in time");
+        connect.assertions().assertConnectorIsRunningAndTasksHaveFailed(CONNECTOR_NAME, numTasks,
+                "Connector tasks did not fail in time");
 
         // Reconfigure the connector without the bad broker address.
         connectorProps.remove(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + BOOTSTRAP_SERVERS_CONFIG);
@@ -180,11 +174,11 @@ public class ConnectWorkerIntegrationTest {
         // Restart the failed task
         String taskRestartEndpoint = connect.endpointForResource(
             String.format("connectors/%s/tasks/0/restart", CONNECTOR_NAME));
-        connect.executePost(taskRestartEndpoint, "", Collections.emptyMap());
+        connect.requestPost(taskRestartEndpoint, "", Collections.emptyMap());
 
         // Ensure the task started successfully this time
-        waitForCondition(() -> assertConnectorAndTasksRunning(CONNECTOR_NAME, numTasks).orElse(false),
-            CONNECTOR_SETUP_DURATION_MS, "Connector tasks are not all in running state.");
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, numTasks,
+            "Connector tasks are not all in running state.");
     }
 
     /**
@@ -210,19 +204,19 @@ public class ConnectWorkerIntegrationTest {
         props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
 
-        waitForCondition(() -> assertWorkersUp(NUM_WORKERS).orElse(false),
-                WORKER_SETUP_DURATION_MS, "Initial group of workers did not start in time.");
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+                "Initial group of workers did not start in time.");
 
         // start a source connector
         connect.configureConnector(CONNECTOR_NAME, props);
 
-        waitForCondition(() -> assertConnectorAndTasksRunning(CONNECTOR_NAME, numTasks).orElse(false),
-                CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, numTasks,
+                "Connector tasks did not start in time.");
 
         connect.kafka().stopOnlyKafka();
 
-        waitForCondition(() -> assertWorkersUp(NUM_WORKERS).orElse(false),
-                WORKER_SETUP_DURATION_MS, "Group of workers did not remain the same after broker shutdown");
+        connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
+                "Group of workers did not remain the same after broker shutdown");
 
         // Allow for the workers to discover that the coordinator is unavailable, wait is
         // heartbeat timeout * 2 + 4sec
@@ -233,78 +227,14 @@ public class ConnectWorkerIntegrationTest {
         // Allow for the kafka brokers to come back online
         Thread.sleep(TimeUnit.SECONDS.toMillis(10));
 
-        waitForCondition(() -> assertWorkersUp(NUM_WORKERS).orElse(false),
-                WORKER_SETUP_DURATION_MS, "Group of workers did not remain the same within the "
-                        + "designated time.");
+        connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
+                "Group of workers did not remain the same within the designated time.");
 
         // Allow for the workers to rebalance and reach a steady state
         Thread.sleep(TimeUnit.SECONDS.toMillis(10));
 
-        waitForCondition(() -> assertConnectorAndTasksRunning(CONNECTOR_NAME, numTasks).orElse(false),
-                CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, numTasks,
+                "Connector tasks did not start in time.");
     }
 
-    /**
-     * Confirm that the requested number of workers is up and running.
-     *
-     * @param numWorkers the number of online workers
-     * @return true if at least {@code numWorkers} are up; false otherwise
-     */
-    private Optional<Boolean> assertWorkersUp(int numWorkers) {
-        try {
-            int numUp = connect.activeWorkers().size();
-            return Optional.of(numUp >= numWorkers);
-        } catch (Exception e) {
-            log.error("Could not check active workers.", e);
-            return Optional.empty();
-        }
-    }
-
-    /**
-     * Confirm that a connector with an exact number of tasks is running.
-     *
-     * @param connectorName the connector
-     * @param numTasks the expected number of tasks
-     * @return true if the connector and tasks are in RUNNING state; false otherwise
-     */
-    private Optional<Boolean> assertConnectorAndTasksRunning(String connectorName, int numTasks) {
-        return assertConnectorState(
-            connectorName,
-            AbstractStatus.State.RUNNING,
-            numTasks,
-            AbstractStatus.State.RUNNING);
-    }
-
-    /**
-     * Confirm that a connector is running, that it has a specific number of tasks, and that all of
-     * its tasks are in the FAILED state.
-     * @param connectorName the connector
-     * @param numTasks the expected number of tasks
-     * @return true if the connector is in RUNNING state and its tasks are in FAILED state; false otherwise
-     */
-    private Optional<Boolean> assertConnectorTasksFailed(String connectorName, int numTasks) {
-        return assertConnectorState(
-            connectorName,
-            AbstractStatus.State.RUNNING,
-            numTasks,
-            AbstractStatus.State.FAILED);
-    }
-
-    private Optional<Boolean> assertConnectorState(
-        String connectorName,
-        AbstractStatus.State connectorState,
-        int numTasks,
-        AbstractStatus.State tasksState) {
-        try {
-            ConnectorStateInfo info = connect.connectorStatus(connectorName);
-            boolean result = info != null
-                    && info.connector().state().equals(connectorState.toString())
-                    && info.tasks().size() == numTasks
-                    && info.tasks().stream().allMatch(s -> s.state().equals(tasksState.toString()));
-            return Optional.of(result);
-        } catch (Exception e) {
-            log.error("Could not check connector state info.", e);
-            return Optional.empty();
-        }
-    }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
index d9ff223..073b307 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
@@ -16,8 +16,6 @@
  */
 package org.apache.kafka.connect.integration;
 
-import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.runtime.AbstractStatus;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.storage.StringConverter;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
@@ -35,7 +33,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
@@ -65,6 +62,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
     private static final int NUM_TOPIC_PARTITIONS = 3;
     private static final long CONNECTOR_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
     private static final long WORKER_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(60);
+    private static final int NUM_WORKERS = 3;
     private static final int NUM_TASKS = 4;
     private static final String CONNECTOR_NAME = "seq-source1";
     private static final String TOPIC_NAME = "sequential-topic";
@@ -85,7 +83,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
         // build a Connect cluster backed by Kafka and Zk
         connect = new EmbeddedConnectCluster.Builder()
                 .name("connect-cluster")
-                .numWorkers(3)
+                .numWorkers(NUM_WORKERS)
                 .numBrokers(1)
                 .workerProps(workerProps)
                 .brokerProps(brokerProps)
@@ -117,20 +115,23 @@ public class RebalanceSourceConnectorsIntegrationTest {
         props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
 
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+                "Connect workers did not start in time.");
+
         // start a source connector
         connect.configureConnector(CONNECTOR_NAME, props);
 
-        waitForCondition(() -> this.assertConnectorAndTasksRunning(CONNECTOR_NAME, NUM_TASKS).orElse(false),
-                CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+                "Connector tasks did not start in time.");
 
         // start a source connector
         connect.configureConnector("another-source", props);
 
-        waitForCondition(() -> this.assertConnectorAndTasksRunning(CONNECTOR_NAME, NUM_TASKS).orElse(false),
-                CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+                "Connector tasks did not start in time.");
 
-        waitForCondition(() -> this.assertConnectorAndTasksRunning("another-source", 4).orElse(false),
-                CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning("another-source", 4,
+                "Connector tasks did not start in time.");
     }
 
     @Ignore("Flaky and disruptive. See KAFKA-8391, KAFKA-8661 for details.")
@@ -153,11 +154,14 @@ public class RebalanceSourceConnectorsIntegrationTest {
         props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
 
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+                "Connect workers did not start in time.");
+
         // start a source connector
         connect.configureConnector(CONNECTOR_NAME, props);
 
-        waitForCondition(() -> this.assertConnectorAndTasksRunning(CONNECTOR_NAME, NUM_TASKS).orElse(false),
-                CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+                "Connector tasks did not start in time.");
 
         int numRecordsProduced = 100;
         long recordTransferDurationMs = TimeUnit.SECONDS.toMillis(30);
@@ -180,8 +184,8 @@ public class RebalanceSourceConnectorsIntegrationTest {
                 restartLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS));
 
         // And wait for the Connect to show the connectors and tasks are running
-        waitForCondition(() -> this.assertConnectorAndTasksRunning(CONNECTOR_NAME, NUM_TASKS).orElse(false),
-                CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+                "Connector tasks did not start in time.");
 
         // consume all records from the source topic or fail, to ensure that they were correctly produced
         recordNum = connect.kafka().consume(numRecordsProduced, recordTransferDurationMs, anotherTopic).count();
@@ -205,27 +209,20 @@ public class RebalanceSourceConnectorsIntegrationTest {
         props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
 
-        waitForCondition(() -> this.assertWorkersUp(3),
-                WORKER_SETUP_DURATION_MS, "Connect workers did not start in time.");
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+                "Connect workers did not start in time.");
 
-        // start a source connector
-        IntStream.range(0, 4).forEachOrdered(
-            i -> {
-                try {
-                    connect.configureConnector(CONNECTOR_NAME + i, props);
-                } catch (IOException e) {
-                    throw new ConnectException(e);
-                }
-            });
-
-        waitForCondition(() -> this.assertConnectorAndTasksRunning(CONNECTOR_NAME + 3, NUM_TASKS).orElse(true),
-                CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+        // start several source connectors
+        IntStream.range(0, 4).forEachOrdered(i -> connect.configureConnector(CONNECTOR_NAME + i, props));
+
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME + 3, NUM_TASKS,
+                "Connector tasks did not start in time.");
 
         // delete connector
         connect.deleteConnector(CONNECTOR_NAME + 3);
 
-        waitForCondition(() -> !this.assertConnectorAndTasksRunning(CONNECTOR_NAME + 3, NUM_TASKS).orElse(true),
-                CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not stop in time.");
+        connect.assertions().assertConnectorAndTasksAreStopped(CONNECTOR_NAME + 3,
+                "Connector tasks did not stop in time.");
 
         waitForCondition(this::assertConnectorAndTasksAreUnique,
                 WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
@@ -246,29 +243,22 @@ public class RebalanceSourceConnectorsIntegrationTest {
         props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
 
-        waitForCondition(() -> this.assertWorkersUp(3),
-                WORKER_SETUP_DURATION_MS, "Connect workers did not start in time.");
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+                "Connect workers did not start in time.");
 
         // start a source connector
-        IntStream.range(0, 4).forEachOrdered(
-            i -> {
-                try {
-                    connect.configureConnector(CONNECTOR_NAME + i, props);
-                } catch (IOException e) {
-                    throw new ConnectException(e);
-                }
-            });
-
-        waitForCondition(() -> this.assertConnectorAndTasksRunning(CONNECTOR_NAME + 3, NUM_TASKS).orElse(false),
-                CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+        IntStream.range(0, 4).forEachOrdered(i -> connect.configureConnector(CONNECTOR_NAME + i, props));
+
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME + 3, NUM_TASKS,
+                "Connector tasks did not start in time.");
 
         connect.addWorker();
 
-        waitForCondition(() -> this.assertWorkersUp(4),
-                WORKER_SETUP_DURATION_MS, "Connect workers did not start in time.");
+        connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS + 1,
+                "Connect workers did not start in time.");
 
-        waitForCondition(() -> this.assertConnectorAndTasksRunning(CONNECTOR_NAME + 3, NUM_TASKS).orElse(false),
-                CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME + 3, NUM_TASKS,
+                "Connector tasks did not start in time.");
 
         waitForCondition(this::assertConnectorAndTasksAreUnique,
                 WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
@@ -289,69 +279,24 @@ public class RebalanceSourceConnectorsIntegrationTest {
         props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
         props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
 
-        waitForCondition(() -> this.assertWorkersUp(3),
-                WORKER_SETUP_DURATION_MS, "Connect workers did not start in time.");
+        connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
+                "Connect workers did not start in time.");
 
         // start a source connector
-        IntStream.range(0, 4).forEachOrdered(
-            i -> {
-                try {
-                    connect.configureConnector(CONNECTOR_NAME + i, props);
-                } catch (IOException e) {
-                    throw new ConnectException(e);
-                }
-            });
-
-        waitForCondition(() -> this.assertConnectorAndTasksRunning(CONNECTOR_NAME + 3, NUM_TASKS).orElse(false),
-                CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+        IntStream.range(0, 4).forEachOrdered(i -> connect.configureConnector(CONNECTOR_NAME + i, props));
+
+        connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME + 3, NUM_TASKS,
+                "Connector tasks did not start in time.");
 
         connect.removeWorker();
 
-        waitForCondition(() -> this.assertWorkersUp(2),
-                WORKER_SETUP_DURATION_MS, "Connect workers did not start in time.");
+        connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS - 1,
+                "Connect workers did not start in time.");
 
         waitForCondition(this::assertConnectorAndTasksAreUnique,
                 WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
     }
 
-    /**
-     * Confirm that a connector with an exact number of tasks is running.
-     *
-     * @param connectorName the connector
-     * @param numTasks the expected number of tasks
-     * @return true if the connector and tasks are in RUNNING state; false otherwise
-     */
-    private Optional<Boolean> assertConnectorAndTasksRunning(String connectorName, int numTasks) {
-        try {
-            ConnectorStateInfo info = connect.connectorStatus(connectorName);
-            boolean result = info != null
-                    && info.tasks().size() == numTasks
-                    && info.connector().state().equals(AbstractStatus.State.RUNNING.toString())
-                    && info.tasks().stream().allMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
-            log.debug("Found connector and tasks running: {}", result);
-            return Optional.of(result);
-        } catch (Exception e) {
-            log.error("Could not check connector state info.", e);
-            return Optional.empty();
-        }
-    }
-
-    /**
-     * Verifies whether the supplied number of workers matches the number of workers
-     * currently running.
-     * @param numWorkers the expected number of active workers
-     * @return true if exactly numWorkers are active; false if more or fewer workers are running
-     */
-    private boolean assertWorkersUp(int numWorkers) {
-        try {
-            int numUp = connect.activeWorkers().size();
-            return numUp == numWorkers;
-        } catch (Exception e) {
-            log.error("Could not check active workers.", e);
-            return false;
-        }
-    }
-
     private boolean assertConnectorAndTasksAreUnique() {
         try {
             Map<String, Collection<String>> connectors = new HashMap<>();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
index 087246b..d5328a6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.integration;
 
+import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.health.ConnectClusterState;
 import org.apache.kafka.connect.health.ConnectorHealth;
 import org.apache.kafka.connect.health.ConnectorState;
@@ -23,7 +24,6 @@ import org.apache.kafka.connect.health.ConnectorType;
 import org.apache.kafka.connect.health.TaskState;
 import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
-import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
 import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
 import org.apache.kafka.connect.util.clusters.WorkerHandle;
 import org.apache.kafka.test.IntegrationTest;
@@ -33,12 +33,14 @@ import org.junit.experimental.categories.Category;
 
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
+import javax.ws.rs.core.Response;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
+import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
@@ -138,8 +140,9 @@ public class RestExtensionIntegrationTest {
     private boolean extensionIsRegistered() {
         try {
             String extensionUrl = connect.endpointForResource("integration-test-rest-extension/registered");
-            return "true".equals(connect.executeGet(extensionUrl));
-        } catch (ConnectRestException | IOException e) {
+            Response response = connect.requestGet(extensionUrl);
+            return response.getStatus() < BAD_REQUEST.getStatusCode();
+        } catch (ConnectException e) {
             return false;
         }
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java
index d01ab3b..1f13c17 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java
@@ -110,7 +110,7 @@ public class SessionedProtocolIntegrationTest {
         );
         assertEquals(
             BAD_REQUEST.getStatusCode(),
-            connect.executePost(connectorTasksEndpoint, "[]", emptyHeaders)
+            connect.requestPost(connectorTasksEndpoint, "[]", emptyHeaders).getStatus()
         );
 
         // Try again, but with an invalid signature
@@ -121,7 +121,7 @@ public class SessionedProtocolIntegrationTest {
         );
         assertEquals(
             FORBIDDEN.getStatusCode(),
-            connect.executePost(connectorTasksEndpoint, "[]", invalidSignatureHeaders)
+            connect.requestPost(connectorTasksEndpoint, "[]", invalidSignatureHeaders).getStatus()
         );
 
         // Create the connector now
@@ -151,7 +151,7 @@ public class SessionedProtocolIntegrationTest {
         );
         assertEquals(
             BAD_REQUEST.getStatusCode(),
-            connect.executePost(connectorTasksEndpoint, "[]", emptyHeaders)
+            connect.requestPost(connectorTasksEndpoint, "[]", emptyHeaders).getStatus()
         );
 
         // Try again, but with an invalid signature
@@ -162,7 +162,7 @@ public class SessionedProtocolIntegrationTest {
         );
         assertEquals(
             FORBIDDEN.getStatusCode(),
-            connect.executePost(connectorTasksEndpoint, "[]", invalidSignatureHeaders)
+            connect.requestPost(connectorTasksEndpoint, "[]", invalidSignatureHeaders).getStatus()
         );
     }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
index 94cc880..2276340 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
@@ -25,7 +25,6 @@ import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.core.Response;
 import java.io.IOException;
 import java.io.InputStream;
@@ -33,6 +32,7 @@ import java.io.OutputStreamWriter;
 import java.net.HttpURLConnection;
 import java.net.URL;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
@@ -82,6 +82,7 @@ public class EmbeddedConnectCluster {
     private final boolean maskExitProcedures;
     private final String workerNamePrefix;
     private final AtomicInteger nextWorkerId = new AtomicInteger(0);
+    private final EmbeddedConnectClusterAssertions assertions;
 
     private EmbeddedConnectCluster(String name, Map<String, String> workerProps, int numWorkers,
                                    int numBrokers, Properties brokerProps,
@@ -95,6 +96,7 @@ public class EmbeddedConnectCluster {
         this.maskExitProcedures = maskExitProcedures;
         // leaving non-configurable for now
         this.workerNamePrefix = DEFAULT_WORKER_NAME_PREFIX;
+        this.assertions = new EmbeddedConnectClusterAssertions(this);
     }
 
     /**
@@ -124,7 +126,7 @@ public class EmbeddedConnectCluster {
     /**
      * Start the connect cluster and the embedded Kafka and Zookeeper cluster.
      */
-    public void start() throws IOException {
+    public void start() {
         if (maskExitProcedures) {
             Exit.setExitProcedure(exitProcedure);
             Exit.setHaltProcedure(haltProcedure);
@@ -136,6 +138,8 @@ public class EmbeddedConnectCluster {
     /**
      * Stop the connect cluster and the embedded Kafka and Zookeeper cluster.
      * Clean up any temp directories created locally.
+     *
+     * @throws RuntimeException if Kafka brokers fail to stop
      */
     public void stop() {
         connectCluster.forEach(this::stopWorker);
@@ -182,6 +186,7 @@ public class EmbeddedConnectCluster {
      * Decommission a specific worker from this Connect cluster.
      *
      * @param worker the handle of the worker to remove from the cluster
+     * @throws IllegalStateException if the Connect cluster has no workers
      */
     public void removeWorker(WorkerHandle worker) {
         if (connectCluster.isEmpty()) {
@@ -238,9 +243,9 @@ public class EmbeddedConnectCluster {
                 .filter(w -> {
                     try {
                         mapper.readerFor(ServerInfo.class)
-                                .readValue(executeGet(w.url().toString()));
+                                .readValue(responseToString(requestGet(w.url().toString())));
                         return true;
-                    } catch (IOException e) {
+                    } catch (ConnectException | IOException e) {
                         // Worker failed to respond. Consider it's offline
                         return false;
                     }
@@ -263,36 +268,37 @@ public class EmbeddedConnectCluster {
      *
      * @param connName   the name of the connector
      * @param connConfig the intended configuration
-     * @throws IOException          if call to the REST api fails.
-     * @throws ConnectRestException if REST api returns error status
+     * @throws ConnectRestException if the REST api returns error status
+     * @throws ConnectException if the configuration fails to be serialized or if the request could not be sent
      */
-    public void configureConnector(String connName, Map<String, String> connConfig) throws IOException {
+    public String configureConnector(String connName, Map<String, String> connConfig) {
         String url = endpointForResource(String.format("connectors/%s/config", connName));
         ObjectMapper mapper = new ObjectMapper();
-        int status;
+        String content;
         try {
-            String content = mapper.writeValueAsString(connConfig);
-            status = executePut(url, content);
+            content = mapper.writeValueAsString(connConfig);
         } catch (IOException e) {
-            log.error("Could not execute PUT request to " + url, e);
-            throw e;
+            throw new ConnectException("Could not serialize connector configuration and execute PUT request");
         }
-        if (status >= HttpServletResponse.SC_BAD_REQUEST) {
-            throw new ConnectRestException(status, "Could not execute PUT request");
+        Response response = requestPut(url, content);
+        if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) {
+            return responseToString(response);
         }
+        throw new ConnectRestException(response.getStatus(), "Could not execute PUT request");
     }
 
     /**
      * Delete an existing connector.
      *
      * @param connName name of the connector to be deleted
-     * @throws IOException if call to the REST api fails.
+     * @throws ConnectRestException if the REST api returns error status
+     * @throws ConnectException for any other error.
      */
-    public void deleteConnector(String connName) throws IOException {
+    public void deleteConnector(String connName) {
         String url = endpointForResource(String.format("connectors/%s", connName));
-        int status = executeDelete(url);
-        if (status >= HttpServletResponse.SC_BAD_REQUEST) {
-            throw new ConnectRestException(status, "Could not execute DELETE request.");
+        Response response = requestDelete(url);
+        if (response.getStatus() >= Response.Status.BAD_REQUEST.getStatusCode()) {
+            throw new ConnectRestException(response.getStatus(), "Could not execute DELETE request.");
         }
     }
 
@@ -305,50 +311,78 @@ public class EmbeddedConnectCluster {
      */
     public Collection<String> connectors() {
         ObjectMapper mapper = new ObjectMapper();
-        try {
-            String url = endpointForResource("connectors");
-            return mapper.readerFor(Collection.class).readValue(executeGet(url));
-        } catch (IOException e) {
-            log.error("Could not read connector list", e);
-            throw new ConnectException("Could not read connector list", e);
+        String url = endpointForResource("connectors");
+        Response response = requestGet(url);
+        if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) {
+            try {
+                return mapper.readerFor(Collection.class).readValue(responseToString(response));
+            } catch (IOException e) {
+                log.error("Could not parse connector list from response: {}",
+                        responseToString(response), e
+                );
+                throw new ConnectException("Could not not parse connector list", e);
+            }
         }
+        throw new ConnectRestException(response.getStatus(),
+                "Could not read connector list. Error response: " + responseToString(response));
     }
 
     /**
      * Get the status for a connector running in this cluster.
      *
      * @param connectorName name of the connector
-     * @return an instance of {@link ConnectorStateInfo} populated with state informaton of the connector and it's tasks.
+     * @return an instance of {@link ConnectorStateInfo} populated with state information of the connector and its tasks.
      * @throws ConnectRestException if the HTTP request to the REST API failed with a valid status code.
      * @throws ConnectException for any other error.
      */
     public ConnectorStateInfo connectorStatus(String connectorName) {
         ObjectMapper mapper = new ObjectMapper();
+        String url = endpointForResource(String.format("connectors/%s/status", connectorName));
+        Response response = requestGet(url);
         try {
-            String url = endpointForResource(String.format("connectors/%s/status", connectorName));
-            return mapper.readerFor(ConnectorStateInfo.class).readValue(executeGet(url));
+            if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) {
+                return mapper.readerFor(ConnectorStateInfo.class)
+                        .readValue(responseToString(response));
+            }
         } catch (IOException e) {
-            log.error("Could not read connector state", e);
-            throw new ConnectException("Could not read connector state", e);
+            log.error("Could not read connector state from response: {}",
+                    responseToString(response), e);
+            throw new ConnectException("Could not not parse connector state", e);
         }
+        throw new ConnectRestException(response.getStatus(),
+                "Could not read connector state. Error response: " + responseToString(response));
     }
 
-    public String adminEndpoint(String resource) throws IOException {
+    /**
+     * Get the full URL of the admin endpoint that corresponds to the given REST resource
+     *
+     * @param resource the resource under the worker's admin endpoint
+     * @return the admin endpoint URL
+     * @throws ConnectRestException if no admin REST endpoint is available
+     */
+    public String adminEndpoint(String resource) {
         String url = connectCluster.stream()
                 .map(WorkerHandle::adminUrl)
                 .filter(Objects::nonNull)
                 .findFirst()
-                .orElseThrow(() -> new IOException("Admin endpoint is disabled."))
+                .orElseThrow(() -> new ConnectException("Admin endpoint is disabled."))
                 .toString();
         return url + resource;
     }
 
-    public String endpointForResource(String resource) throws IOException {
+    /**
+     * Get the full URL of the endpoint that corresponds to the given REST resource
+     *
+     * @param resource the resource under the worker's admin endpoint
+     * @return the admin endpoint URL
+     * @throws ConnectRestException if no REST endpoint is available
+     */
+    public String endpointForResource(String resource) {
         String url = connectCluster.stream()
                 .map(WorkerHandle::url)
                 .filter(Objects::nonNull)
                 .findFirst()
-                .orElseThrow(() -> new IOException("Connect workers have not been provisioned"))
+                .orElseThrow(() -> new ConnectException("Connect workers have not been provisioned"))
                 .toString();
         return url + resource;
     }
@@ -359,91 +393,161 @@ public class EmbeddedConnectCluster {
         }
     }
 
+    /**
+     * Return the handle to the Kafka cluster this Connect cluster connects to.
+     *
+     * @return the Kafka cluster handle
+     */
     public EmbeddedKafkaCluster kafka() {
         return kafkaCluster;
     }
 
-    public int executePut(String url, String body) throws IOException {
-        log.debug("Executing PUT request to URL={}. Payload={}", url, body);
-        HttpURLConnection httpCon = (HttpURLConnection) new URL(url).openConnection();
-        httpCon.setDoOutput(true);
-        httpCon.setRequestProperty("Content-Type", "application/json");
-        httpCon.setRequestMethod("PUT");
-        try (OutputStreamWriter out = new OutputStreamWriter(httpCon.getOutputStream())) {
-            out.write(body);
-        }
-        if (httpCon.getResponseCode() < HttpURLConnection.HTTP_BAD_REQUEST) {
-            try (InputStream is = httpCon.getInputStream()) {
-                log.info("PUT response for URL={} is {}", url, responseToString(is));
-            }
-        } else {
-            try (InputStream is = httpCon.getErrorStream()) {
-                log.info("PUT error response for URL={} is {}", url, responseToString(is));
-            }
-        }
-        return httpCon.getResponseCode();
+    /**
+     * Execute a GET request on the given URL.
+     *
+     * @param url the HTTP endpoint
+     * @return the response to the GET request
+     * @throws ConnectException if execution of the GET request fails
+     * @deprecated Use {@link #requestGet(String)} instead.
+     */
+    @Deprecated
+    public String executeGet(String url) {
+        return responseToString(requestGet(url));
     }
 
-    public int executePost(String url, String body, Map<String, String> headers) throws IOException {
-        log.debug("Executing POST request to URL={}. Payload={}", url, body);
-        HttpURLConnection httpCon = (HttpURLConnection) new URL(url).openConnection();
-        httpCon.setDoOutput(true);
-        httpCon.setRequestProperty("Content-Type", "application/json");
-        headers.forEach(httpCon::setRequestProperty);
-        httpCon.setRequestMethod("POST");
-        try (OutputStreamWriter out = new OutputStreamWriter(httpCon.getOutputStream())) {
-            out.write(body);
-        }
-        if (httpCon.getResponseCode() < HttpURLConnection.HTTP_BAD_REQUEST) {
-            try (InputStream is = httpCon.getInputStream()) {
-                log.info("POST response for URL={} is {}", url, responseToString(is));
-            }
-        } else {
-            try (InputStream is = httpCon.getErrorStream()) {
-                log.info("POST error response for URL={} is {}", url, responseToString(is));
-            }
-        }
-        return httpCon.getResponseCode();
+    /**
+     * Execute a GET request on the given URL.
+     *
+     * @param url the HTTP endpoint
+     * @return the response to the GET request
+     * @throws ConnectException if execution of the GET request fails
+     */
+    public Response requestGet(String url) {
+        return requestHttpMethod(url, null, Collections.emptyMap(), "GET");
     }
 
     /**
-     * Execute a GET request on the given URL.
+     * Execute a PUT request on the given URL.
+     *
+     * @param url the HTTP endpoint
+     * @param body the payload of the PUT request
+     * @return the response to the PUT request
+     * @throws ConnectException if execution of the PUT request fails
+     * @deprecated Use {@link #requestPut(String, String)} instead.
+     */
+    @Deprecated
+    public int executePut(String url, String body) {
+        return requestPut(url, body).getStatus();
+    }
+
+    /**
+     * Execute a PUT request on the given URL.
+     *
+     * @param url the HTTP endpoint
+     * @param body the payload of the PUT request
+     * @return the response to the PUT request
+     * @throws ConnectException if execution of the PUT request fails
+     */
+    public Response requestPut(String url, String body) {
+        return requestHttpMethod(url, body, Collections.emptyMap(), "PUT");
+    }
+
+    /**
+     * Execute a POST request on the given URL.
+     *
+     * @param url the HTTP endpoint
+     * @param body the payload of the POST request
+     * @param headers a map that stores the POST request headers
+     * @return the response to the POST request
+     * @throws ConnectException if execution of the POST request fails
+     * @deprecated Use {@link #requestPost(String, String, java.util.Map)} instead.
+     */
+    @Deprecated
+    public int executePost(String url, String body, Map<String, String> headers) {
+        return requestPost(url, body, headers).getStatus();
+    }
+
+    /**
+     * Execute a POST request on the given URL.
+     *
+     * @param url the HTTP endpoint
+     * @param body the payload of the POST request
+     * @param headers a map that stores the POST request headers
+     * @return the response to the POST request
+     * @throws ConnectException if execution of the POST request fails
+     */
+    public Response requestPost(String url, String body, Map<String, String> headers) {
+        return requestHttpMethod(url, body, headers, "POST");
+    }
+
+    /**
+     * Execute a DELETE request on the given URL.
      *
      * @param url the HTTP endpoint
-     * @return response body encoded as a String
-     * @throws ConnectRestException if the HTTP request fails with a valid status code
-     * @throws IOException for any other I/O error.
+     * @return the response to the DELETE request
+     * @throws ConnectException if execution of the DELETE request fails
+     * @deprecated Use {@link #requestDelete(String)} instead.
      */
-    public String executeGet(String url) throws IOException {
-        log.debug("Executing GET request to URL={}.", url);
-        HttpURLConnection httpCon = (HttpURLConnection) new URL(url).openConnection();
-        httpCon.setDoOutput(true);
-        httpCon.setRequestMethod("GET");
-        try (InputStream is = httpCon.getInputStream()) {
-            int c;
-            StringBuilder response = new StringBuilder();
-            while ((c = is.read()) != -1) {
-                response.append((char) c);
+    @Deprecated
+    public int executeDelete(String url) {
+        return requestDelete(url).getStatus();
+    }
+
+    /**
+     * Execute a DELETE request on the given URL.
+     *
+     * @param url the HTTP endpoint
+     * @return the response to the DELETE request
+     * @throws ConnectException if execution of the DELETE request fails
+     */
+    public Response requestDelete(String url) {
+        return requestHttpMethod(url, null, Collections.emptyMap(), "DELETE");
+    }
+
+    /**
+     * A general method that executes an HTTP request on a given URL.
+     *
+     * @param url the HTTP endpoint
+     * @param body the payload of the request; null if there isn't one
+     * @param headers a map that stores the request headers; empty if there are no headers
+     * @param httpMethod the name of the HTTP method to execute
+     * @return the response to the HTTP request
+     * @throws ConnectException if execution of the HTTP method fails
+     */
+    protected Response requestHttpMethod(String url, String body, Map<String, String> headers,
+                                      String httpMethod) {
+        log.debug("Executing {} request to URL={}." + (body != null ? " Payload={}" : ""),
+                httpMethod, url, body);
+        try {
+            HttpURLConnection httpCon = (HttpURLConnection) new URL(url).openConnection();
+            httpCon.setDoOutput(true);
+            httpCon.setRequestMethod(httpMethod);
+            if (body != null) {
+                httpCon.setRequestProperty("Content-Type", "application/json");
+                headers.forEach(httpCon::setRequestProperty);
+                try (OutputStreamWriter out = new OutputStreamWriter(httpCon.getOutputStream())) {
+                    out.write(body);
+                }
             }
-            log.debug("GET response for URL={} is {}", url, response);
-            return response.toString();
-        } catch (IOException e) {
-            Response.Status status = Response.Status.fromStatusCode(httpCon.getResponseCode());
-            if (status != null) {
-                throw new ConnectRestException(status, "Invalid endpoint: " + url, e);
+            try (InputStream is = httpCon.getResponseCode() < HttpURLConnection.HTTP_BAD_REQUEST
+                                  ? httpCon.getInputStream()
+                                  : httpCon.getErrorStream()
+            ) {
+                String responseEntity = responseToString(is);
+                log.info("{} response for URL={} is {}",
+                        httpMethod, url, responseEntity.isEmpty() ? "empty" : responseEntity);
+                return Response.status(Response.Status.fromStatusCode(httpCon.getResponseCode()))
+                        .entity(responseEntity)
+                        .build();
             }
-            // invalid response code, re-throw the IOException.
-            throw e;
+        } catch (IOException e) {
+            log.error("Could not execute " + httpMethod + " request to " + url, e);
+            throw new ConnectException(e);
         }
     }
 
-    public int executeDelete(String url) throws IOException {
-        log.debug("Executing DELETE request to URL={}", url);
-        HttpURLConnection httpCon = (HttpURLConnection) new URL(url).openConnection();
-        httpCon.setDoOutput(true);
-        httpCon.setRequestMethod("DELETE");
-        httpCon.connect();
-        return httpCon.getResponseCode();
+    private String responseToString(Response response) {
+        return response == null ? "empty" : (String) response.getEntity();
     }
 
     private String responseToString(InputStream stream) throws IOException {
@@ -511,4 +615,13 @@ public class EmbeddedConnectCluster {
         }
     }
 
+    /**
+     * Return the available assertions for this Connect cluster
+     *
+     * @return the assertions object
+     */
+    public EmbeddedConnectClusterAssertions assertions() {
+        return assertions;
+    }
+
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
new file mode 100644
index 0000000..4b13d89
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util.clusters;
+
+import org.apache.kafka.connect.runtime.AbstractStatus;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+
+/**
+ * A set of common assertions that can be applied to a Connect cluster during integration testing
+ */
+public class EmbeddedConnectClusterAssertions {
+
+    private static final Logger log = LoggerFactory.getLogger(EmbeddedConnectClusterAssertions.class);
+    public static final long WORKER_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(60);
+    private static final long CONNECTOR_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
+
+    private final EmbeddedConnectCluster connect;
+
+    EmbeddedConnectClusterAssertions(EmbeddedConnectCluster connect) {
+        this.connect = connect;
+    }
+
+    /**
+     * Assert that at least the requested number of workers are up and running.
+     *
+     * @param numWorkers the number of online workers
+     */
+    public void assertAtLeastNumWorkersAreUp(int numWorkers, String detailMessage) throws InterruptedException {
+        try {
+            waitForCondition(
+                () -> checkWorkersUp(numWorkers, (actual, expected) -> actual >= expected).orElse(false),
+                WORKER_SETUP_DURATION_MS,
+                "Didn't meet the minimum requested number of online workers: " + numWorkers);
+        } catch (AssertionError e) {
+            throw new AssertionError(detailMessage, e);
+        }
+    }
+
+    /**
+     * Assert that at least the requested number of workers are up and running.
+     *
+     * @param numWorkers the number of online workers
+     */
+    public void assertExactlyNumWorkersAreUp(int numWorkers, String detailMessage) throws InterruptedException {
+        try {
+            waitForCondition(
+                () -> checkWorkersUp(numWorkers, (actual, expected) -> actual == expected).orElse(false),
+                WORKER_SETUP_DURATION_MS,
+                "Didn't meet the exact requested number of online workers: " + numWorkers);
+        } catch (AssertionError e) {
+            throw new AssertionError(detailMessage, e);
+        }
+    }
+
+    /**
+     * Confirm that the requested number of workers are up and running.
+     *
+     * @param numWorkers the number of online workers
+     * @return true if at least {@code numWorkers} are up; false otherwise
+     */
+    protected Optional<Boolean> checkWorkersUp(int numWorkers, BiFunction<Integer, Integer, Boolean> comp) {
+        try {
+            int numUp = connect.activeWorkers().size();
+            return Optional.of(comp.apply(numUp, numWorkers));
+        } catch (Exception e) {
+            log.error("Could not check active workers.", e);
+            return Optional.empty();
+        }
+    }
+
+    /**
+     * Assert that a connector is running with at least the given number of tasks all in running state
+     *
+     * @param connectorName the connector name
+     * @param numTasks the number of tasks
+     * @param detailMessage
+     * @throws InterruptedException
+     */
+    public void assertConnectorAndAtLeastNumTasksAreRunning(String connectorName, int numTasks, String detailMessage)
+            throws InterruptedException {
+        try {
+            waitForCondition(
+                () -> checkConnectorState(
+                    connectorName,
+                    AbstractStatus.State.RUNNING,
+                    numTasks,
+                    AbstractStatus.State.RUNNING,
+                    (actual, expected) -> actual >= expected
+                ).orElse(false),
+                CONNECTOR_SETUP_DURATION_MS,
+                "The connector or at least " + numTasks + " of tasks are not running.");
+        } catch (AssertionError e) {
+            throw new AssertionError(detailMessage, e);
+        }
+    }
+
+    /**
+     * Assert that a connector is running with at least the given number of tasks all in running state
+     *
+     * @param connectorName the connector name
+     * @param numTasks the number of tasks
+     * @param detailMessage the assertion message
+     * @throws InterruptedException
+     */
+    public void assertConnectorAndExactlyNumTasksAreRunning(String connectorName, int numTasks, String detailMessage)
+            throws InterruptedException {
+        try {
+            waitForCondition(
+                () -> checkConnectorState(
+                    connectorName,
+                    AbstractStatus.State.RUNNING,
+                    numTasks,
+                    AbstractStatus.State.RUNNING,
+                    (actual, expected) -> actual == expected
+                ).orElse(false),
+                CONNECTOR_SETUP_DURATION_MS,
+                "The connector or exactly " + numTasks + " tasks are not running.");
+        } catch (AssertionError e) {
+            throw new AssertionError(detailMessage, e);
+        }
+    }
+
+    /**
+     * Assert that a connector is running, that it has a specific number of tasks, and that all of
+     * its tasks are in the FAILED state.
+     *
+     * @param connectorName the connector name
+     * @param numTasks the number of tasks
+     * @param detailMessage the assertion message
+     * @throws InterruptedException
+     */
+    public void assertConnectorIsRunningAndTasksHaveFailed(String connectorName, int numTasks, String detailMessage)
+            throws InterruptedException {
+        try {
+            waitForCondition(
+                () -> checkConnectorState(
+                    connectorName,
+                    AbstractStatus.State.RUNNING,
+                    numTasks,
+                    AbstractStatus.State.FAILED,
+                    (actual, expected) -> actual >= expected
+                ).orElse(false),
+                CONNECTOR_SETUP_DURATION_MS,
+                "Either the connector is not running or not all the " + numTasks + " tasks have failed.");
+        } catch (AssertionError e) {
+            throw new AssertionError(detailMessage, e);
+        }
+    }
+
+    /**
+     * Assert that a connector and its tasks are not running.
+     *
+     * @param connectorName the connector name
+     * @param detailMessage the assertion message
+     * @throws InterruptedException
+     */
+    public void assertConnectorAndTasksAreStopped(String connectorName, String detailMessage)
+            throws InterruptedException {
+        try {
+            waitForCondition(
+                () -> checkConnectorAndTasksAreStopped(connectorName),
+                CONNECTOR_SETUP_DURATION_MS,
+                "At least the connector or one of its tasks is still");
+        } catch (AssertionError e) {
+            throw new AssertionError(detailMessage, e);
+        }
+    }
+
+    /**
+     * Check whether the connector or any of its tasks are still in RUNNING state
+     *
+     * @param connectorName the connector
+     * @return true if the connector and all the tasks are not in RUNNING state; false otherwise
+     */
+    protected boolean checkConnectorAndTasksAreStopped(String connectorName) {
+        ConnectorStateInfo info;
+        try {
+            info = connect.connectorStatus(connectorName);
+        } catch (ConnectRestException e) {
+            return e.statusCode() == Response.Status.NOT_FOUND.getStatusCode();
+        } catch (Exception e) {
+            log.error("Could not check connector state info.", e);
+            return false;
+        }
+        if (info == null) {
+            return true;
+        }
+        return !info.connector().state().equals(AbstractStatus.State.RUNNING.toString())
+                && info.tasks().stream().noneMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
+    }
+
+    /**
+     * Check whether the given connector state matches the current state of the connector and
+     * whether it has at least the given number of tasks, with all the tasks matching the given
+     * task state.
+     * @param connectorName the connector
+     * @param connectorState
+     * @param numTasks the expected number of tasks
+     * @param tasksState
+     * @return true if the connector and tasks are in RUNNING state; false otherwise
+     */
+    protected Optional<Boolean> checkConnectorState(
+            String connectorName,
+            AbstractStatus.State connectorState,
+            int numTasks,
+            AbstractStatus.State tasksState,
+            BiFunction<Integer, Integer, Boolean> comp
+    ) {
+        try {
+            ConnectorStateInfo info = connect.connectorStatus(connectorName);
+            boolean result = info != null
+                    && comp.apply(info.tasks().size(), numTasks)
+                    && info.connector().state().equals(connectorState.toString())
+                    && info.tasks().stream().allMatch(s -> s.state().equals(tasksState.toString()));
+            return Optional.of(result);
+        } catch (Exception e) {
+            log.error("Could not check connector state info.", e);
+            return Optional.empty();
+        }
+    }
+
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
index d36ae5b..b365e65 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
@@ -97,7 +97,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
     }
 
     @Override
-    protected void before() throws IOException {
+    protected void before() {
         start();
     }
 
@@ -106,11 +106,17 @@ public class EmbeddedKafkaCluster extends ExternalResource {
         stop();
     }
 
-    public void startOnlyKafkaOnSamePorts() throws IOException {
+    /**
+     * Starts the Kafka cluster alone using the ports that were assigned during initialization of
+     * the harness.
+     *
+     * @throws ConnectException if a directory to store the data cannot be created
+     */
+    public void startOnlyKafkaOnSamePorts() {
         start(currentBrokerPorts, currentBrokerLogDirs);
     }
 
-    private void start() throws IOException {
+    private void start() {
         // pick a random port
         zookeeper = new EmbeddedZookeeper();
         Arrays.fill(currentBrokerPorts, 0);
@@ -118,7 +124,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
         start(currentBrokerPorts, currentBrokerLogDirs);
     }
 
-    private void start(int[] brokerPorts, String[] logDirs) throws IOException {
+    private void start(int[] brokerPorts, String[] logDirs) {
         brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString());
 
         putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.HostNameProp(), "localhost");
@@ -205,10 +211,15 @@ public class EmbeddedKafkaCluster extends ExternalResource {
         }
     }
 
-    private String createLogDir() throws IOException {
+    private String createLogDir() {
         TemporaryFolder tmpFolder = new TemporaryFolder();
-        tmpFolder.create();
-        return tmpFolder.newFolder().getAbsolutePath();
+        try {
+            tmpFolder.create();
+            return tmpFolder.newFolder().getAbsolutePath();
+        } catch (IOException e) {
+            log.error("Unable to create temporary log directory", e);
+            throw new ConnectException("Unable to create temporary log directory", e);
+        }
     }
 
     public String bootstrapServers() {