You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ce...@apache.org on 2024/02/01 14:20:31 UTC

(kafka) branch trunk updated: KAFKA-15675: Improve worker liveness check during Connect integration tests (#15249)

This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7bc4afee113 KAFKA-15675: Improve worker liveness check during Connect integration tests (#15249)
7bc4afee113 is described below

commit 7bc4afee11353eccc5d31491693d1dc6e0bba6f7
Author: Chris Egerton <ch...@aiven.io>
AuthorDate: Thu Feb 1 09:20:24 2024 -0500

    KAFKA-15675: Improve worker liveness check during Connect integration tests (#15249)
    
    Reviewers: Greg Harris <gr...@aiven.io>, Yash Mayya <ya...@gmail.com>
---
 .../connect/integration/BlockingConnectorTest.java     |  5 -----
 .../integration/ConnectWorkerIntegrationTest.java      |  3 ---
 .../integration/ExactlyOnceSourceIntegrationTest.java  |  2 +-
 .../kafka/connect/util/clusters/EmbeddedConnect.java   | 18 ++++++++++++------
 4 files changed, 13 insertions(+), 15 deletions(-)

diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
index 2e415d1e31e..8465a1e31e3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
@@ -138,11 +138,6 @@ public class BlockingConnectorTest {
                 NUM_WORKERS,
                 "Initial group of workers did not start in time"
         );
-
-        try (Response response = connect.requestGet(connect.endpointForResource("connectors/nonexistent"))) {
-            // hack: make sure the worker is actually up (has joined the cluster, created and read to the end of internal topics, etc.)
-            assertEquals(404, response.getStatus());
-        }
     }
 
     @After
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index ec4d256c6e6..9dca7425d66 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -238,9 +238,6 @@ public class ConnectWorkerIntegrationTest {
 
         connect.kafka().stopOnlyKafka();
 
-        connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
-                "Group of workers did not remain the same after broker shutdown");
-
         // Allow for the workers to discover that the coordinator is unavailable, wait is
         // heartbeat timeout * 2 + 4sec
         Thread.sleep(TimeUnit.SECONDS.toMillis(10));
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
index b793cbf0209..09ea3f5ae4b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
@@ -500,7 +500,7 @@ public class ExactlyOnceSourceIntegrationTest {
         connectorHandle.expectedCommits(MINIMUM_MESSAGES);
 
         // make sure the worker is actually up (otherwise, it may fence out our simulated zombie leader, instead of the other way around)
-        assertEquals(404, connect.requestGet(connect.endpointForResource("connectors/nonexistent")).getStatus());
+        connect.assertions().assertExactlyNumWorkersAreUp(1, "Connect worker did not complete startup in time");
 
         // fence out the leader of the cluster
         Producer<?, ?> zombieLeader = transactionalProducer(
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
index 147e435adf6..af6c60e847e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
@@ -32,7 +32,6 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
 import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
-import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
 import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
 import org.apache.kafka.connect.util.SinkUtils;
@@ -959,15 +958,22 @@ abstract class EmbeddedConnect {
      * @return the list of handles of the online workers
      */
     public Set<WorkerHandle> activeWorkers() {
-        ObjectMapper mapper = new ObjectMapper();
         return workers().stream()
                 .filter(w -> {
                     try {
-                        mapper.readerFor(ServerInfo.class)
-                                .readValue(responseToString(requestGet(w.url().toString())));
-                        return true;
-                    } catch (ConnectException | IOException e) {
+                        String endpoint = w.url().resolve("/connectors/liveness-check").toString();
+                        Response response = requestGet(endpoint);
+                        boolean live = response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
+                                || response.getStatus() == Response.Status.OK.getStatusCode();
+                        if (live) {
+                            return true;
+                        } else {
+                            log.warn("Worker failed liveness probe. Response: {}", response);
+                            return false;
+                        }
+                    } catch (Exception e) {
                         // Worker failed to respond. Consider it's offline
+                        log.warn("Failed to contact worker during liveness check", e);
                         return false;
                     }
                 })