You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/02/11 23:51:03 UTC
[kafka] branch 2.5 updated: MINOR: Start using Response and replace
IOException in EmbeddedConnectCluster for failures (#8055)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 23ef0ae MINOR: Start using Response and replace IOException in EmbeddedConnectCluster for failures (#8055)
23ef0ae is described below
commit 23ef0aefe866c6812bc9ae5044b6da4d7dec95f2
Author: Konstantine Karantasis <ko...@confluent.io>
AuthorDate: Tue Feb 11 12:49:26 2020 -0800
MINOR: Start using Response and replace IOException in EmbeddedConnectCluster for failures (#8055)
Changed `EmbeddedConnectCluster` to add utility methods that return `Response`, throw `ConnectException` instead of `IOException` for failures, and deprecate the old methods that returned primitive types rather than `Response`.
Also introduce common assertions for embedded clusters under `EmbeddedConnectClusterAssertions`.
Author: Konstantine Karantasis <ko...@confluent.io>
Reviewer: Randall Hauch <rh...@gmail.com>, Chia-Ping Tsai <ch...@gmail.com>
---
.../integration/ConnectWorkerIntegrationTest.java | 124 ++------
.../RebalanceSourceConnectorsIntegrationTest.java | 145 +++-------
.../integration/RestExtensionIntegrationTest.java | 9 +-
.../SessionedProtocolIntegrationTest.java | 8 +-
.../util/clusters/EmbeddedConnectCluster.java | 317 ++++++++++++++-------
.../clusters/EmbeddedConnectClusterAssertions.java | 246 ++++++++++++++++
.../util/clusters/EmbeddedKafkaCluster.java | 25 +-
7 files changed, 561 insertions(+), 313 deletions(-)
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index 4a70466..6cc43a4 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -16,9 +16,7 @@
*/
package org.apache.kafka.connect.integration;
-import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
-import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.WorkerHandle;
@@ -35,7 +33,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
-import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
@@ -48,7 +45,6 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
-import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -60,8 +56,6 @@ public class ConnectWorkerIntegrationTest {
private static final Logger log = LoggerFactory.getLogger(ConnectWorkerIntegrationTest.class);
private static final int NUM_TOPIC_PARTITIONS = 3;
- private static final long CONNECTOR_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
- private static final long WORKER_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(60);
private static final long OFFSET_COMMIT_INTERVAL_MS = TimeUnit.SECONDS.toMillis(30);
private static final int NUM_WORKERS = 3;
private static final String CONNECTOR_NAME = "simple-source";
@@ -118,30 +112,30 @@ public class ConnectWorkerIntegrationTest {
props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
- waitForCondition(() -> assertWorkersUp(NUM_WORKERS).orElse(false),
- WORKER_SETUP_DURATION_MS, "Initial group of workers did not start in time.");
+ connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+ "Initial group of workers did not start in time.");
// start a source connector
connect.configureConnector(CONNECTOR_NAME, props);
- waitForCondition(() -> assertConnectorAndTasksRunning(CONNECTOR_NAME, numTasks).orElse(false),
- CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+ connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, numTasks,
+ "Connector tasks did not start in time.");
WorkerHandle extraWorker = connect.addWorker();
- waitForCondition(() -> assertWorkersUp(NUM_WORKERS + 1).orElse(false),
- WORKER_SETUP_DURATION_MS, "Expanded group of workers did not start in time.");
+ connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS + 1,
+ "Expanded group of workers did not start in time.");
- waitForCondition(() -> assertConnectorAndTasksRunning(CONNECTOR_NAME, numTasks).orElse(false),
- CONNECTOR_SETUP_DURATION_MS, "Connector tasks are not all in running state.");
+ connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, numTasks,
+ "Connector tasks are not all in running state.");
Set<WorkerHandle> workers = connect.activeWorkers();
assertTrue(workers.contains(extraWorker));
connect.removeWorker(extraWorker);
- waitForCondition(() -> assertWorkersUp(NUM_WORKERS).orElse(false) && !assertWorkersUp(NUM_WORKERS + 1).orElse(false),
- WORKER_SETUP_DURATION_MS, "Group of workers did not shrink in time.");
+ connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
+ "Group of workers did not shrink in time.");
workers = connect.activeWorkers();
assertFalse(workers.contains(extraWorker));
@@ -164,14 +158,14 @@ public class ConnectWorkerIntegrationTest {
connectorProps.put(TASKS_MAX_CONFIG, Objects.toString(numTasks));
connectorProps.put(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + BOOTSTRAP_SERVERS_CONFIG, "nobrokerrunningatthisaddress");
- waitForCondition(() -> assertWorkersUp(NUM_WORKERS).orElse(false),
- WORKER_SETUP_DURATION_MS, "Initial group of workers did not start in time.");
+ connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
+ "Initial group of workers did not start in time.");
// Try to start the connector and its single task.
connect.configureConnector(CONNECTOR_NAME, connectorProps);
- waitForCondition(() -> assertConnectorTasksFailed(CONNECTOR_NAME, numTasks).orElse(false),
- CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not fail in time");
+ connect.assertions().assertConnectorIsRunningAndTasksHaveFailed(CONNECTOR_NAME, numTasks,
+ "Connector tasks did not fail in time");
// Reconfigure the connector without the bad broker address.
connectorProps.remove(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + BOOTSTRAP_SERVERS_CONFIG);
@@ -180,11 +174,11 @@ public class ConnectWorkerIntegrationTest {
// Restart the failed task
String taskRestartEndpoint = connect.endpointForResource(
String.format("connectors/%s/tasks/0/restart", CONNECTOR_NAME));
- connect.executePost(taskRestartEndpoint, "", Collections.emptyMap());
+ connect.requestPost(taskRestartEndpoint, "", Collections.emptyMap());
// Ensure the task started successfully this time
- waitForCondition(() -> assertConnectorAndTasksRunning(CONNECTOR_NAME, numTasks).orElse(false),
- CONNECTOR_SETUP_DURATION_MS, "Connector tasks are not all in running state.");
+ connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, numTasks,
+ "Connector tasks are not all in running state.");
}
/**
@@ -210,19 +204,19 @@ public class ConnectWorkerIntegrationTest {
props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
- waitForCondition(() -> assertWorkersUp(NUM_WORKERS).orElse(false),
- WORKER_SETUP_DURATION_MS, "Initial group of workers did not start in time.");
+ connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+ "Initial group of workers did not start in time.");
// start a source connector
connect.configureConnector(CONNECTOR_NAME, props);
- waitForCondition(() -> assertConnectorAndTasksRunning(CONNECTOR_NAME, numTasks).orElse(false),
- CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+ connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, numTasks,
+ "Connector tasks did not start in time.");
connect.kafka().stopOnlyKafka();
- waitForCondition(() -> assertWorkersUp(NUM_WORKERS).orElse(false),
- WORKER_SETUP_DURATION_MS, "Group of workers did not remain the same after broker shutdown");
+ connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
+ "Group of workers did not remain the same after broker shutdown");
// Allow for the workers to discover that the coordinator is unavailable, wait is
// heartbeat timeout * 2 + 4sec
@@ -233,78 +227,14 @@ public class ConnectWorkerIntegrationTest {
// Allow for the kafka brokers to come back online
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
- waitForCondition(() -> assertWorkersUp(NUM_WORKERS).orElse(false),
- WORKER_SETUP_DURATION_MS, "Group of workers did not remain the same within the "
- + "designated time.");
+ connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
+ "Group of workers did not remain the same within the designated time.");
// Allow for the workers to rebalance and reach a steady state
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
- waitForCondition(() -> assertConnectorAndTasksRunning(CONNECTOR_NAME, numTasks).orElse(false),
- CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+ connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, numTasks,
+ "Connector tasks did not start in time.");
}
- /**
- * Confirm that the requested number of workers is up and running.
- *
- * @param numWorkers the number of online workers
- * @return true if at least {@code numWorkers} are up; false otherwise
- */
- private Optional<Boolean> assertWorkersUp(int numWorkers) {
- try {
- int numUp = connect.activeWorkers().size();
- return Optional.of(numUp >= numWorkers);
- } catch (Exception e) {
- log.error("Could not check active workers.", e);
- return Optional.empty();
- }
- }
-
- /**
- * Confirm that a connector with an exact number of tasks is running.
- *
- * @param connectorName the connector
- * @param numTasks the expected number of tasks
- * @return true if the connector and tasks are in RUNNING state; false otherwise
- */
- private Optional<Boolean> assertConnectorAndTasksRunning(String connectorName, int numTasks) {
- return assertConnectorState(
- connectorName,
- AbstractStatus.State.RUNNING,
- numTasks,
- AbstractStatus.State.RUNNING);
- }
-
- /**
- * Confirm that a connector is running, that it has a specific number of tasks, and that all of
- * its tasks are in the FAILED state.
- * @param connectorName the connector
- * @param numTasks the expected number of tasks
- * @return true if the connector is in RUNNING state and its tasks are in FAILED state; false otherwise
- */
- private Optional<Boolean> assertConnectorTasksFailed(String connectorName, int numTasks) {
- return assertConnectorState(
- connectorName,
- AbstractStatus.State.RUNNING,
- numTasks,
- AbstractStatus.State.FAILED);
- }
-
- private Optional<Boolean> assertConnectorState(
- String connectorName,
- AbstractStatus.State connectorState,
- int numTasks,
- AbstractStatus.State tasksState) {
- try {
- ConnectorStateInfo info = connect.connectorStatus(connectorName);
- boolean result = info != null
- && info.connector().state().equals(connectorState.toString())
- && info.tasks().size() == numTasks
- && info.tasks().stream().allMatch(s -> s.state().equals(tasksState.toString()));
- return Optional.of(result);
- } catch (Exception e) {
- log.error("Could not check connector state info.", e);
- return Optional.empty();
- }
- }
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
index d9ff223..073b307 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
@@ -16,8 +16,6 @@
*/
package org.apache.kafka.connect.integration;
-import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.runtime.AbstractStatus;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.storage.StringConverter;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
@@ -35,7 +33,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
-import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -65,6 +62,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
private static final int NUM_TOPIC_PARTITIONS = 3;
private static final long CONNECTOR_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
private static final long WORKER_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(60);
+ private static final int NUM_WORKERS = 3;
private static final int NUM_TASKS = 4;
private static final String CONNECTOR_NAME = "seq-source1";
private static final String TOPIC_NAME = "sequential-topic";
@@ -85,7 +83,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
// build a Connect cluster backed by Kafka and Zk
connect = new EmbeddedConnectCluster.Builder()
.name("connect-cluster")
- .numWorkers(3)
+ .numWorkers(NUM_WORKERS)
.numBrokers(1)
.workerProps(workerProps)
.brokerProps(brokerProps)
@@ -117,20 +115,23 @@ public class RebalanceSourceConnectorsIntegrationTest {
props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+ connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+ "Connect workers did not start in time.");
+
// start a source connector
connect.configureConnector(CONNECTOR_NAME, props);
- waitForCondition(() -> this.assertConnectorAndTasksRunning(CONNECTOR_NAME, NUM_TASKS).orElse(false),
- CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+ connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+ "Connector tasks did not start in time.");
// start a source connector
connect.configureConnector("another-source", props);
- waitForCondition(() -> this.assertConnectorAndTasksRunning(CONNECTOR_NAME, NUM_TASKS).orElse(false),
- CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+ connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+ "Connector tasks did not start in time.");
- waitForCondition(() -> this.assertConnectorAndTasksRunning("another-source", 4).orElse(false),
- CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+ connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning("another-source", 4,
+ "Connector tasks did not start in time.");
}
@Ignore("Flaky and disruptive. See KAFKA-8391, KAFKA-8661 for details.")
@@ -153,11 +154,14 @@ public class RebalanceSourceConnectorsIntegrationTest {
props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+ connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+ "Connect workers did not start in time.");
+
// start a source connector
connect.configureConnector(CONNECTOR_NAME, props);
- waitForCondition(() -> this.assertConnectorAndTasksRunning(CONNECTOR_NAME, NUM_TASKS).orElse(false),
- CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+ connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+ "Connector tasks did not start in time.");
int numRecordsProduced = 100;
long recordTransferDurationMs = TimeUnit.SECONDS.toMillis(30);
@@ -180,8 +184,8 @@ public class RebalanceSourceConnectorsIntegrationTest {
restartLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS));
// And wait for the Connect to show the connectors and tasks are running
- waitForCondition(() -> this.assertConnectorAndTasksRunning(CONNECTOR_NAME, NUM_TASKS).orElse(false),
- CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+ connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+ "Connector tasks did not start in time.");
// consume all records from the source topic or fail, to ensure that they were correctly produced
recordNum = connect.kafka().consume(numRecordsProduced, recordTransferDurationMs, anotherTopic).count();
@@ -205,27 +209,20 @@ public class RebalanceSourceConnectorsIntegrationTest {
props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
- waitForCondition(() -> this.assertWorkersUp(3),
- WORKER_SETUP_DURATION_MS, "Connect workers did not start in time.");
+ connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+ "Connect workers did not start in time.");
- // start a source connector
- IntStream.range(0, 4).forEachOrdered(
- i -> {
- try {
- connect.configureConnector(CONNECTOR_NAME + i, props);
- } catch (IOException e) {
- throw new ConnectException(e);
- }
- });
-
- waitForCondition(() -> this.assertConnectorAndTasksRunning(CONNECTOR_NAME + 3, NUM_TASKS).orElse(true),
- CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+ // start several source connectors
+ IntStream.range(0, 4).forEachOrdered(i -> connect.configureConnector(CONNECTOR_NAME + i, props));
+
+ connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME + 3, NUM_TASKS,
+ "Connector tasks did not start in time.");
// delete connector
connect.deleteConnector(CONNECTOR_NAME + 3);
- waitForCondition(() -> !this.assertConnectorAndTasksRunning(CONNECTOR_NAME + 3, NUM_TASKS).orElse(true),
- CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not stop in time.");
+ connect.assertions().assertConnectorAndTasksAreStopped(CONNECTOR_NAME + 3,
+ "Connector tasks did not stop in time.");
waitForCondition(this::assertConnectorAndTasksAreUnique,
WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
@@ -246,29 +243,22 @@ public class RebalanceSourceConnectorsIntegrationTest {
props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
- waitForCondition(() -> this.assertWorkersUp(3),
- WORKER_SETUP_DURATION_MS, "Connect workers did not start in time.");
+ connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+ "Connect workers did not start in time.");
// start a source connector
- IntStream.range(0, 4).forEachOrdered(
- i -> {
- try {
- connect.configureConnector(CONNECTOR_NAME + i, props);
- } catch (IOException e) {
- throw new ConnectException(e);
- }
- });
-
- waitForCondition(() -> this.assertConnectorAndTasksRunning(CONNECTOR_NAME + 3, NUM_TASKS).orElse(false),
- CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+ IntStream.range(0, 4).forEachOrdered(i -> connect.configureConnector(CONNECTOR_NAME + i, props));
+
+ connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME + 3, NUM_TASKS,
+ "Connector tasks did not start in time.");
connect.addWorker();
- waitForCondition(() -> this.assertWorkersUp(4),
- WORKER_SETUP_DURATION_MS, "Connect workers did not start in time.");
+ connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS + 1,
+ "Connect workers did not start in time.");
- waitForCondition(() -> this.assertConnectorAndTasksRunning(CONNECTOR_NAME + 3, NUM_TASKS).orElse(false),
- CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+ connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME + 3, NUM_TASKS,
+ "Connector tasks did not start in time.");
waitForCondition(this::assertConnectorAndTasksAreUnique,
WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
@@ -289,69 +279,24 @@ public class RebalanceSourceConnectorsIntegrationTest {
props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
- waitForCondition(() -> this.assertWorkersUp(3),
- WORKER_SETUP_DURATION_MS, "Connect workers did not start in time.");
+ connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
+ "Connect workers did not start in time.");
// start a source connector
- IntStream.range(0, 4).forEachOrdered(
- i -> {
- try {
- connect.configureConnector(CONNECTOR_NAME + i, props);
- } catch (IOException e) {
- throw new ConnectException(e);
- }
- });
-
- waitForCondition(() -> this.assertConnectorAndTasksRunning(CONNECTOR_NAME + 3, NUM_TASKS).orElse(false),
- CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not start in time.");
+ IntStream.range(0, 4).forEachOrdered(i -> connect.configureConnector(CONNECTOR_NAME + i, props));
+
+ connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME + 3, NUM_TASKS,
+ "Connector tasks did not start in time.");
connect.removeWorker();
- waitForCondition(() -> this.assertWorkersUp(2),
- WORKER_SETUP_DURATION_MS, "Connect workers did not start in time.");
+ connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS - 1,
+ "Connect workers did not start in time.");
waitForCondition(this::assertConnectorAndTasksAreUnique,
WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
}
- /**
- * Confirm that a connector with an exact number of tasks is running.
- *
- * @param connectorName the connector
- * @param numTasks the expected number of tasks
- * @return true if the connector and tasks are in RUNNING state; false otherwise
- */
- private Optional<Boolean> assertConnectorAndTasksRunning(String connectorName, int numTasks) {
- try {
- ConnectorStateInfo info = connect.connectorStatus(connectorName);
- boolean result = info != null
- && info.tasks().size() == numTasks
- && info.connector().state().equals(AbstractStatus.State.RUNNING.toString())
- && info.tasks().stream().allMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
- log.debug("Found connector and tasks running: {}", result);
- return Optional.of(result);
- } catch (Exception e) {
- log.error("Could not check connector state info.", e);
- return Optional.empty();
- }
- }
-
- /**
- * Verifies whether the supplied number of workers matches the number of workers
- * currently running.
- * @param numWorkers the expected number of active workers
- * @return true if exactly numWorkers are active; false if more or fewer workers are running
- */
- private boolean assertWorkersUp(int numWorkers) {
- try {
- int numUp = connect.activeWorkers().size();
- return numUp == numWorkers;
- } catch (Exception e) {
- log.error("Could not check active workers.", e);
- return false;
- }
- }
-
private boolean assertConnectorAndTasksAreUnique() {
try {
Map<String, Collection<String>> connectors = new HashMap<>();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
index 087246b..d5328a6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.integration;
+import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.health.ConnectClusterState;
import org.apache.kafka.connect.health.ConnectorHealth;
import org.apache.kafka.connect.health.ConnectorState;
@@ -23,7 +24,6 @@ import org.apache.kafka.connect.health.ConnectorType;
import org.apache.kafka.connect.health.TaskState;
import org.apache.kafka.connect.rest.ConnectRestExtension;
import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
-import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
import org.apache.kafka.connect.util.clusters.WorkerHandle;
import org.apache.kafka.test.IntegrationTest;
@@ -33,12 +33,14 @@ import org.junit.experimental.categories.Category;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
+import javax.ws.rs.core.Response;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
+import static javax.ws.rs.core.Response.Status.BAD_REQUEST;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.NAME_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
@@ -138,8 +140,9 @@ public class RestExtensionIntegrationTest {
private boolean extensionIsRegistered() {
try {
String extensionUrl = connect.endpointForResource("integration-test-rest-extension/registered");
- return "true".equals(connect.executeGet(extensionUrl));
- } catch (ConnectRestException | IOException e) {
+ Response response = connect.requestGet(extensionUrl);
+ return response.getStatus() < BAD_REQUEST.getStatusCode();
+ } catch (ConnectException e) {
return false;
}
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java
index d01ab3b..1f13c17 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java
@@ -110,7 +110,7 @@ public class SessionedProtocolIntegrationTest {
);
assertEquals(
BAD_REQUEST.getStatusCode(),
- connect.executePost(connectorTasksEndpoint, "[]", emptyHeaders)
+ connect.requestPost(connectorTasksEndpoint, "[]", emptyHeaders).getStatus()
);
// Try again, but with an invalid signature
@@ -121,7 +121,7 @@ public class SessionedProtocolIntegrationTest {
);
assertEquals(
FORBIDDEN.getStatusCode(),
- connect.executePost(connectorTasksEndpoint, "[]", invalidSignatureHeaders)
+ connect.requestPost(connectorTasksEndpoint, "[]", invalidSignatureHeaders).getStatus()
);
// Create the connector now
@@ -151,7 +151,7 @@ public class SessionedProtocolIntegrationTest {
);
assertEquals(
BAD_REQUEST.getStatusCode(),
- connect.executePost(connectorTasksEndpoint, "[]", emptyHeaders)
+ connect.requestPost(connectorTasksEndpoint, "[]", emptyHeaders).getStatus()
);
// Try again, but with an invalid signature
@@ -162,7 +162,7 @@ public class SessionedProtocolIntegrationTest {
);
assertEquals(
FORBIDDEN.getStatusCode(),
- connect.executePost(connectorTasksEndpoint, "[]", invalidSignatureHeaders)
+ connect.requestPost(connectorTasksEndpoint, "[]", invalidSignatureHeaders).getStatus()
);
}
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
index 94cc880..2276340 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
@@ -25,7 +25,6 @@ import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.servlet.http.HttpServletResponse;
import javax.ws.rs.core.Response;
import java.io.IOException;
import java.io.InputStream;
@@ -33,6 +32,7 @@ import java.io.OutputStreamWriter;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
@@ -82,6 +82,7 @@ public class EmbeddedConnectCluster {
private final boolean maskExitProcedures;
private final String workerNamePrefix;
private final AtomicInteger nextWorkerId = new AtomicInteger(0);
+ private final EmbeddedConnectClusterAssertions assertions;
private EmbeddedConnectCluster(String name, Map<String, String> workerProps, int numWorkers,
int numBrokers, Properties brokerProps,
@@ -95,6 +96,7 @@ public class EmbeddedConnectCluster {
this.maskExitProcedures = maskExitProcedures;
// leaving non-configurable for now
this.workerNamePrefix = DEFAULT_WORKER_NAME_PREFIX;
+ this.assertions = new EmbeddedConnectClusterAssertions(this);
}
/**
@@ -124,7 +126,7 @@ public class EmbeddedConnectCluster {
/**
* Start the connect cluster and the embedded Kafka and Zookeeper cluster.
*/
- public void start() throws IOException {
+ public void start() {
if (maskExitProcedures) {
Exit.setExitProcedure(exitProcedure);
Exit.setHaltProcedure(haltProcedure);
@@ -136,6 +138,8 @@ public class EmbeddedConnectCluster {
/**
* Stop the connect cluster and the embedded Kafka and Zookeeper cluster.
* Clean up any temp directories created locally.
+ *
+ * @throws RuntimeException if Kafka brokers fail to stop
*/
public void stop() {
connectCluster.forEach(this::stopWorker);
@@ -182,6 +186,7 @@ public class EmbeddedConnectCluster {
* Decommission a specific worker from this Connect cluster.
*
* @param worker the handle of the worker to remove from the cluster
+ * @throws IllegalStateException if the Connect cluster has no workers
*/
public void removeWorker(WorkerHandle worker) {
if (connectCluster.isEmpty()) {
@@ -238,9 +243,9 @@ public class EmbeddedConnectCluster {
.filter(w -> {
try {
mapper.readerFor(ServerInfo.class)
- .readValue(executeGet(w.url().toString()));
+ .readValue(responseToString(requestGet(w.url().toString())));
return true;
- } catch (IOException e) {
+ } catch (ConnectException | IOException e) {
// Worker failed to respond. Consider it's offline
return false;
}
@@ -263,36 +268,37 @@ public class EmbeddedConnectCluster {
*
* @param connName the name of the connector
* @param connConfig the intended configuration
- * @throws IOException if call to the REST api fails.
- * @throws ConnectRestException if REST api returns error status
+ * @throws ConnectRestException if the REST api returns error status
+ * @throws ConnectException if the configuration fails to be serialized or if the request could not be sent
*/
- public void configureConnector(String connName, Map<String, String> connConfig) throws IOException {
+ public String configureConnector(String connName, Map<String, String> connConfig) {
String url = endpointForResource(String.format("connectors/%s/config", connName));
ObjectMapper mapper = new ObjectMapper();
- int status;
+ String content;
try {
- String content = mapper.writeValueAsString(connConfig);
- status = executePut(url, content);
+ content = mapper.writeValueAsString(connConfig);
} catch (IOException e) {
- log.error("Could not execute PUT request to " + url, e);
- throw e;
+ throw new ConnectException("Could not serialize connector configuration and execute PUT request");
}
- if (status >= HttpServletResponse.SC_BAD_REQUEST) {
- throw new ConnectRestException(status, "Could not execute PUT request");
+ Response response = requestPut(url, content);
+ if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) {
+ return responseToString(response);
}
+ throw new ConnectRestException(response.getStatus(), "Could not execute PUT request");
}
/**
* Delete an existing connector.
*
* @param connName name of the connector to be deleted
- * @throws IOException if call to the REST api fails.
+ * @throws ConnectRestException if the REST api returns error status
+ * @throws ConnectException for any other error.
*/
- public void deleteConnector(String connName) throws IOException {
+ public void deleteConnector(String connName) {
String url = endpointForResource(String.format("connectors/%s", connName));
- int status = executeDelete(url);
- if (status >= HttpServletResponse.SC_BAD_REQUEST) {
- throw new ConnectRestException(status, "Could not execute DELETE request.");
+ Response response = requestDelete(url);
+ if (response.getStatus() >= Response.Status.BAD_REQUEST.getStatusCode()) {
+ throw new ConnectRestException(response.getStatus(), "Could not execute DELETE request.");
}
}
@@ -305,50 +311,78 @@ public class EmbeddedConnectCluster {
*/
public Collection<String> connectors() {
ObjectMapper mapper = new ObjectMapper();
- try {
- String url = endpointForResource("connectors");
- return mapper.readerFor(Collection.class).readValue(executeGet(url));
- } catch (IOException e) {
- log.error("Could not read connector list", e);
- throw new ConnectException("Could not read connector list", e);
+ String url = endpointForResource("connectors");
+ Response response = requestGet(url);
+ if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) {
+ try {
+ return mapper.readerFor(Collection.class).readValue(responseToString(response));
+ } catch (IOException e) {
+ log.error("Could not parse connector list from response: {}",
+ responseToString(response), e
+ );
+ throw new ConnectException("Could not not parse connector list", e);
+ }
}
+ throw new ConnectRestException(response.getStatus(),
+ "Could not read connector list. Error response: " + responseToString(response));
}
/**
* Get the status for a connector running in this cluster.
*
* @param connectorName name of the connector
- * @return an instance of {@link ConnectorStateInfo} populated with state informaton of the connector and it's tasks.
+ * @return an instance of {@link ConnectorStateInfo} populated with state information of the connector and its tasks.
* @throws ConnectRestException if the HTTP request to the REST API failed with a valid status code.
* @throws ConnectException for any other error.
*/
public ConnectorStateInfo connectorStatus(String connectorName) {
ObjectMapper mapper = new ObjectMapper();
+ String url = endpointForResource(String.format("connectors/%s/status", connectorName));
+ Response response = requestGet(url);
try {
- String url = endpointForResource(String.format("connectors/%s/status", connectorName));
- return mapper.readerFor(ConnectorStateInfo.class).readValue(executeGet(url));
+ if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) {
+ return mapper.readerFor(ConnectorStateInfo.class)
+ .readValue(responseToString(response));
+ }
} catch (IOException e) {
- log.error("Could not read connector state", e);
- throw new ConnectException("Could not read connector state", e);
+ log.error("Could not read connector state from response: {}",
+ responseToString(response), e);
+ throw new ConnectException("Could not not parse connector state", e);
}
+ throw new ConnectRestException(response.getStatus(),
+ "Could not read connector state. Error response: " + responseToString(response));
}
- public String adminEndpoint(String resource) throws IOException {
+ /**
+ * Get the full URL of the admin endpoint that corresponds to the given REST resource
+ *
+ * @param resource the resource under the worker's admin endpoint
+ * @return the admin endpoint URL
+ * @throws ConnectRestException if no admin REST endpoint is available
+ */
+ public String adminEndpoint(String resource) {
String url = connectCluster.stream()
.map(WorkerHandle::adminUrl)
.filter(Objects::nonNull)
.findFirst()
- .orElseThrow(() -> new IOException("Admin endpoint is disabled."))
+ .orElseThrow(() -> new ConnectException("Admin endpoint is disabled."))
.toString();
return url + resource;
}
- public String endpointForResource(String resource) throws IOException {
+ /**
+ * Get the full URL of the endpoint that corresponds to the given REST resource
+ *
+ * @param resource the resource under the worker's admin endpoint
+ * @return the admin endpoint URL
+ * @throws ConnectRestException if no REST endpoint is available
+ */
+ public String endpointForResource(String resource) {
String url = connectCluster.stream()
.map(WorkerHandle::url)
.filter(Objects::nonNull)
.findFirst()
- .orElseThrow(() -> new IOException("Connect workers have not been provisioned"))
+ .orElseThrow(() -> new ConnectException("Connect workers have not been provisioned"))
.toString();
return url + resource;
}
@@ -359,91 +393,161 @@ public class EmbeddedConnectCluster {
}
}
+ /**
+ * Return the handle to the Kafka cluster this Connect cluster connects to.
+ *
+ * @return the Kafka cluster handle
+ */
public EmbeddedKafkaCluster kafka() {
return kafkaCluster;
}
- public int executePut(String url, String body) throws IOException {
- log.debug("Executing PUT request to URL={}. Payload={}", url, body);
- HttpURLConnection httpCon = (HttpURLConnection) new URL(url).openConnection();
- httpCon.setDoOutput(true);
- httpCon.setRequestProperty("Content-Type", "application/json");
- httpCon.setRequestMethod("PUT");
- try (OutputStreamWriter out = new OutputStreamWriter(httpCon.getOutputStream())) {
- out.write(body);
- }
- if (httpCon.getResponseCode() < HttpURLConnection.HTTP_BAD_REQUEST) {
- try (InputStream is = httpCon.getInputStream()) {
- log.info("PUT response for URL={} is {}", url, responseToString(is));
- }
- } else {
- try (InputStream is = httpCon.getErrorStream()) {
- log.info("PUT error response for URL={} is {}", url, responseToString(is));
- }
- }
- return httpCon.getResponseCode();
+ /**
+ * Execute a GET request on the given URL.
+ *
+ * @param url the HTTP endpoint
+ * @return the response to the GET request
+ * @throws ConnectException if execution of the GET request fails
+ * @deprecated Use {@link #requestGet(String)} instead.
+ */
+ @Deprecated
+ public String executeGet(String url) {
+ return responseToString(requestGet(url));
}
- public int executePost(String url, String body, Map<String, String> headers) throws IOException {
- log.debug("Executing POST request to URL={}. Payload={}", url, body);
- HttpURLConnection httpCon = (HttpURLConnection) new URL(url).openConnection();
- httpCon.setDoOutput(true);
- httpCon.setRequestProperty("Content-Type", "application/json");
- headers.forEach(httpCon::setRequestProperty);
- httpCon.setRequestMethod("POST");
- try (OutputStreamWriter out = new OutputStreamWriter(httpCon.getOutputStream())) {
- out.write(body);
- }
- if (httpCon.getResponseCode() < HttpURLConnection.HTTP_BAD_REQUEST) {
- try (InputStream is = httpCon.getInputStream()) {
- log.info("POST response for URL={} is {}", url, responseToString(is));
- }
- } else {
- try (InputStream is = httpCon.getErrorStream()) {
- log.info("POST error response for URL={} is {}", url, responseToString(is));
- }
- }
- return httpCon.getResponseCode();
+ /**
+ * Execute a GET request on the given URL.
+ *
+ * @param url the HTTP endpoint
+ * @return the response to the GET request
+ * @throws ConnectException if execution of the GET request fails
+ */
+ public Response requestGet(String url) {
+ return requestHttpMethod(url, null, Collections.emptyMap(), "GET");
}
/**
- * Execute a GET request on the given URL.
+ * Execute a PUT request on the given URL.
+ *
+ * @param url the HTTP endpoint
+ * @param body the payload of the PUT request
+ * @return the response to the PUT request
+ * @throws ConnectException if execution of the PUT request fails
+ * @deprecated Use {@link #requestPut(String, String)} instead.
+ */
+ @Deprecated
+ public int executePut(String url, String body) {
+ return requestPut(url, body).getStatus();
+ }
+
+ /**
+ * Execute a PUT request on the given URL.
+ *
+ * @param url the HTTP endpoint
+ * @param body the payload of the PUT request
+ * @return the response to the PUT request
+ * @throws ConnectException if execution of the PUT request fails
+ */
+ public Response requestPut(String url, String body) {
+ return requestHttpMethod(url, body, Collections.emptyMap(), "PUT");
+ }
+
+ /**
+ * Execute a POST request on the given URL.
+ *
+ * @param url the HTTP endpoint
+ * @param body the payload of the POST request
+ * @param headers a map that stores the POST request headers
+ * @return the response to the POST request
+ * @throws ConnectException if execution of the POST request fails
+ * @deprecated Use {@link #requestPost(String, String, java.util.Map)} instead.
+ */
+ @Deprecated
+ public int executePost(String url, String body, Map<String, String> headers) {
+ return requestPost(url, body, headers).getStatus();
+ }
+
+ /**
+ * Execute a POST request on the given URL.
+ *
+ * @param url the HTTP endpoint
+ * @param body the payload of the POST request
+ * @param headers a map that stores the POST request headers
+ * @return the response to the POST request
+ * @throws ConnectException if execution of the POST request fails
+ */
+ public Response requestPost(String url, String body, Map<String, String> headers) {
+ return requestHttpMethod(url, body, headers, "POST");
+ }
+
+ /**
+ * Execute a DELETE request on the given URL.
*
* @param url the HTTP endpoint
- * @return response body encoded as a String
- * @throws ConnectRestException if the HTTP request fails with a valid status code
- * @throws IOException for any other I/O error.
+ * @return the response to the DELETE request
+ * @throws ConnectException if execution of the DELETE request fails
+ * @deprecated Use {@link #requestDelete(String)} instead.
*/
- public String executeGet(String url) throws IOException {
- log.debug("Executing GET request to URL={}.", url);
- HttpURLConnection httpCon = (HttpURLConnection) new URL(url).openConnection();
- httpCon.setDoOutput(true);
- httpCon.setRequestMethod("GET");
- try (InputStream is = httpCon.getInputStream()) {
- int c;
- StringBuilder response = new StringBuilder();
- while ((c = is.read()) != -1) {
- response.append((char) c);
+ @Deprecated
+ public int executeDelete(String url) {
+ return requestDelete(url).getStatus();
+ }
+
+ /**
+ * Execute a DELETE request on the given URL.
+ *
+ * @param url the HTTP endpoint
+ * @return the response to the DELETE request
+ * @throws ConnectException if execution of the DELETE request fails
+ */
+ public Response requestDelete(String url) {
+ return requestHttpMethod(url, null, Collections.emptyMap(), "DELETE");
+ }
+
+ /**
+ * A general method that executes an HTTP request on a given URL.
+ *
+ * @param url the HTTP endpoint
+ * @param body the payload of the request; null if there isn't one
+ * @param headers a map that stores the request headers; empty if there are no headers
+ * @param httpMethod the name of the HTTP method to execute
+ * @return the response to the HTTP request
+ * @throws ConnectException if execution of the HTTP method fails
+ */
+ protected Response requestHttpMethod(String url, String body, Map<String, String> headers,
+ String httpMethod) {
+ log.debug("Executing {} request to URL={}." + (body != null ? " Payload={}" : ""),
+ httpMethod, url, body);
+ try {
+ HttpURLConnection httpCon = (HttpURLConnection) new URL(url).openConnection();
+ httpCon.setDoOutput(true);
+ httpCon.setRequestMethod(httpMethod);
+ if (body != null) {
+ httpCon.setRequestProperty("Content-Type", "application/json");
+ headers.forEach(httpCon::setRequestProperty);
+ try (OutputStreamWriter out = new OutputStreamWriter(httpCon.getOutputStream())) {
+ out.write(body);
+ }
}
- log.debug("GET response for URL={} is {}", url, response);
- return response.toString();
- } catch (IOException e) {
- Response.Status status = Response.Status.fromStatusCode(httpCon.getResponseCode());
- if (status != null) {
- throw new ConnectRestException(status, "Invalid endpoint: " + url, e);
+ try (InputStream is = httpCon.getResponseCode() < HttpURLConnection.HTTP_BAD_REQUEST
+ ? httpCon.getInputStream()
+ : httpCon.getErrorStream()
+ ) {
+ String responseEntity = responseToString(is);
+ log.info("{} response for URL={} is {}",
+ httpMethod, url, responseEntity.isEmpty() ? "empty" : responseEntity);
+ return Response.status(Response.Status.fromStatusCode(httpCon.getResponseCode()))
+ .entity(responseEntity)
+ .build();
}
- // invalid response code, re-throw the IOException.
- throw e;
+ } catch (IOException e) {
+ log.error("Could not execute " + httpMethod + " request to " + url, e);
+ throw new ConnectException(e);
}
}
- public int executeDelete(String url) throws IOException {
- log.debug("Executing DELETE request to URL={}", url);
- HttpURLConnection httpCon = (HttpURLConnection) new URL(url).openConnection();
- httpCon.setDoOutput(true);
- httpCon.setRequestMethod("DELETE");
- httpCon.connect();
- return httpCon.getResponseCode();
+ private String responseToString(Response response) {
+ return response == null ? "empty" : (String) response.getEntity();
}
private String responseToString(InputStream stream) throws IOException {
@@ -511,4 +615,13 @@ public class EmbeddedConnectCluster {
}
}
+ /**
+ * Return the available assertions for this Connect cluster
+ *
+ * @return the assertions object
+ */
+ public EmbeddedConnectClusterAssertions assertions() {
+ return assertions;
+ }
+
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
new file mode 100644
index 0000000..4b13d89
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util.clusters;
+
+import org.apache.kafka.connect.runtime.AbstractStatus;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response;
+import java.util.Optional;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
+
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+
+/**
+ * A set of common assertions that can be applied to a Connect cluster during integration testing
+ */
+public class EmbeddedConnectClusterAssertions {
+
+ private static final Logger log = LoggerFactory.getLogger(EmbeddedConnectClusterAssertions.class);
+ public static final long WORKER_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(60);
+ private static final long CONNECTOR_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
+
+ private final EmbeddedConnectCluster connect;
+
+ EmbeddedConnectClusterAssertions(EmbeddedConnectCluster connect) {
+ this.connect = connect;
+ }
+
+ /**
+ * Assert that at least the requested number of workers are up and running.
+ *
+ * @param numWorkers the number of online workers
+ */
+ public void assertAtLeastNumWorkersAreUp(int numWorkers, String detailMessage) throws InterruptedException {
+ try {
+ waitForCondition(
+ () -> checkWorkersUp(numWorkers, (actual, expected) -> actual >= expected).orElse(false),
+ WORKER_SETUP_DURATION_MS,
+ "Didn't meet the minimum requested number of online workers: " + numWorkers);
+ } catch (AssertionError e) {
+ throw new AssertionError(detailMessage, e);
+ }
+ }
+
+ /**
+ * Assert that at least the requested number of workers are up and running.
+ *
+ * @param numWorkers the number of online workers
+ */
+ public void assertExactlyNumWorkersAreUp(int numWorkers, String detailMessage) throws InterruptedException {
+ try {
+ waitForCondition(
+ () -> checkWorkersUp(numWorkers, (actual, expected) -> actual == expected).orElse(false),
+ WORKER_SETUP_DURATION_MS,
+ "Didn't meet the exact requested number of online workers: " + numWorkers);
+ } catch (AssertionError e) {
+ throw new AssertionError(detailMessage, e);
+ }
+ }
+
+ /**
+ * Confirm that the requested number of workers are up and running.
+ *
+ * @param numWorkers the number of online workers
+ * @return true if at least {@code numWorkers} are up; false otherwise
+ */
+ protected Optional<Boolean> checkWorkersUp(int numWorkers, BiFunction<Integer, Integer, Boolean> comp) {
+ try {
+ int numUp = connect.activeWorkers().size();
+ return Optional.of(comp.apply(numUp, numWorkers));
+ } catch (Exception e) {
+ log.error("Could not check active workers.", e);
+ return Optional.empty();
+ }
+ }
+
+ /**
+ * Assert that a connector is running with at least the given number of tasks all in running state
+ *
+ * @param connectorName the connector name
+ * @param numTasks the number of tasks
+ * @param detailMessage
+ * @throws InterruptedException
+ */
+ public void assertConnectorAndAtLeastNumTasksAreRunning(String connectorName, int numTasks, String detailMessage)
+ throws InterruptedException {
+ try {
+ waitForCondition(
+ () -> checkConnectorState(
+ connectorName,
+ AbstractStatus.State.RUNNING,
+ numTasks,
+ AbstractStatus.State.RUNNING,
+ (actual, expected) -> actual >= expected
+ ).orElse(false),
+ CONNECTOR_SETUP_DURATION_MS,
+ "The connector or at least " + numTasks + " of tasks are not running.");
+ } catch (AssertionError e) {
+ throw new AssertionError(detailMessage, e);
+ }
+ }
+
+ /**
+ * Assert that a connector is running with at least the given number of tasks all in running state
+ *
+ * @param connectorName the connector name
+ * @param numTasks the number of tasks
+ * @param detailMessage the assertion message
+ * @throws InterruptedException
+ */
+ public void assertConnectorAndExactlyNumTasksAreRunning(String connectorName, int numTasks, String detailMessage)
+ throws InterruptedException {
+ try {
+ waitForCondition(
+ () -> checkConnectorState(
+ connectorName,
+ AbstractStatus.State.RUNNING,
+ numTasks,
+ AbstractStatus.State.RUNNING,
+ (actual, expected) -> actual == expected
+ ).orElse(false),
+ CONNECTOR_SETUP_DURATION_MS,
+ "The connector or exactly " + numTasks + " tasks are not running.");
+ } catch (AssertionError e) {
+ throw new AssertionError(detailMessage, e);
+ }
+ }
+
+ /**
+ * Assert that a connector is running, that it has a specific number of tasks, and that all of
+ * its tasks are in the FAILED state.
+ *
+ * @param connectorName the connector name
+ * @param numTasks the number of tasks
+ * @param detailMessage the assertion message
+ * @throws InterruptedException
+ */
+ public void assertConnectorIsRunningAndTasksHaveFailed(String connectorName, int numTasks, String detailMessage)
+ throws InterruptedException {
+ try {
+ waitForCondition(
+ () -> checkConnectorState(
+ connectorName,
+ AbstractStatus.State.RUNNING,
+ numTasks,
+ AbstractStatus.State.FAILED,
+ (actual, expected) -> actual >= expected
+ ).orElse(false),
+ CONNECTOR_SETUP_DURATION_MS,
+ "Either the connector is not running or not all the " + numTasks + " tasks have failed.");
+ } catch (AssertionError e) {
+ throw new AssertionError(detailMessage, e);
+ }
+ }
+
+ /**
+ * Assert that a connector and its tasks are not running.
+ *
+ * @param connectorName the connector name
+ * @param detailMessage the assertion message
+ * @throws InterruptedException
+ */
+ public void assertConnectorAndTasksAreStopped(String connectorName, String detailMessage)
+ throws InterruptedException {
+ try {
+ waitForCondition(
+ () -> checkConnectorAndTasksAreStopped(connectorName),
+ CONNECTOR_SETUP_DURATION_MS,
+ "At least the connector or one of its tasks is still");
+ } catch (AssertionError e) {
+ throw new AssertionError(detailMessage, e);
+ }
+ }
+
+ /**
+ * Check whether the connector or any of its tasks are still in RUNNING state
+ *
+ * @param connectorName the connector
+ * @return true if the connector and all the tasks are not in RUNNING state; false otherwise
+ */
+ protected boolean checkConnectorAndTasksAreStopped(String connectorName) {
+ ConnectorStateInfo info;
+ try {
+ info = connect.connectorStatus(connectorName);
+ } catch (ConnectRestException e) {
+ return e.statusCode() == Response.Status.NOT_FOUND.getStatusCode();
+ } catch (Exception e) {
+ log.error("Could not check connector state info.", e);
+ return false;
+ }
+ if (info == null) {
+ return true;
+ }
+ return !info.connector().state().equals(AbstractStatus.State.RUNNING.toString())
+ && info.tasks().stream().noneMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
+ }
+
+ /**
+ * Check whether the given connector state matches the current state of the connector and
+ * whether it has at least the given number of tasks, with all the tasks matching the given
+ * task state.
+ * @param connectorName the connector
+ * @param connectorState
+ * @param numTasks the expected number of tasks
+ * @param tasksState
+ * @return true if the connector and tasks are in RUNNING state; false otherwise
+ */
+ protected Optional<Boolean> checkConnectorState(
+ String connectorName,
+ AbstractStatus.State connectorState,
+ int numTasks,
+ AbstractStatus.State tasksState,
+ BiFunction<Integer, Integer, Boolean> comp
+ ) {
+ try {
+ ConnectorStateInfo info = connect.connectorStatus(connectorName);
+ boolean result = info != null
+ && comp.apply(info.tasks().size(), numTasks)
+ && info.connector().state().equals(connectorState.toString())
+ && info.tasks().stream().allMatch(s -> s.state().equals(tasksState.toString()));
+ return Optional.of(result);
+ } catch (Exception e) {
+ log.error("Could not check connector state info.", e);
+ return Optional.empty();
+ }
+ }
+
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
index d36ae5b..b365e65 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
@@ -97,7 +97,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
}
@Override
- protected void before() throws IOException {
+ protected void before() {
start();
}
@@ -106,11 +106,17 @@ public class EmbeddedKafkaCluster extends ExternalResource {
stop();
}
- public void startOnlyKafkaOnSamePorts() throws IOException {
+ /**
+ * Starts the Kafka cluster alone using the ports that were assigned during initialization of
+ * the harness.
+ *
+ * @throws ConnectException if a directory to store the data cannot be created
+ */
+ public void startOnlyKafkaOnSamePorts() {
start(currentBrokerPorts, currentBrokerLogDirs);
}
- private void start() throws IOException {
+ private void start() {
// pick a random port
zookeeper = new EmbeddedZookeeper();
Arrays.fill(currentBrokerPorts, 0);
@@ -118,7 +124,7 @@ public class EmbeddedKafkaCluster extends ExternalResource {
start(currentBrokerPorts, currentBrokerLogDirs);
}
- private void start(int[] brokerPorts, String[] logDirs) throws IOException {
+ private void start(int[] brokerPorts, String[] logDirs) {
brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString());
putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.HostNameProp(), "localhost");
@@ -205,10 +211,15 @@ public class EmbeddedKafkaCluster extends ExternalResource {
}
}
- private String createLogDir() throws IOException {
+ private String createLogDir() {
TemporaryFolder tmpFolder = new TemporaryFolder();
- tmpFolder.create();
- return tmpFolder.newFolder().getAbsolutePath();
+ try {
+ tmpFolder.create();
+ return tmpFolder.newFolder().getAbsolutePath();
+ } catch (IOException e) {
+ log.error("Unable to create temporary log directory", e);
+ throw new ConnectException("Unable to create temporary log directory", e);
+ }
}
public String bootstrapServers() {