You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2019/12/04 03:56:14 UTC

[kafka] branch 2.4 updated: KAFKA-9258 Check Connect Metrics non-null in task stop (#7768)

This is an automated email from the ASF dual-hosted git repository.

rhauch pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.4 by this push:
     new 6a2cb6c  KAFKA-9258 Check Connect Metrics non-null in task stop (#7768)
6a2cb6c is described below

commit 6a2cb6c3824c34d00ac3edf076643e798af3d617
Author: Cyrus Vafadari <cy...@alum.mit.edu>
AuthorDate: Tue Dec 3 19:13:52 2019 -0800

    KAFKA-9258 Check Connect Metrics non-null in task stop (#7768)
    
    Record the task removal in the Connect metrics only after the null check on the task, and add integration tests for restarting a failed task.
    
    Authors: Cyrus Vafadari <cy...@confluent.io>, Chris Egerton <ch...@confluent.io>
    Reviewers: Arjun Satish <ar...@confluent.io>, Randall Hauch <rh...@gmail.com>
---
 .../org/apache/kafka/connect/runtime/Worker.java   |  2 +-
 .../integration/ConnectWorkerIntegrationTest.java  | 86 +++++++++++++++++++---
 2 files changed, 78 insertions(+), 10 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index ff5eee3..e496d41 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -726,12 +726,12 @@ public class Worker {
     private void awaitStopTask(ConnectorTaskId taskId, long timeout) {
         try (LoggingContext loggingContext = LoggingContext.forTask(taskId)) {
             WorkerTask task = tasks.remove(taskId);
-            connectorStatusMetricsGroup.recordTaskRemoved(taskId);
             if (task == null) {
                 log.warn("Ignoring await stop request for non-present task {}", taskId);
                 return;
             }
 
+            connectorStatusMetricsGroup.recordTaskRemoved(taskId);
             if (!task.awaitStop(timeout)) {
                 log.error("Graceful stop of task {} failed.", task.id());
                 task.cancel();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index ca63a07..7cdfa7d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -30,17 +30,22 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
 import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG;
 import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
 import static org.apache.kafka.test.TestUtils.waitForCondition;
 import static org.junit.Assert.assertFalse;
@@ -65,20 +70,20 @@ public class ConnectWorkerIntegrationTest {
     @Before
     public void setup() throws IOException {
         // setup Connect worker properties
-        Map<String, String> exampleWorkerProps = new HashMap<>();
-        exampleWorkerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
+        Map<String, String> workerProps = new HashMap<>();
+        workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
+        workerProps.put(CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, "All");
 
         // setup Kafka broker properties
-        Properties exampleBrokerProps = new Properties();
-        exampleBrokerProps.put("auto.create.topics.enable", String.valueOf(false));
+        Properties brokerProps = new Properties();
+        brokerProps.put("auto.create.topics.enable", String.valueOf(false));
 
         // build a Connect cluster backed by Kafka and Zk
         connect = new EmbeddedConnectCluster.Builder()
                 .name("connect-cluster")
                 .numWorkers(NUM_WORKERS)
-                .workerProps(exampleWorkerProps)
-                .brokerProps(exampleBrokerProps)
-                .maskExitProcedures(true) // true is the default, setting here as example
+                .workerProps(workerProps)
+                .brokerProps(brokerProps)
                 .build();
 
         // start the clusters
@@ -140,6 +145,42 @@ public class ConnectWorkerIntegrationTest {
     }
 
     /**
+     * Verify that a failed task can be restarted successfully.
+     */
+    @Test
+    public void testRestartFailedTask() throws Exception {
+        int numTasks = 1;
+
+        // Properties for the source connector. The task should fail at startup due to the bad broker address.
+        Map<String, String> connectorProps = new HashMap<>();
+        connectorProps.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getName());
+        connectorProps.put(TASKS_MAX_CONFIG, Objects.toString(numTasks));
+        connectorProps.put(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + BOOTSTRAP_SERVERS_CONFIG, "nobrokerrunningatthisaddress");
+
+        waitForCondition(() -> assertWorkersUp(NUM_WORKERS).orElse(false),
+                WORKER_SETUP_DURATION_MS, "Initial group of workers did not start in time.");
+
+        // Try to start the connector and its single task.
+        connect.configureConnector(CONNECTOR_NAME, connectorProps);
+
+        waitForCondition(() -> assertConnectorTasksFailed(CONNECTOR_NAME, numTasks).orElse(false),
+                CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not fail in time");
+
+        // Reconfigure the connector without the bad broker address.
+        connectorProps.remove(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + BOOTSTRAP_SERVERS_CONFIG);
+        connect.configureConnector(CONNECTOR_NAME, connectorProps);
+
+        // Restart the failed task
+        String taskRestartEndpoint = connect.endpointForResource(
+            String.format("connectors/%s/tasks/0/restart", CONNECTOR_NAME));
+        connect.executePost(taskRestartEndpoint, "", Collections.emptyMap());
+
+        // Ensure the task started successfully this time
+        waitForCondition(() -> assertConnectorAndTasksRunning(CONNECTOR_NAME, numTasks).orElse(false),
+            CONNECTOR_SETUP_DURATION_MS, "Connector tasks are not all in running state.");
+    }
+
+    /**
      * Confirm that the requested number of workers is up and running.
      *
      * @param numWorkers the number of online workers
@@ -163,12 +204,39 @@ public class ConnectWorkerIntegrationTest {
      * @return true if the connector and tasks are in RUNNING state; false otherwise
      */
     private Optional<Boolean> assertConnectorAndTasksRunning(String connectorName, int numTasks) {
+        return assertConnectorState(
+            connectorName,
+            AbstractStatus.State.RUNNING,
+            numTasks,
+            AbstractStatus.State.RUNNING);
+    }
+
+    /**
+     * Confirm that a connector is running, that it has a specific number of tasks, and that all of
+     * its tasks are in the FAILED state.
+     * @param connectorName the connector
+     * @param numTasks the expected number of tasks
+     * @return true if the connector is in RUNNING state and its tasks are in FAILED state; false otherwise
+     */
+    private Optional<Boolean> assertConnectorTasksFailed(String connectorName, int numTasks) {
+        return assertConnectorState(
+            connectorName,
+            AbstractStatus.State.RUNNING,
+            numTasks,
+            AbstractStatus.State.FAILED);
+    }
+
+    private Optional<Boolean> assertConnectorState(
+        String connectorName,
+        AbstractStatus.State connectorState,
+        int numTasks,
+        AbstractStatus.State tasksState) {
         try {
             ConnectorStateInfo info = connect.connectorStatus(connectorName);
             boolean result = info != null
+                    && info.connector().state().equals(connectorState.toString())
                     && info.tasks().size() == numTasks
-                    && info.connector().state().equals(AbstractStatus.State.RUNNING.toString())
-                    && info.tasks().stream().allMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
+                    && info.tasks().stream().allMatch(s -> s.state().equals(tasksState.toString()));
             return Optional.of(result);
         } catch (Exception e) {
             log.error("Could not check connector state info.", e);