You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ce...@apache.org on 2023/09/19 15:39:47 UTC

[kafka] branch trunk updated: KAFKA-14855: Harden integration testing logic for asserting that a connector is deleted (#14371)

This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7872a1ff5b2 KAFKA-14855: Harden integration testing logic for asserting that a connector is deleted (#14371)
7872a1ff5b2 is described below

commit 7872a1ff5b2e9a0fbbe3d71180a97e29f1549d4f
Author: Yash Mayya <ya...@gmail.com>
AuthorDate: Tue Sep 19 16:39:39 2023 +0100

    KAFKA-14855: Harden integration testing logic for asserting that a connector is deleted (#14371)
    
    Reviewers: Sagar Rao <sa...@gmail.com>, Chris Egerton <ch...@aiven.io>
---
 .../integration/ConnectWorkerIntegrationTest.java  |  8 +++----
 .../ConnectorTopicsIntegrationTest.java            |  8 +++----
 .../integration/ErrorHandlingIntegrationTest.java  |  8 +++----
 .../RebalanceSourceConnectorsIntegrationTest.java  |  4 ++--
 .../clusters/EmbeddedConnectClusterAssertions.java | 26 +++++++++-------------
 5 files changed, 24 insertions(+), 30 deletions(-)

diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index 2e843cd6ec6..4c393d95ad3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -431,9 +431,9 @@ public class ConnectWorkerIntegrationTest {
 
         // Delete the connector
         connect.deleteConnector(CONNECTOR_NAME);
-        connect.assertions().assertConnectorAndTasksAreNotRunning(
+        connect.assertions().assertConnectorDoesNotExist(
                 CONNECTOR_NAME,
-                "Connector tasks were not destroyed in time"
+                "Connector wasn't deleted in time"
         );
     }
 
@@ -505,9 +505,9 @@ public class ConnectWorkerIntegrationTest {
 
         // Can delete a stopped connector
         connect.deleteConnector(CONNECTOR_NAME);
-        connect.assertions().assertConnectorAndTasksAreNotRunning(
+        connect.assertions().assertConnectorDoesNotExist(
                 CONNECTOR_NAME,
-                "Connector and all of its tasks should no longer be running"
+                "Connector wasn't deleted in time"
         );
     }
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
index a8b812f8c31..0614ba8a9f7 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorTopicsIntegrationTest.java
@@ -150,8 +150,8 @@ public class ConnectorTopicsIntegrationTest {
         // deleting a connector resets its active topics
         connect.deleteConnector(BAR_CONNECTOR);
 
-        connect.assertions().assertConnectorAndTasksAreNotRunning(BAR_CONNECTOR,
-                "Connector tasks did not stop in time.");
+        connect.assertions().assertConnectorDoesNotExist(BAR_CONNECTOR,
+                "Connector wasn't deleted in time.");
 
         connect.assertions().assertConnectorActiveTopics(BAR_CONNECTOR, Collections.emptyList(),
                 "Active topic set is not empty for deleted connector: " + BAR_CONNECTOR);
@@ -205,8 +205,8 @@ public class ConnectorTopicsIntegrationTest {
         // deleting a connector resets its active topics
         connect.deleteConnector(FOO_CONNECTOR);
 
-        connect.assertions().assertConnectorAndTasksAreNotRunning(FOO_CONNECTOR,
-                "Connector tasks did not stop in time.");
+        connect.assertions().assertConnectorDoesNotExist(FOO_CONNECTOR,
+                "Connector wasn't deleted in time.");
 
         connect.assertions().assertConnectorActiveTopics(FOO_CONNECTOR, Collections.emptyList(),
                 "Active topic set is not empty for deleted connector: " + FOO_CONNECTOR);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
index 7d3c1d6924b..55479e6d4ff 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
@@ -179,8 +179,8 @@ public class ErrorHandlingIntegrationTest {
         }
 
         connect.deleteConnector(CONNECTOR_NAME);
-        connect.assertions().assertConnectorAndTasksAreNotRunning(CONNECTOR_NAME,
-                "Connector tasks did not stop in time.");
+        connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME,
+                "Connector wasn't deleted in time.");
 
     }
 
@@ -248,8 +248,8 @@ public class ErrorHandlingIntegrationTest {
         ConsumerRecords<byte[], byte[]> messages = connect.kafka().consume(EXPECTED_INCORRECT_RECORDS, CONSUME_MAX_DURATION_MS, DLQ_TOPIC);
 
         connect.deleteConnector(CONNECTOR_NAME);
-        connect.assertions().assertConnectorAndTasksAreNotRunning(CONNECTOR_NAME,
-            "Connector tasks did not stop in time.");
+        connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME,
+            "Connector wasn't deleted in time.");
     }
 
     /**
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
index 04e12ea41e0..82004c8dc3a 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RebalanceSourceConnectorsIntegrationTest.java
@@ -206,8 +206,8 @@ public class RebalanceSourceConnectorsIntegrationTest {
         // delete connector
         connect.deleteConnector(CONNECTOR_NAME + 3);
 
-        connect.assertions().assertConnectorAndTasksAreNotRunning(CONNECTOR_NAME + 3,
-                "Connector tasks did not stop in time.");
+        connect.assertions().assertConnectorDoesNotExist(CONNECTOR_NAME + 3,
+                "Connector wasn't deleted in time.");
 
         waitForCondition(this::assertConnectorAndTasksAreUniqueAndBalanced,
                 WORKER_SETUP_DURATION_MS, "Connect and tasks are imbalanced between the workers.");
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
index c4ff5018ed1..d8e488f4727 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
@@ -455,48 +455,42 @@ public class EmbeddedConnectClusterAssertions {
     }
 
     /**
-     * Assert that a connector and its tasks are not running.
+     * Assert that a connector does not exist. This can be used to verify that a connector has been successfully deleted.
      *
      * @param connectorName the connector name
      * @param detailMessage the assertion message
      * @throws InterruptedException
      */
-    public void assertConnectorAndTasksAreNotRunning(String connectorName, String detailMessage)
+    public void assertConnectorDoesNotExist(String connectorName, String detailMessage)
             throws InterruptedException {
         try {
             waitForCondition(
-                () -> checkConnectorAndTasksAreNotRunning(connectorName),
+                () -> checkConnectorDoesNotExist(connectorName),
                 CONNECTOR_SETUP_DURATION_MS,
-                "At least the connector or one of its tasks is still running");
+                "The connector should not exist.");
         } catch (AssertionError e) {
             throw new AssertionError(detailMessage, e);
         }
     }
 
     /**
-     * Check whether the connector or any of its tasks are still in RUNNING state
+     * Check whether a connector exists by querying the <strong><em>GET /connectors/{connector}/status</em></strong> endpoint
      *
-     * @param connectorName the connector
-     * @return true if the connector and all the tasks are not in RUNNING state; false otherwise
+     * @param connectorName the connector name
+     * @return true if the connector does not exist; false otherwise
      */
-    protected boolean checkConnectorAndTasksAreNotRunning(String connectorName) {
-        ConnectorStateInfo info;
+    protected boolean checkConnectorDoesNotExist(String connectorName) {
         try {
-            info = connect.connectorStatus(connectorName);
+            connect.connectorStatus(connectorName);
         } catch (ConnectRestException e) {
             return e.statusCode() == Response.Status.NOT_FOUND.getStatusCode();
         } catch (Exception e) {
             log.error("Could not check connector state info.", e);
             return false;
         }
-        if (info == null) {
-            return true;
-        }
-        return !info.connector().state().equals(AbstractStatus.State.RUNNING.toString())
-                && info.tasks().stream().noneMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
+        return false;
     }
 
-
     /**
      * Assert that a connector is in the stopped state and has no tasks.
      *