You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/06/06 00:02:44 UTC
[kafka] branch 2.4 updated: KAFKA-9851: Revoking Connect tasks due
to connectivity issues should also clear the running assignment (#8804)
This is an automated email from the ASF dual-hosted git repository.
kkarantasis pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new 40b8a7f KAFKA-9851: Revoking Connect tasks due to connectivity issues should also clear the running assignment (#8804)
40b8a7f is described below
commit 40b8a7f75b811844148cda9407fb6eedc4682841
Author: Konstantine Karantasis <ko...@confluent.io>
AuthorDate: Fri Jun 5 15:56:02 2020 -0700
KAFKA-9851: Revoking Connect tasks due to connectivity issues should also clear the running assignment (#8804)
Until recently revocation of connectors and tasks was the result of a rebalance that contained a new assignment. Therefore the view of the running assignment was kept consistent outside the call to `RebalanceListener#onRevoke`. However, after KAFKA-9184 the need appeared for the worker to revoke tasks voluntarily and proactively without having received a new assignment.
This commit will allow the worker to restart tasks that have been stopped as a result of voluntary revocation after a rebalance reassigns these tasks to the worker.
The fix is tested by extending an existing integration test.
Reviewers: Randall Hauch <rh...@gmail.com>
---
.../runtime/distributed/DistributedHerder.java | 52 +++++++++++++++-------
.../integration/ConnectWorkerIntegrationTest.java | 16 +++++++
.../clusters/EmbeddedConnectClusterAssertions.java | 4 +-
3 files changed, 54 insertions(+), 18 deletions(-)
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index 079865f..703386e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -1102,26 +1102,39 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
private void startWork() {
// Start assigned connectors and tasks
- log.info("Starting connectors and tasks using config offset {}", assignment.offset());
-
List<Callable<Void>> callables = new ArrayList<>();
- for (String connectorName : assignmentDifference(assignment.connectors(), runningAssignment.connectors())) {
- callables.add(getConnectorStartingCallable(connectorName));
- }
- // These tasks have been stopped by this worker due to task reconfiguration. In order to
- // restart them, they are removed just before the overall task startup from the set of
- // currently running tasks. Therefore, they'll be restarted only if they are included in
- // the assignment that was just received after rebalancing.
- runningAssignment.tasks().removeAll(tasksToRestart);
- tasksToRestart.clear();
- for (ConnectorTaskId taskId : assignmentDifference(assignment.tasks(), runningAssignment.tasks())) {
- callables.add(getTaskStartingCallable(taskId));
+ // The sets in runningAssignment may change when onRevoked is called voluntarily by this
+ // herder (e.g. when a broker coordinator failure is detected). Otherwise the
+ // runningAssignment is always replaced by the assignment here.
+ synchronized (this) {
+ log.info("Starting connectors and tasks using config offset {}", assignment.offset());
+ log.debug("Received assignment: {}", assignment);
+ log.debug("Currently running assignment: {}", runningAssignment);
+
+ for (String connectorName : assignmentDifference(assignment.connectors(), runningAssignment.connectors())) {
+ callables.add(getConnectorStartingCallable(connectorName));
+ }
+
+ // These tasks have been stopped by this worker due to task reconfiguration. In order to
+ // restart them, they are removed just before the overall task startup from the set of
+ // currently running tasks. Therefore, they'll be restarted only if they are included in
+ // the assignment that was just received after rebalancing.
+ log.debug("Tasks to restart from currently running assignment: {}", tasksToRestart);
+ runningAssignment.tasks().removeAll(tasksToRestart);
+ tasksToRestart.clear();
+ for (ConnectorTaskId taskId : assignmentDifference(assignment.tasks(), runningAssignment.tasks())) {
+ callables.add(getTaskStartingCallable(taskId));
+ }
}
+
startAndStop(callables);
- runningAssignment = member.currentProtocolVersion() == CONNECT_PROTOCOL_V0
- ? ExtendedAssignment.empty()
- : assignment;
+
+ synchronized (this) {
+ runningAssignment = member.currentProtocolVersion() == CONNECT_PROTOCOL_V0
+ ? ExtendedAssignment.empty()
+ : assignment;
+ }
log.info("Finished starting connectors and tasks");
}
@@ -1631,6 +1644,13 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
startAndStop(callables);
log.info("Finished stopping tasks in preparation for rebalance");
+ synchronized (DistributedHerder.this) {
+ log.debug("Removing connectors from running assignment {}", connectors);
+ runningAssignment.connectors().removeAll(connectors);
+ log.debug("Removing tasks from running assignment {}", tasks);
+ runningAssignment.tasks().removeAll(tasks);
+ }
+
// Ensure that all status updates have been pushed to the storage system before rebalancing.
// Otherwise, we may inadvertently overwrite the state with a stale value after the rebalance
// completes.
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index b3f3020..a0f672e 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -44,6 +44,7 @@ import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.CONNECTOR_CLIENT_POLICY_CLASS_CONFIG;
import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
+import static org.apache.kafka.connect.util.clusters.EmbeddedConnectClusterAssertions.CONNECTOR_SETUP_DURATION_MS;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -185,6 +186,7 @@ public class ConnectWorkerIntegrationTest {
*/
@Test
public void testBrokerCoordinator() throws Exception {
+ ConnectorHandle connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
workerProps.put(DistributedConfig.SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, String.valueOf(5000));
connect = connectBuilder.workerProps(workerProps).build();
// start the clusters
@@ -212,6 +214,9 @@ public class ConnectWorkerIntegrationTest {
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, numTasks,
"Connector tasks did not start in time.");
+ // expect that the connector will be stopped once the coordinator is detected to be down
+ StartAndStopLatch stopLatch = connectorHandle.expectedStops(1, false);
+
connect.kafka().stopOnlyKafka();
connect.assertions().assertExactlyNumWorkersAreUp(NUM_WORKERS,
@@ -221,6 +226,12 @@ public class ConnectWorkerIntegrationTest {
// heartbeat timeout * 2 + 4sec
Thread.sleep(TimeUnit.SECONDS.toMillis(10));
+ // Wait for the connector to be stopped
+ assertTrue("Failed to stop connector and tasks after coordinator failure within "
+ + CONNECTOR_SETUP_DURATION_MS + "ms",
+ stopLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS));
+
+ StartAndStopLatch startLatch = connectorHandle.expectedStarts(1, false);
connect.kafka().startOnlyKafkaOnSamePorts();
// Allow for the kafka brokers to come back online
@@ -234,6 +245,11 @@ public class ConnectWorkerIntegrationTest {
connect.assertions().assertConnectorAndAtLeastNumTasksAreRunning(CONNECTOR_NAME, numTasks,
"Connector tasks did not start in time.");
+
+ // Expect that the connector has started again
+        assertTrue("Failed to start connector and tasks after coordinator failure within "
+                + CONNECTOR_SETUP_DURATION_MS + "ms",
+                startLatch.await(CONNECTOR_SETUP_DURATION_MS, TimeUnit.MILLISECONDS));
}
/**
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
index 4b13d89..04bfd66 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectClusterAssertions.java
@@ -36,7 +36,7 @@ public class EmbeddedConnectClusterAssertions {
private static final Logger log = LoggerFactory.getLogger(EmbeddedConnectClusterAssertions.class);
public static final long WORKER_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(60);
- private static final long CONNECTOR_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
+ public static final long CONNECTOR_SETUP_DURATION_MS = TimeUnit.SECONDS.toMillis(30);
private final EmbeddedConnectCluster connect;
@@ -184,7 +184,7 @@ public class EmbeddedConnectClusterAssertions {
waitForCondition(
() -> checkConnectorAndTasksAreStopped(connectorName),
CONNECTOR_SETUP_DURATION_MS,
- "At least the connector or one of its tasks is still");
+ "At least the connector or one of its tasks is still running");
} catch (AssertionError e) {
throw new AssertionError(detailMessage, e);
}