You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ce...@apache.org on 2023/09/19 15:39:47 UTC
[kafka] branch trunk updated: KAFKA-14855: Harden integration testing logic for asserting that a connector is deleted (#14371)
This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7872a1ff5b2 KAFKA-14855: Harden integration testing logic for asserting that a connector is deleted (#14371)
7872a1ff5b2 is described below
commit 7872a1ff5b2e9a0fbbe3d71180a97e29f1549d4f
Author: Yash Mayya <ya...@gmail.com>
AuthorDate: Tue Sep 19 16:39:39 2023 +0100
KAFKA-14855: Harden integration testing logic for asserting that a connector is deleted (#14371)
Reviewers: Sagar Rao <sa...@gmail.com>, Chris Egerton <ch...@aiven.io>
---
.../integration/ConnectWorkerIntegrationTest.java | 8 +++----
.../ConnectorTopicsIntegrationTest.java | 8 +++----
.../integration/ErrorHandlingIntegrationTest.java | 8 +++----
.../RebalanceSourceConnectorsIntegrationTest.java | 4 ++--
.../clusters/EmbeddedConnectClusterAssertions.java | 26 +++++++++-------------
5 files changed, 24 insertions(+), 30 deletions(-)
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index 2e843cd6ec6..4c393d95ad3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -431,9 +431,9 @@ public class ConnectWorkerIntegrationTest {
// Delete the connector
connect.deleteConnector(CONNECTOR_NAME);
- connect.assertions().assertConnectorAndTasksAreNotRunning(
+ connect.assertions().assertConnectorDoesNotExist(
CONNECTOR_NAME,
- "Connector tasks were not destroyed in time"
+ "Connector wasn't deleted in time"
);
}
@@ -505,9 +505,9 @@ public class ConnectWorkerIntegrationTest {
// Can delete a stopped connector
connect.deleteConnector(CONNECTOR_NAME);
- connect.assertions().assertConnectorAndTasksAreNotRunning(
+ connect.assertions().assertConnectorDoesNotExist(
CONNECTOR_NAME,
- "Connector and all of its tasks should no longer be running"
+ "Connector wasn't deleted in time"
);
}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
index a8b812f8c31..0614ba8a9f7 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
@@ -150,8 +150,8 @@ public class ConnectorTopicsIntegrationTest {
// deleting a connector resets its active topics
connect.deleteConnector(BAR_CONNECTOR);
- connect.assertions().assertConnectorAndTasksAreNotRunning(BAR_CONNECTOR,
- "Connector tasks did not stop in time.");
+ connect.assertions().assertConnectorDoesNotExist(BAR_CONNECTOR,
+ "Connector wasn't deleted in time.");
connect.assertions().assertConnectorActiveTopics(BAR_CONNECTOR, Collections.emptyList(),
"Active topic set is not empty for deleted connector: " + BAR_CONNECTOR);
@@ -205,8 +205,8 @@ public class ConnectorTopicsIntegrationTest {
// deleting a connector resets its active topics
connect.deleteConnector(FOO_CONNECTOR);
- connect.assertions().assertConnectorAndTasksAreNotRunning(FOO_CONNECTOR,
- "Connector tasks did not stop in time.");
+ connect.assertions().assertConnectorDoesNotExist(FOO_CONNECTOR,
+ "Connector wasn't deleted in time.");
connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR, Collections.emptyList(),
"Active topic set is not empty for deleted connector: " + FOO_CONNECTOR);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
index 7d3c1d6924b..55479e6d4ff 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
@@ -179,8 +179,8 @@ public class ErrorHandlingIntegrationTest {
}
connect.deleteConnector(CONNECTOR_NAME);
- connect.assertions().assertConnectorAndTasksAreNotRunning(CONNECTOR_NAME,
- "Connector tasks did not stop in time.");
+ connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME,
+ "Connector wasn't deleted in time.");
}
@@ -248,8 +248,8 @@ public class ErrorHandlingIntegrationTest {
ConsumerRecords<byte[], byte[]> messages = connect.kafka().consume(EXPECTED_INCORRECT_RECORDS, CONSUME_MAX_DURATION_MS, DLQ_TOPIC);
connect.deleteConnector(CONNECTOR_NAME);
- connect.assertions().assertConnectorAndTasksAreNotRunning(CONNECTOR_NAME,
- "Connector tasks did not stop in time.");
+ connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME,
+ "Connector wasn't deleted in time.");
}
/**
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
index 04e12ea41e0..82004c8dc3a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
@@ -206,8 +206,8 @@ public class RebalanceSourceConnectorsIntegrationTest {
// delete connector
connect.deleteConnector(CONNECTOR_NAME + 3);
- connect.assertions().assertConnectorAndTasksAreNotRunning(CONNECTOR_NAME + 3,
- "Connector tasks did not stop in time.");
+ connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME + 3,
+ "Connector wasn't deleted in time.");
waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced,
WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
index c4ff5018ed1..d8e488f4727 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
@@ -455,48 +455,42 @@ public class EmbeddedConnectClusterAssertions {
}
/**
- * Assert that a connector and its tasks are not running.
+ * Assert that a connector does not exist. This can be used to verify that a connector has been successfully deleted.
*
* @param connectorName the connector name
* @param detailMessage the assertion message
* @throws InterruptedException
*/
- public void assertConnectorAndTasksAreNotRunning(String connectorName, String detailMessage)
+ public void assertConnectorDoesNotExist(String connectorName, String detailMessage)
throws InterruptedException {
try {
waitForCondition(
- () -> checkConnectorAndTasksAreNotRunning(connectorName),
+ () -> checkConnectorDoesNotExist(connectorName),
CONNECTOR_SETUP_DURATION_MS,
- "At least the connector or one of its tasks is still running");
+ "The connector should not exist.");
} catch (AssertionError e) {
throw new AssertionError(detailMessage, e);
}
}
/**
- * Check whether the connector or any of its tasks are still in RUNNING state
+ * Check whether a connector is absent by querying the <strong><em>GET /connectors/{connector}/status</em></strong> endpoint; a 404 (Not Found) response means the connector does not exist
*
- * @param connectorName the connector
- * @return true if the connector and all the tasks are not in RUNNING state; false otherwise
+ * @param connectorName the connector name
+ * @return true if the connector does not exist; false otherwise
*/
- protected boolean checkConnectorAndTasksAreNotRunning(String connectorName) {
- ConnectorStateInfo info;
+ protected boolean checkConnectorDoesNotExist(String connectorName) {
try {
- info = connect.connectorStatus(connectorName);
+ connect.connectorStatus(connectorName);
} catch (ConnectRestException e) {
return e.statusCode() == Response.Status.NOT_FOUND.getStatusCode();
} catch (Exception e) {
log.error("Could not check connector state info.", e);
return false;
}
- if (info == null) {
- return true;
- }
- return !info.connector().state().equals(AbstractStatus.State.RUNNING.toString())
- && info.tasks().stream().noneMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
+ return false;
}
-
/**
* Assert that a connector is in the stopped state and has no tasks.
*