You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/02/12 23:47:47 UTC
[kafka] branch 2.5 updated: MINOR: Small Connect integration test
fixes (#8100)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.5
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.5 by this push:
new 5af8380 MINOR: Small Connect integration test fixes (#8100)
5af8380 is described below
commit 5af8380c64df8111b081b36e4c3d3fc6c64dd6ad
Author: Konstantine Karantasis <ko...@confluent.io>
AuthorDate: Wed Feb 12 15:40:37 2020 -0800
MINOR: Small Connect integration test fixes (#8100)
Author: Konstantine Karantasis <ko...@confluent.io>
Reviewer: Randall Hauch <rh...@gmail.com>
---
.../mirror/MirrorConnectorsIntegrationTest.java | 50 +++++++---------------
.../integration/ConnectWorkerIntegrationTest.java | 3 +-
.../ConnectorClientPolicyIntegrationTest.java | 15 ++++---
.../integration/ErrorHandlingIntegrationTest.java | 11 ++++-
.../integration/ExampleConnectIntegrationTest.java | 3 +-
.../RebalanceSourceConnectorsIntegrationTest.java | 3 +-
.../integration/RestExtensionIntegrationTest.java | 11 +++--
.../SessionedProtocolIntegrationTest.java | 3 +-
.../util/clusters/EmbeddedConnectCluster.java | 10 +++--
9 files changed, 52 insertions(+), 57 deletions(-)
diff --git a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
index 6331a10..8a15ad5 100644
--- a/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
+++ b/connect/mirror/src/test/java/org/apache/kafka/connect/mirror/MirrorConnectorsIntegrationTest.java
@@ -16,17 +16,13 @@
*/
package org.apache.kafka.connect.mirror;
-import org.apache.kafka.connect.runtime.AbstractStatus;
-import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
-import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
-import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
-import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.TopicPartition;
-
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
+import org.apache.kafka.test.IntegrationTest;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -34,19 +30,18 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.time.Duration;
import java.util.Arrays;
-import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
-import java.util.Set;
-import java.util.Collections;
import java.util.Properties;
-import java.time.Duration;
+import java.util.Set;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
-import static org.apache.kafka.test.TestUtils.waitForCondition;
/**
* Tests MM2 replication and failover/failback logic.
@@ -71,7 +66,7 @@ public class MirrorConnectorsIntegrationTest {
private EmbeddedConnectCluster backup;
@Before
- public void setup() throws IOException, InterruptedException {
+ public void setup() throws InterruptedException {
Properties brokerProps = new Properties();
brokerProps.put("auto.create.topics.enable", "false");
@@ -116,7 +111,11 @@ public class MirrorConnectorsIntegrationTest {
.build();
primary.start();
+ primary.assertions().assertAtLeastNumWorkersAreUp(3,
+ "Workers of primary-connect-cluster did not start in time.");
backup.start();
+ primary.assertions().assertAtLeastNumWorkersAreUp(3,
+ "Workers of backup-connect-cluster did not start in time.");
// create these topics before starting the connectors so we don't need to wait for discovery
primary.kafka().createTopic("test-topic-1", NUM_PARTITIONS);
@@ -189,30 +188,13 @@ public class MirrorConnectorsIntegrationTest {
private void waitUntilMirrorMakerIsRunning(EmbeddedConnectCluster connectCluster,
Set<String> connNames) throws InterruptedException {
for (String connector : connNames) {
- TestUtils.waitForCondition(() -> areConnectorAndTasksRunning(connectCluster,
- connector), "Timed out trying to verify connector " +
- connector + " was up!");
- }
- }
-
- private boolean areConnectorAndTasksRunning(EmbeddedConnectCluster connectCluster,
- String connectorName) {
- try {
- ConnectorStateInfo info = connectCluster.connectorStatus(connectorName);
- boolean result = info != null
- && !info.tasks().isEmpty()
- && info.connector().state().equals(AbstractStatus.State.RUNNING.toString())
- && info.tasks().stream().allMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
- log.debug("Found connector and tasks running: {}", result);
- return result;
- } catch (Exception e) {
- log.error("Could not check connector state info.", e);
- return false;
+ connectCluster.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connector, 1,
+ "Connector " + connector + " tasks did not start in time on cluster: " + connectCluster);
}
}
@After
- public void close() throws IOException {
+ public void close() {
for (String x : primary.connectors()) {
primary.deleteConnector(x);
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index 6cc43a4..b8e0497 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -28,7 +28,6 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -66,7 +65,7 @@ public class ConnectWorkerIntegrationTest {
Properties brokerProps = new Properties();
@Before
- public void setup() throws IOException {
+ public void setup() {
// setup Connect worker properties
workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
workerProps.put(CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, "All");
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
index 699ea23..db324fa 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorClientPolicyIntegrationTest.java
@@ -29,7 +29,6 @@ import org.junit.After;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -50,7 +49,6 @@ public class ConnectorClientPolicyIntegrationTest {
private static final int NUM_WORKERS = 1;
private static final String CONNECTOR_NAME = "simple-conn";
-
@After
public void close() {
}
@@ -73,7 +71,7 @@ public class ConnectorClientPolicyIntegrationTest {
@Test
public void testCreateWithAllowedOverridesForPrincipalPolicy() throws Exception {
Map<String, String> props = basicConnectorConfig();
- props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAIN");
+ props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
assertPassCreateConnector("Principal", props);
}
@@ -85,7 +83,7 @@ public class ConnectorClientPolicyIntegrationTest {
assertPassCreateConnector("All", props);
}
- private EmbeddedConnectCluster connectClusterWithPolicy(String policy) throws IOException {
+ private EmbeddedConnectCluster connectClusterWithPolicy(String policy) throws InterruptedException {
// setup Connect worker properties
Map<String, String> workerProps = new HashMap<>();
workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(5_000));
@@ -106,10 +104,13 @@ public class ConnectorClientPolicyIntegrationTest {
// start the clusters
connect.start();
+ connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+ "Initial group of workers did not start in time.");
+
return connect;
}
- private void assertFailCreateConnector(String policy, Map<String, String> props) throws IOException {
+ private void assertFailCreateConnector(String policy, Map<String, String> props) throws InterruptedException {
EmbeddedConnectCluster connect = connectClusterWithPolicy(policy);
try {
connect.configureConnector(CONNECTOR_NAME, props);
@@ -121,10 +122,12 @@ public class ConnectorClientPolicyIntegrationTest {
}
}
- private void assertPassCreateConnector(String policy, Map<String, String> props) throws IOException {
+ private void assertPassCreateConnector(String policy, Map<String, String> props) throws InterruptedException {
EmbeddedConnectCluster connect = connectClusterWithPolicy(policy);
try {
connect.configureConnector(CONNECTOR_NAME, props);
+ connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+ "Connector tasks did not start in time.");
} catch (ConnectRestException e) {
fail("Should be able to create connector");
} finally {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
index 33e6cf5..8963b8c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
@@ -34,7 +34,6 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -69,6 +68,7 @@ public class ErrorHandlingIntegrationTest {
private static final Logger log = LoggerFactory.getLogger(ErrorHandlingIntegrationTest.class);
+ private static final int NUM_WORKERS = 1;
private static final String DLQ_TOPIC = "my-connector-errors";
private static final String CONNECTOR_NAME = "error-conn";
private static final String TASK_ID = "error-conn-0";
@@ -83,12 +83,14 @@ public class ErrorHandlingIntegrationTest {
private ConnectorHandle connectorHandle;
@Before
- public void setup() throws IOException {
+ public void setup() throws InterruptedException {
// setup Connect cluster with defaults
connect = new EmbeddedConnectCluster.Builder().build();
// start Connect cluster
connect.start();
+ connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+ "Initial group of workers did not start in time.");
// get connector handles before starting test.
connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
@@ -134,6 +136,8 @@ public class ErrorHandlingIntegrationTest {
connectorHandle.taskHandle(TASK_ID).expectedRecords(EXPECTED_CORRECT_RECORDS);
connect.configureConnector(CONNECTOR_NAME, props);
+ connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+ "Connector tasks did not start in time.");
waitForCondition(this::checkForPartitionAssignment,
CONNECTOR_SETUP_DURATION_MS,
@@ -172,6 +176,9 @@ public class ErrorHandlingIntegrationTest {
}
connect.deleteConnector(CONNECTOR_NAME);
+ connect.assertions().assertConnectorAndTasksAreStopped(CONNECTOR_NAME,
+ "Connector tasks did not stop in time.");
+
}
/**
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
index 4da89d7..f4cba87 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
@@ -27,7 +27,6 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -66,7 +65,7 @@ public class ExampleConnectIntegrationTest {
private ConnectorHandle connectorHandle;
@Before
- public void setup() throws IOException {
+ public void setup() {
// setup Connect worker properties
Map<String, String> exampleWorkerProps = new HashMap<>();
exampleWorkerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(5_000));
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
index 073b307..19e7863 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
@@ -28,7 +28,6 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -70,7 +69,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
private EmbeddedConnectCluster connect;
@Before
- public void setup() throws IOException {
+ public void setup() {
// setup Connect worker properties
Map<String, String> workerProps = new HashMap<>();
workerProps.put(CONNECT_PROTOCOL_CONFIG, COMPATIBLE.toString());
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
index d5328a6..6ec86bd 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
@@ -34,7 +34,6 @@ import org.junit.experimental.categories.Category;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;
-import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -57,11 +56,12 @@ public class RestExtensionIntegrationTest {
private static final long REST_EXTENSION_REGISTRATION_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1);
private static final long CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1);
+ private static final int NUM_WORKERS = 1;
private EmbeddedConnectCluster connect;
@Test
- public void testRestExtensionApi() throws IOException, InterruptedException {
+ public void testRestExtensionApi() throws InterruptedException {
// setup Connect worker properties
Map<String, String> workerProps = new HashMap<>();
workerProps.put(REST_EXTENSION_CLASSES_CONFIG, IntegrationTestRestExtension.class.getName());
@@ -69,7 +69,7 @@ public class RestExtensionIntegrationTest {
// build a Connect cluster backed by Kafka and Zk
connect = new EmbeddedConnectCluster.Builder()
.name("connect-cluster")
- .numWorkers(1)
+ .numWorkers(NUM_WORKERS)
.numBrokers(1)
.workerProps(workerProps)
.build();
@@ -77,6 +77,9 @@ public class RestExtensionIntegrationTest {
// start the clusters
connect.start();
+ connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+ "Initial group of workers did not start in time.");
+
WorkerHandle worker = connect.workers().stream()
.findFirst()
.orElseThrow(() -> new AssertionError("At least one worker handle should be available"));
@@ -99,6 +102,8 @@ public class RestExtensionIntegrationTest {
connectorHandle.taskHandle(connectorHandle.name() + "-0");
StartAndStopLatch connectorStartLatch = connectorHandle.expectedStarts(1);
connect.configureConnector(connectorHandle.name(), connectorProps);
+ connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorHandle.name(), 1,
+ "Connector tasks did not start in time.");
connectorStartLatch.await(CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS, TimeUnit.MILLISECONDS);
String workerId = String.format("%s:%d", worker.url().getHost(), worker.url().getPort());
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java
index 1f13c17..8956a86 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/SessionedProtocolIntegrationTest.java
@@ -28,7 +28,6 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -61,7 +60,7 @@ public class SessionedProtocolIntegrationTest {
private ConnectorHandle connectorHandle;
@Before
- public void setup() throws IOException {
+ public void setup() {
// setup Connect worker properties
Map<String, String> workerProps = new HashMap<>();
workerProps.put(CONNECT_PROTOCOL_CONFIG, ConnectProtocolCompatibility.SESSIONED.protocol());
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
index 2276340..961b1d8 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
@@ -284,7 +284,8 @@ public class EmbeddedConnectCluster {
if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) {
return responseToString(response);
}
- throw new ConnectRestException(response.getStatus(), "Could not execute PUT request");
+ throw new ConnectRestException(response.getStatus(),
+ "Could not execute PUT request. Error response: " + responseToString(response));
}
/**
@@ -298,7 +299,8 @@ public class EmbeddedConnectCluster {
String url = endpointForResource(String.format("connectors/%s", connName));
Response response = requestDelete(url);
if (response.getStatus() >= Response.Status.BAD_REQUEST.getStatusCode()) {
- throw new ConnectRestException(response.getStatus(), "Could not execute DELETE request.");
+ throw new ConnectRestException(response.getStatus(),
+ "Could not execute DELETE request. Error response: " + responseToString(response));
}
}
@@ -358,7 +360,7 @@ public class EmbeddedConnectCluster {
*
* @param resource the resource under the worker's admin endpoint
* @return the admin endpoint URL
- * @throws ConnectRestException if no admin REST endpoint is available
+ * @throws ConnectException if no admin REST endpoint is available
*/
public String adminEndpoint(String resource) {
String url = connectCluster.stream()
@@ -375,7 +377,7 @@ public class EmbeddedConnectCluster {
*
* @param resource the resource under the worker's admin endpoint
* @return the admin endpoint URL
- * @throws ConnectRestException if no REST endpoint is available
+ * @throws ConnectException if no REST endpoint is available
*/
public String endpointForResource(String resource) {
String url = connectCluster.stream()