You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ce...@apache.org on 2024/02/01 14:20:31 UTC
(kafka) branch trunk updated: KAFKA-15675: Improve worker liveness check during Connect integration tests (#15249)
This is an automated email from the ASF dual-hosted git repository.
cegerton pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7bc4afee113 KAFKA-15675: Improve worker liveness check during Connect integration tests (#15249)
7bc4afee113 is described below
commit 7bc4afee11353eccc5d31491693d1dc6e0bba6f7
Author: Chris Egerton <ch...@aiven.io>
AuthorDate: Thu Feb 1 09:20:24 2024 -0500
KAFKA-15675: Improve worker liveness check during Connect integration tests (#15249)
Reviewers: Greg Harris <gr...@aiven.io>, Yash Mayya <ya...@gmail.com>
---
.../connect/integration/BlockingConnectorTest.java | 5 -----
.../integration/ConnectWorkerIntegrationTest.java | 3 ---
.../integration/ExactlyOnceSourceIntegrationTest.java | 2 +-
.../kafka/connect/util/clusters/EmbeddedConnect.java | 18 ++++++++++++------
4 files changed, 13 insertions(+), 15 deletions(-)
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
index 2e415d1e31e..8465a1e31e3 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
@@ -138,11 +138,6 @@ public class BlockingConnectorTest {
NUM_WORKERS,
"Initial group of workers did not start in time"
);
-
- try (Response response = connect.requestGet(connect.endpointForResource("connectors/nonexistent"))) {
- // hack: make sure the worker is actually up (has joined the cluster, created and read to the end of internal topics, etc.)
- assertEquals(404, response.getStatus());
- }
}
@After
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index ec4d256c6e6..9dca7425d66 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -238,9 +238,6 @@ public class ConnectWorkerIntegrationTest {
connect.kafka().stopOnlyKafka();
- connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
- "Group of workers did not remain the same after broker shutdown");
-
// Allow for the workers to discover that the coordinator is unavailable, wait is
// heartbeat timeout * 2 + 4sec
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
index b793cbf0209..09ea3f5ae4b 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExactlyOnceSourceIntegrationTest.java
@@ -500,7 +500,7 @@ public class ExactlyOnceSourceIntegrationTest {
connectorHandle.expectedCommits(MINIMUM_MESSAGES);
// make sure the worker is actually up (otherwise, it may fence out our simulated zombie leader, instead of the other way around)
- assertEquals(404, connect.requestGet(connect.endpointForResource("connectors/nonexistent")).getStatus());
+ connect.assertions().assertExactlyNumWorkersAreUp(1, "Connect worker did not complete startup in time");
// fence out the leader of the cluster
Producer<?, ?> zombieLeader = transactionalProducer(
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
index 147e435adf6..af6c60e847e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnect.java
@@ -32,7 +32,6 @@ import org.apache.kafka.connect.runtime.rest.entities.ConnectorOffsets;
import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
import org.apache.kafka.connect.runtime.rest.entities.LoggerLevel;
-import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
import org.apache.kafka.connect.util.SinkUtils;
@@ -959,15 +958,22 @@ abstract class EmbeddedConnect {
* @return the list of handles of the online workers
*/
public Set<WorkerHandle> activeWorkers() {
- ObjectMapper mapper = new ObjectMapper();
return workers().stream()
.filter(w -> {
try {
- mapper.readerFor(ServerInfo.class)
- .readValue(responseToString(requestGet(w.url().toString())));
- return true;
- } catch (ConnectException | IOException e) {
+ String endpoint = w.url().resolve("/connectors/liveness-check").toString();
+ Response response = requestGet(endpoint);
+ boolean live = response.getStatus() == Response.Status.NOT_FOUND.getStatusCode()
+ || response.getStatus() == Response.Status.OK.getStatusCode();
+ if (live) {
+ return true;
+ } else {
+ log.warn("Worker failed liveness probe. Response: {}", response);
+ return false;
+ }
+ } catch (Exception e) {
// Worker failed to respond. Consider it's offline
+ log.warn("Failed to contact worker during liveness check", e);
return false;
}
})