You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by rh...@apache.org on 2019/12/04 03:56:14 UTC
[kafka] branch 2.4 updated: KAFKA-9258 Check Connect Metrics non-null in task stop (#7768)
This is an automated email from the ASF dual-hosted git repository.
rhauch pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 6a2cb6c KAFKA-9258 Check Connect Metrics non-null in task stop (#7768)
6a2cb6c is described below
commit 6a2cb6c3824c34d00ac3edf076643e798af3d617
Author: Cyrus Vafadari <cy...@alum.mit.edu>
AuthorDate: Tue Dec 3 19:13:52 2019 -0800
KAFKA-9258 Check Connect Metrics non-null in task stop (#7768)
Record the task removal in the connector status metrics only after the null check on the task, and add an integration test for restarting a failed task.
Authors: Cyrus Vafadari <cy...@confluent.io>, Chris Egerton <ch...@confluent.io>
Reviewers: Arjun Satish <ar...@confluent.io>, Randall Hauch <rh...@gmail.com>
---
.../org/apache/kafka/connect/runtime/Worker.java | 2 +-
.../integration/ConnectWorkerIntegrationTest.java | 86 +++++++++++++++++++---
2 files changed, 78 insertions(+), 10 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index ff5eee3..e496d41 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -726,12 +726,12 @@ public class Worker {
private void awaitStopTask(ConnectorTaskId taskId, long timeout) {
try (LoggingContext loggingContext = LoggingContext.forTask(taskId)) {
WorkerTask task = tasks.remove(taskId);
- connectorStatusMetricsGroup.recordTaskRemoved(taskId);
if (task == null) {
log.warn("Ignoring await stop request for non-present task {}", taskId);
return;
}
+ connectorStatusMetricsGroup.recordTaskRemoved(taskId);
if (!task.awaitStop(timeout)) {
log.error("Graceful stop of task {} failed.", task.id());
task.cancel();
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index ca63a07..7cdfa7d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -30,17 +30,22 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
+import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX;
import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
import static org.apache.kafka.test.TestUtils.waitForCondition;
import static org.junit.Assert.assertFalse;
@@ -65,20 +70,20 @@ public class ConnectWorkerIntegrationTest {
@Before
public void setup() throws IOException {
// setup Connect worker properties
- Map<String, String> exampleWorkerProps = new HashMap<>();
- exampleWorkerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
+ Map<String, String> workerProps = new HashMap<>();
+ workerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, String.valueOf(OFFSET_COMMIT_INTERVAL_MS));
+ workerProps.put(CONNECTOR_CLIENT_POLICY_CLASS_CONFIG, "All");
// setup Kafka broker properties
- Properties exampleBrokerProps = new Properties();
- exampleBrokerProps.put("auto.create.topics.enable", String.valueOf(false));
+ Properties brokerProps = new Properties();
+ brokerProps.put("auto.create.topics.enable", String.valueOf(false));
// build a Connect cluster backed by Kafka and Zk
connect = new EmbeddedConnectCluster.Builder()
.name("connect-cluster")
.numWorkers(NUM_WORKERS)
- .workerProps(exampleWorkerProps)
- .brokerProps(exampleBrokerProps)
- .maskExitProcedures(true) // true is the default, setting here as example
+ .workerProps(workerProps)
+ .brokerProps(brokerProps)
.build();
// start the clusters
@@ -140,6 +145,42 @@ public class ConnectWorkerIntegrationTest {
}
/**
+ * Verify that a failed task can be restarted successfully.
+ */
+ @Test
+ public void testRestartFailedTask() throws Exception {
+ int numTasks = 1;
+
+ // Properties for the source connector. The task should fail at startup due to the bad broker address.
+ Map<String, String> connectorProps = new HashMap<>();
+ connectorProps.put(CONNECTOR_CLASS_CONFIG, MonitorableSourceConnector.class.getName());
+ connectorProps.put(TASKS_MAX_CONFIG, Objects.toString(numTasks));
+ connectorProps.put(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + BOOTSTRAP_SERVERS_CONFIG, "nobrokerrunningatthisaddress");
+
+ waitForCondition(() -> assertWorkersUp(NUM_WORKERS).orElse(false),
+ WORKER_SETUP_DURATION_MS, "Initial group of workers did not start in time.");
+
+ // Try to start the connector and its single task.
+ connect.configureConnector(CONNECTOR_NAME, connectorProps);
+
+ waitForCondition(() -> assertConnectorTasksFailed(CONNECTOR_NAME, numTasks).orElse(false),
+ CONNECTOR_SETUP_DURATION_MS, "Connector tasks did not fail in time");
+
+ // Reconfigure the connector without the bad broker address.
+ connectorProps.remove(CONNECTOR_CLIENT_PRODUCER_OVERRIDES_PREFIX + BOOTSTRAP_SERVERS_CONFIG);
+ connect.configureConnector(CONNECTOR_NAME, connectorProps);
+
+ // Restart the failed task
+ String taskRestartEndpoint = connect.endpointForResource(
+ String.format("connectors/%s/tasks/0/restart", CONNECTOR_NAME));
+ connect.executePost(taskRestartEndpoint, "", Collections.emptyMap());
+
+ // Ensure the task started successfully this time
+ waitForCondition(() -> assertConnectorAndTasksRunning(CONNECTOR_NAME, numTasks).orElse(false),
+ CONNECTOR_SETUP_DURATION_MS, "Connector tasks are not all in running state.");
+ }
+
+ /**
* Confirm that the requested number of workers is up and running.
*
* @param numWorkers the number of online workers
@@ -163,12 +204,39 @@ public class ConnectWorkerIntegrationTest {
* @return true if the connector and tasks are in RUNNING state; false otherwise
*/
private Optional<Boolean> assertConnectorAndTasksRunning(String connectorName, int numTasks) {
+ return assertConnectorState(
+ connectorName,
+ AbstractStatus.State.RUNNING,
+ numTasks,
+ AbstractStatus.State.RUNNING);
+ }
+
+ /**
+ * Confirm that a connector is running, that it has a specific number of tasks, and that all of
+ * its tasks are in the FAILED state.
+ * @param connectorName the connector
+ * @param numTasks the expected number of tasks
+ * @return true if the connector is in RUNNING state and its tasks are in FAILED state; false otherwise
+ */
+ private Optional<Boolean> assertConnectorTasksFailed(String connectorName, int numTasks) {
+ return assertConnectorState(
+ connectorName,
+ AbstractStatus.State.RUNNING,
+ numTasks,
+ AbstractStatus.State.FAILED);
+ }
+
+ private Optional<Boolean> assertConnectorState(
+ String connectorName,
+ AbstractStatus.State connectorState,
+ int numTasks,
+ AbstractStatus.State tasksState) {
try {
ConnectorStateInfo info = connect.connectorStatus(connectorName);
boolean result = info != null
+ && info.connector().state().equals(connectorState.toString())
&& info.tasks().size() == numTasks
- && info.connector().state().equals(AbstractStatus.State.RUNNING.toString())
- && info.tasks().stream().allMatch(s -> s.state().equals(AbstractStatus.State.RUNNING.toString()));
+ && info.tasks().stream().allMatch(s -> s.state().equals(tasksState.toString()));
return Optional.of(result);
} catch (Exception e) {
log.error("Could not check connector state info.", e);