You are viewing a plain text version of this content; the hyperlink to the canonical (HTML) version was removed during text extraction.
Posted to commits@kafka.apache.org by rh...@apache.org on 2020/02/13 00:02:29 UTC
[kafka] branch 2.3 updated: MINOR: Small Connect integration test
fixes (#8100)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new fc543ad MINOR: Small Connect integration test fixes (#8100)
fc543ad is described below
commit fc543add8fb40feae45ccd78168ec54667fdbae5
Author: Konstantine Karantasis <ko...@confluent.io>
AuthorDate: Wed Feb 12 15:40:37 2020 -0800
MINOR: Small Connect integration test fixes (#8100)
Author: Konstantine Karantasis <ko...@confluent.io>
Reviewer: Randall Hauch <rh...@gmail.com>
---
.../connect/integration/ConnectWorkerIntegrationTest.java | 3 +--
.../integration/ConnectorCientPolicyIntegrationTest.java | 15 +++++++++------
.../connect/integration/ErrorHandlingIntegrationTest.java | 11 +++++++++--
.../integration/ExampleConnectIntegrationTest.java | 3 +--
.../RebalanceSourceConnectorsIntegrationTest.java | 3 +--
.../connect/integration/RestExtensionIntegrationTest.java | 11 ++++++++---
.../connect/util/clusters/EmbeddedConnectCluster.java | 8 +++++---
7 files changed, 34 insertions(+), 20 deletions(-)
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index 299cb93..0aa4418 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -28,7 +28,6 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -65,7 +64,7 @@ public class ConnectWorkerIntegrationTest {
Properties brokerProps = new Properties();
@Before
- public void setup() throws IOException {
+ public void setup() {
// setup Connect worker properties
workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
workerProps.put(CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, "All");
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorCientPolicyIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorCientPolicyIntegrationTest.java
index 499916b..00f541b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorCientPolicyIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorCientPolicyIntegrationTest.java
@@ -29,7 +29,6 @@ import org.junit.After;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -50,7 +49,6 @@ public class ConnectorCientPolicyIntegrationTest {
private static final int NUM_WORKERS = 1;
private static final String CONNECTOR_NAME = "simple-conn";
-
@After
public void close() {
}
@@ -73,7 +71,7 @@ public class ConnectorCientPolicyIntegrationTest {
@Test
public void testCreateWithAllowedOverridesForPrincipalPolicy() throws Exception {
Map<String, String> props = basicConnectorConfig();
- props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAIN");
+ props.put(ConnectorConfig.CONNECTOR_CLIENT_CONSUMER_OVERRIDES_PREFIX + CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
assertPassCreateConnector("Principal", props);
}
@@ -85,7 +83,7 @@ public class ConnectorCientPolicyIntegrationTest {
assertPassCreateConnector("All", props);
}
- private EmbeddedConnectCluster connectClusterWithPolicy(String policy) throws IOException {
+ private EmbeddedConnectCluster connectClusterWithPolicy(String policy) throws InterruptedException {
// setup Connect worker properties
Map<String, String> workerProps = new HashMap<>();
workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(5_000));
@@ -106,10 +104,13 @@ public class ConnectorCientPolicyIntegrationTest {
// start the clusters
connect.start();
+ connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+ "Initial group of workers did not start in time.");
+
return connect;
}
- private void assertFailCreateConnector(String policy, Map<String, String> props) throws IOException {
+ private void assertFailCreateConnector(String policy, Map<String, String> props) throws InterruptedException {
EmbeddedConnectCluster connect = connectClusterWithPolicy(policy);
try {
connect.configureConnector(CONNECTOR_NAME, props);
@@ -121,10 +122,12 @@ public class ConnectorCientPolicyIntegrationTest {
}
}
- private void assertPassCreateConnector(String policy, Map<String, String> props) throws IOException {
+ private void assertPassCreateConnector(String policy, Map<String, String> props) throws InterruptedException {
EmbeddedConnectCluster connect = connectClusterWithPolicy(policy);
try {
connect.configureConnector(CONNECTOR_NAME, props);
+ connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+ "Connector tasks did not start in time.");
} catch (ConnectRestException e) {
fail("Should be able to create connector");
} finally {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
index 33e6cf5..8963b8c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
@@ -34,7 +34,6 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -69,6 +68,7 @@ public class ErrorHandlingIntegrationTest {
private static final Logger log = LoggerFactory.getLogger(ErrorHandlingIntegrationTest.class);
+ private static final int NUM_WORKERS = 1;
private static final String DLQ_TOPIC = "my-connector-errors";
private static final String CONNECTOR_NAME = "error-conn";
private static final String TASK_ID = "error-conn-0";
@@ -83,12 +83,14 @@ public class ErrorHandlingIntegrationTest {
private ConnectorHandle connectorHandle;
@Before
- public void setup() throws IOException {
+ public void setup() throws InterruptedException {
// setup Connect cluster with defaults
connect = new EmbeddedConnectCluster.Builder().build();
// start Connect cluster
connect.start();
+ connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+ "Initial group of workers did not start in time.");
// get connector handles before starting test.
connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
@@ -134,6 +136,8 @@ public class ErrorHandlingIntegrationTest {
connectorHandle.taskHandle(TASK_ID).expectedRecords(EXPECTED_CORRECT_RECORDS);
connect.configureConnector(CONNECTOR_NAME, props);
+ connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, NUM_TASKS,
+ "Connector tasks did not start in time.");
waitForCondition(this::checkForPartitionAssignment,
CONNECTOR_SETUP_DURATION_MS,
@@ -172,6 +176,9 @@ public class ErrorHandlingIntegrationTest {
}
connect.deleteConnector(CONNECTOR_NAME);
+ connect.assertions().assertConnectorAndTasksAreStopped(CONNECTOR_NAME,
+ "Connector tasks did not stop in time.");
+
}
/**
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
index bd06291..14808c7 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
@@ -27,7 +27,6 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
@@ -66,7 +65,7 @@ public class ExampleConnectIntegrationTest {
private ConnectorHandle connectorHandle;
@Before
- public void setup() throws IOException {
+ public void setup() {
// setup Connect worker properties
Map<String, String> exampleWorkerProps = new HashMap<>();
exampleWorkerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(5_000));
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
index 073b307..19e7863 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
@@ -28,7 +28,6 @@ import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -70,7 +69,7 @@ public class RebalanceSourceConnectorsIntegrationTest {
private EmbeddedConnectCluster connect;
@Before
- public void setup() throws IOException {
+ public void setup() {
// setup Connect worker properties
Map<String, String> workerProps = new HashMap<>();
workerProps.put(CONNECT_PROTOCOL_CONFIG, COMPATIBLE.toString());
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
index d5328a6..6ec86bd 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RestExtensionIntegrationTest.java
@@ -34,7 +34,6 @@ import org.junit.experimental.categories.Category;
import javax.ws.rs.GET;
import javax.ws.rs.Path;
import javax.ws.rs.core.Response;
-import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -57,11 +56,12 @@ public class RestExtensionIntegrationTest {
private static final long REST_EXTENSION_REGISTRATION_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1);
private static final long CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS = TimeUnit.MINUTES.toMillis(1);
+ private static final int NUM_WORKERS = 1;
private EmbeddedConnectCluster connect;
@Test
- public void testRestExtensionApi() throws IOException, InterruptedException {
+ public void testRestExtensionApi() throws InterruptedException {
// setup Connect worker properties
Map<String, String> workerProps = new HashMap<>();
workerProps.put(REST_EXTENSION_CLASSES_CONFIG, IntegrationTestRestExtension.class.getName());
@@ -69,7 +69,7 @@ public class RestExtensionIntegrationTest {
// build a Connect cluster backed by Kafka and Zk
connect = new EmbeddedConnectCluster.Builder()
.name("connect-cluster")
- .numWorkers(1)
+ .numWorkers(NUM_WORKERS)
.numBrokers(1)
.workerProps(workerProps)
.build();
@@ -77,6 +77,9 @@ public class RestExtensionIntegrationTest {
// start the clusters
connect.start();
+ connect.assertions().assertAtLeastNumWorkersAreUp(NUM_WORKERS,
+ "Initial group of workers did not start in time.");
+
WorkerHandle worker = connect.workers().stream()
.findFirst()
.orElseThrow(() -> new AssertionError("At least one worker handle should be available"));
@@ -99,6 +102,8 @@ public class RestExtensionIntegrationTest {
connectorHandle.taskHandle(connectorHandle.name() + "-0");
StartAndStopLatch connectorStartLatch = connectorHandle.expectedStarts(1);
connect.configureConnector(connectorHandle.name(), connectorProps);
+ connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(connectorHandle.name(), 1,
+ "Connector tasks did not start in time.");
connectorStartLatch.await(CONNECTOR_HEALTH_AND_CONFIG_TIMEOUT_MS, TimeUnit.MILLISECONDS);
String workerId = String.format("%s:%d", worker.url().getHost(), worker.url().getPort());
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
index 8e2736d..89c97e6 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
@@ -284,7 +284,8 @@ public class EmbeddedConnectCluster {
if (response.getStatus() < Response.Status.BAD_REQUEST.getStatusCode()) {
return responseToString(response);
}
- throw new ConnectRestException(response.getStatus(), "Could not execute PUT request");
+ throw new ConnectRestException(response.getStatus(),
+ "Could not execute PUT request. Error response: " + responseToString(response));
}
/**
@@ -298,7 +299,8 @@ public class EmbeddedConnectCluster {
String url = endpointForResource(String.format("connectors/%s", connName));
Response response = requestDelete(url);
if (response.getStatus() >= Response.Status.BAD_REQUEST.getStatusCode()) {
- throw new ConnectRestException(response.getStatus(), "Could not execute DELETE request.");
+ throw new ConnectRestException(response.getStatus(),
+ "Could not execute DELETE request. Error response: " + responseToString(response));
}
}
@@ -358,7 +360,7 @@ public class EmbeddedConnectCluster {
*
* @param resource the resource under the worker's admin endpoint
* @return the admin endpoint URL
- * @throws ConnectRestException if no REST endpoint is available
+ * @throws ConnectException if no REST endpoint is available
*/
public String endpointForResource(String resource) {
String url = connectCluster.stream()