You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by kk...@apache.org on 2020/05/21 04:05:57 UTC

[kafka] branch 2.3 updated: KAFKA-8869: Remove task configs for deleted connectors from config snapshot (#8444)

This is an automated email from the ASF dual-hosted git repository.

kkarantasis pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 2d34fca  KAFKA-8869: Remove task configs for deleted connectors from config snapshot (#8444)
2d34fca is described below

commit 2d34fca2145bbd2af233688c45847bc4ae3ce6e8
Author: Chris Egerton <ch...@confluent.io>
AuthorDate: Wed May 20 20:15:43 2020 -0700

    KAFKA-8869: Remove task configs for deleted connectors from config snapshot (#8444)
    
    Currently, if a connector is deleted, its task configurations will remain in the config snapshot tracked by the KafkaConfigBackingStore. This causes issues with incremental cooperative rebalancing, which utilizes that config snapshot to determine which connectors and tasks need to be assigned across the cluster. Specifically, it first checks to see which connectors are present in the config snapshot, and then, for each of those connectors, queries the snapshot for that connector's tas [...]
    
    The lifecycle of a connector is as follows: its configuration is written to the config topic; that write is picked up by the workers in the cluster and triggers a rebalance; the connector is assigned to and started by a worker; task configs are generated by the connector and then written to the config topic; that write is picked up by the workers in the cluster and triggers a second rebalance; and finally, the tasks are assigned to and started by workers across the cluster.
    
    When a connector is re-created after having been deleted, there is a brief period between the first time the connector is started and the completion of the second rebalance during which the stale task configs from the previously deleted version of the connector will be used by the framework to start tasks for that connector. This fix aims to eliminate that window by preemptively clearing the task configs from the config snapshot for a connector whenever it has been deleted.
    
    An existing unit test is modified to verify this behavior, and should provide sufficient guarantees that the bug has been fixed.
    
    Reviewers: Nigel Liang <ni...@nigelliang.com>, Konstantine Karantasis <ko...@confluent.io>
---
 .../kafka/connect/storage/KafkaConfigBackingStore.java       |  2 ++
 .../kafka/connect/storage/KafkaConfigBackingStoreTest.java   | 12 +++++++++++-
 2 files changed, 13 insertions(+), 1 deletion(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
index 3572d8c..a4242d0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java
@@ -516,6 +516,8 @@ public class KafkaConfigBackingStore implements ConfigBackingStore {
                         // Connector deletion will be written as a null value
                         log.info("Successfully processed removal of connector '{}'", connectorName);
                         connectorConfigs.remove(connectorName);
+                        connectorTaskCounts.remove(connectorName);
+                        taskConfigs.keySet().removeIf(taskId -> taskId.connector().equals(connectorName));
                         removed = true;
                     } else {
                         // Connector configs can be applied and callbacks invoked immediately
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
index 8e358e0..0a82a09 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -57,6 +57,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
@@ -537,7 +538,7 @@ public class KafkaConfigBackingStoreTest {
         LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
         deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
         deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
-        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(1));
         deserialized.put(CONFIGS_SERIALIZED.get(3), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
         logOffset = 5;
 
@@ -566,8 +567,17 @@ public class KafkaConfigBackingStoreTest {
         // Should see a single connector with initial state paused
         ClusterConfigState configState = configStorage.snapshot();
         assertEquals(TargetState.STARTED, configState.targetState(CONNECTOR_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.subList(0, 2), configState.allTaskConfigs(CONNECTOR_IDS.get(0)));
+        assertEquals(2, configState.taskCount(CONNECTOR_IDS.get(0)));
 
         configStorage.refresh(0, TimeUnit.SECONDS);
+        configState = configStorage.snapshot();
+        // Connector should now be removed from the snapshot
+        assertFalse(configState.contains(CONNECTOR_IDS.get(0)));
+        // Task configs for the deleted connector should also be removed from the snapshot
+        assertEquals(Collections.emptyList(), configState.allTaskConfigs(CONNECTOR_IDS.get(0)));
+        assertEquals(0, configState.taskCount(CONNECTOR_IDS.get(0)));
 
         configStorage.stop();