Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/08/08 10:24:10 UTC

[GitHub] [kafka] yashmayya opened a new pull request, #12490: KAFKA-14147: Prevent maps from growing monotonically in KafkaConfigBackingStore

yashmayya opened a new pull request, #12490:
URL: https://github.com/apache/kafka/pull/12490

   - https://issues.apache.org/jira/browse/KAFKA-14147
   - Similar to https://issues.apache.org/jira/browse/KAFKA-8869, but without the rebalancing implications.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] C0urante commented on a diff in pull request #12490: KAFKA-14147: Prevent deferredTaskUpdates map from growing monotonically in KafkaConfigBackingStore

Posted by GitBox <gi...@apache.org>.
C0urante commented on code in PR #12490:
URL: https://github.com/apache/kafka/pull/12490#discussion_r950226900


##########
connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java:
##########
@@ -842,6 +842,7 @@ public void testBackgroundConnectorDeletion() throws Exception {
         // Task configs for the deleted connector should also be removed from the snapshot
         assertEquals(Collections.emptyList(), configState.allTaskConfigs(CONNECTOR_IDS.get(0)));
         assertEquals(0, configState.taskCount(CONNECTOR_IDS.get(0)));
+        assertEquals(0, configStorage.deferredTaskUpdates.size());

Review Comment:
   Nit: it's better if we can do a comparison of the entire map here, since that provides better error messages if the assertion fails.
   
   Also, we should add a comment explaining why we have this check since it may get removed in a future change if it's unclear why this is necessary.
   
   ```suggestion
           // Make sure the deleted connector is removed from the map in order to prevent unbounded growth
           assertEquals(Collections.emptyMap(), configStorage.deferredTaskUpdates);
   ```





[GitHub] [kafka] yashmayya commented on a diff in pull request #12490: KAFKA-14147: Prevent maps from growing monotonically in KafkaConfigBackingStore

yashmayya commented on code in PR #12490:
URL: https://github.com/apache/kafka/pull/12490#discussion_r940362196


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -853,6 +853,9 @@ private void processConnectorConfigRecord(String connectorName, SchemaAndValue v
                 connectorConfigs.remove(connectorName);
                 connectorTaskCounts.remove(connectorName);
                 taskConfigs.keySet().removeIf(taskId -> taskId.connector().equals(connectorName));
+                deferredTaskUpdates.remove(connectorName);
+                connectorTaskCountRecords.remove(connectorName);

Review Comment:
   Ah whoops, I did try to check the impact of this change on zombie fencing, but I completely missed that connectors can be re-created with the same name, which could require fencing off older zombie tasks. I haven't observed any practical fallout from the growing maps, so I can revert this change. Also, I think we don't want to remove the entry from `connectorTaskConfigGenerations` for the same reason, right?





[GitHub] [kafka] yashmayya commented on a diff in pull request #12490: KAFKA-14147: Prevent deferredTaskUpdates map from growing monotonically in KafkaConfigBackingStore

yashmayya commented on code in PR #12490:
URL: https://github.com/apache/kafka/pull/12490#discussion_r948667264


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -853,6 +853,9 @@ private void processConnectorConfigRecord(String connectorName, SchemaAndValue v
                 connectorConfigs.remove(connectorName);
                 connectorTaskCounts.remove(connectorName);
                 taskConfigs.keySet().removeIf(taskId -> taskId.connector().equals(connectorName));
+                deferredTaskUpdates.remove(connectorName);
+                connectorTaskCountRecords.remove(connectorName);

Review Comment:
   Ah cool, that makes sense. Thanks, added.





[GitHub] [kafka] C0urante commented on a diff in pull request #12490: KAFKA-14147: Prevent maps from growing monotonically in KafkaConfigBackingStore

C0urante commented on code in PR #12490:
URL: https://github.com/apache/kafka/pull/12490#discussion_r940338098


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -853,6 +853,9 @@ private void processConnectorConfigRecord(String connectorName, SchemaAndValue v
                 connectorConfigs.remove(connectorName);
                 connectorTaskCounts.remove(connectorName);
                 taskConfigs.keySet().removeIf(taskId -> taskId.connector().equals(connectorName));
+                deferredTaskUpdates.remove(connectorName);
+                connectorTaskCountRecords.remove(connectorName);

Review Comment:
   Unfortunately, we can't safely remove this entry. There may still be zombie tasks running for this connector that would need to be fenced out before bringing up new tasks if it got recreated.
   
   One of the [rejected alternatives in KIP-618](https://cwiki.apache.org/confluence/display/KAFKA/KIP-618%3A+Exactly-Once+Support+for+Source+Connectors#KIP618:ExactlyOnceSupportforSourceConnectors-Cleanupoftaskcountrecordsonconnectordeletion) was to perform a final round of fencing for source connectors after they were deleted. The rationale was that the work would be unnecessary if the connector were never recreated, and would provide little advantage if it were (since the total number of required fencings would remain unchanged).
   
   The point that these task count records can grow monotonically is an interesting one, but I'm not sure it's enough to tip the balance towards a fence-on-delete approach. We don't wipe offsets for deleted source connectors either, and although this has been a pain point for many users who would like to reset those offsets without having to rename their connectors, to my knowledge the storage cost of these offsets has not been an issue.
   
   Have you observed practical fallout from this issue, or is this more of a general exercise in code cleanliness?
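   To make the retention argument concrete, here is a minimal Java sketch under stated assumptions: a hypothetical, simplified store (not the real `KafkaConfigBackingStore`) in which deleting a connector clears its `deferredTaskUpdates` entry but deliberately keeps its `connectorTaskCountRecords` entry, so a connector recreated under the same name still knows that older tasks may need fencing. The `tasksToFence` helper and the simplified value types are illustrative inventions.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalInt;

// Hypothetical, simplified model of the bookkeeping discussed above; this is NOT the
// real KafkaConfigBackingStore. Field names mirror the diff, value types are simplified.
class TaskCountRetentionSketch {
    // Task configs that arrived before their matching task commit record, keyed by connector.
    final Map<String, Map<Integer, String>> deferredTaskUpdates = new HashMap<>();
    // Latest task count record per connector: how many tasks were last brought up.
    final Map<String, Integer> connectorTaskCountRecords = new HashMap<>();

    void recordTaskCount(String connector, int taskCount) {
        connectorTaskCountRecords.put(connector, taskCount);
    }

    void deferTaskUpdate(String connector, int task, String config) {
        deferredTaskUpdates.computeIfAbsent(connector, c -> new HashMap<>()).put(task, config);
    }

    void onConnectorDeleted(String connector) {
        // Safe to drop: deferred task updates for a deleted connector will never be committed.
        deferredTaskUpdates.remove(connector);
        // Deliberately kept: zombie tasks from the old incarnation may still be running, and a
        // connector recreated under the same name must fence them out before starting new tasks.
    }

    // Assumed helper: number of tasks from the previous incarnation to fence out, if known.
    OptionalInt tasksToFence(String connector) {
        Integer count = connectorTaskCountRecords.get(connector);
        return count == null ? OptionalInt.empty() : OptionalInt.of(count);
    }

    public static void main(String[] args) {
        TaskCountRetentionSketch store = new TaskCountRetentionSketch();
        store.recordTaskCount("my-source", 3);
        store.deferTaskUpdate("my-source", 0, "task-0-config");
        store.onConnectorDeleted("my-source");
        // The deferred updates are gone, but the task count record survives deletion.
        System.out.println("deferred updates empty: " + store.deferredTaskUpdates.isEmpty());
        System.out.println("tasks to fence on re-creation: " + store.tasksToFence("my-source"));
    }
}
```

   The asymmetry is the point: only the map whose entries can never be used again after deletion is cleaned up.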





[GitHub] [kafka] yashmayya commented on a diff in pull request #12490: KAFKA-14147: Prevent deferredTaskUpdates map from growing monotonically in KafkaConfigBackingStore

yashmayya commented on code in PR #12490:
URL: https://github.com/apache/kafka/pull/12490#discussion_r943666445


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -853,6 +853,9 @@ private void processConnectorConfigRecord(String connectorName, SchemaAndValue v
                 connectorConfigs.remove(connectorName);
                 connectorTaskCounts.remove(connectorName);
                 taskConfigs.keySet().removeIf(taskId -> taskId.connector().equals(connectorName));
+                deferredTaskUpdates.remove(connectorName);
+                connectorTaskCountRecords.remove(connectorName);

Review Comment:
   `deferredTaskUpdates` isn't exposed in the snapshot so I'll need to check what sort of test we could have for this. Thanks a lot for verifying my understanding and the great explanation!





[GitHub] [kafka] yashmayya commented on a diff in pull request #12490: KAFKA-14147: Prevent maps from growing monotonically in KafkaConfigBackingStore

yashmayya commented on code in PR #12490:
URL: https://github.com/apache/kafka/pull/12490#discussion_r940928631


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -853,6 +853,9 @@ private void processConnectorConfigRecord(String connectorName, SchemaAndValue v
                 connectorConfigs.remove(connectorName);
                 connectorTaskCounts.remove(connectorName);
                 taskConfigs.keySet().removeIf(taskId -> taskId.connector().equals(connectorName));
+                deferredTaskUpdates.remove(connectorName);
+                connectorTaskCountRecords.remove(connectorName);

Review Comment:
   Tbh, it doesn't really seem like it's worth the mess of null handling everywhere. I'm gonna back out this change and make this a single line PR 😆
   
   Btw, unrelated to this PR, but I noticed that task config generations aren't persisted to the config topic; they're only maintained in an in-memory map. Since the config topic is compacted and older task commit records can be lost (the task config generation is incremented by 1 each time a task commit record is encountered), different workers (herders, to be precise) could end up with different views of the task config generation for the same connector, depending on when they were started or restarted. From what I can tell, task config generations are used primarily in two places:
   
   i) We compare the task config generation from before an EOS task starts up with the task config generation after the task has initialized its transactional producer, and fail the task if there's a mismatch (indicating that a new set of tasks has been brought up).
   
   ii) At the end of a zombie fencing, if the task config generation is greater than it was at the beginning of the fencing, the fencing endpoint returns a `409` because a new set of tasks was brought up in the meantime.
   
   I'm struggling to think of any cases where this would cause an issue (i.e. different herders having different values for task config generations of the same connector) but I was hoping that you could verify my understanding here.
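   A minimal sketch of the two uses described above, assuming a hypothetical per-herder in-memory map that is incremented once per task commit record. The class and method names, and the starting value of 0, are assumptions for illustration; the real herder and `ClusterConfigState` code may differ.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of in-memory task config generations as described above; names and
// the starting value (0 before any commit record) are assumptions, not Kafka's actual code.
class TaskConfigGenerationSketch {
    private final Map<String, Integer> connectorTaskConfigGenerations = new HashMap<>();

    // Called for every task commit record read from the config topic.
    void onTaskCommitRecord(String connector) {
        connectorTaskConfigGenerations.merge(connector, 1, Integer::sum);
    }

    int generation(String connector) {
        return connectorTaskConfigGenerations.getOrDefault(connector, 0);
    }

    // (i) Checked after an EOS task has initialized its transactional producer.
    boolean startupStillValid(String connector, int generationBeforeStartup) {
        return generation(connector) == generationBeforeStartup;
    }

    // (ii) Checked at the end of a zombie fencing; true means the endpoint should answer 409.
    boolean fencingConflict(String connector, int generationAtFencingStart) {
        return generation(connector) > generationAtFencingStart;
    }

    public static void main(String[] args) {
        TaskConfigGenerationSketch herder = new TaskConfigGenerationSketch();
        herder.onTaskCommitRecord("my-source");
        int before = herder.generation("my-source");
        // A new set of task configs is committed while the task is starting up.
        herder.onTaskCommitRecord("my-source");
        System.out.println("startup still valid: " + herder.startupStillValid("my-source", before));
        System.out.println("fencing conflict: " + herder.fencingConflict("my-source", before));
    }
}
```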





[GitHub] [kafka] C0urante merged pull request #12490: KAFKA-14147: Prevent deferredTaskUpdates map from growing monotonically in KafkaConfigBackingStore

C0urante merged PR #12490:
URL: https://github.com/apache/kafka/pull/12490




[GitHub] [kafka] C0urante commented on a diff in pull request #12490: KAFKA-14147: Prevent deferredTaskUpdates map from growing monotonically in KafkaConfigBackingStore

C0urante commented on code in PR #12490:
URL: https://github.com/apache/kafka/pull/12490#discussion_r941405027


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -853,6 +853,9 @@ private void processConnectorConfigRecord(String connectorName, SchemaAndValue v
                 connectorConfigs.remove(connectorName);
                 connectorTaskCounts.remove(connectorName);
                 taskConfigs.keySet().removeIf(taskId -> taskId.connector().equals(connectorName));
+                deferredTaskUpdates.remove(connectorName);
+                connectorTaskCountRecords.remove(connectorName);

Review Comment:
   > Tbh, it doesn't really seem like it's worth the mess of null handling everywhere. I'm gonna back out this change and make this a single line PR 😆
   
   A single line PR... with tests? 😄
   
   Your understanding is correct: we only track task config generations in memory, different herders may have different generations for the same set of task configs, and we use generations to abort task startup after the task's transactional producer has been initialized and to abort persisting zombie fencing records to the config topic.
   
   The reason this is all fine is that we don't really need to track an exact generation number; all we have to do is track whether a new set of task configs for a given connector has appeared after a specific set of task configs. Compaction should not change the fact that, once we have a generation number for a set of task configs, generation numbers for later task configs will be greater than that number.
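   The ordering argument can be illustrated with a small, hypothetical replay model: a long-running herder and a herder that started after compaction see different numbers of task commit records, so their absolute generations differ, yet each one still detects a newer set of task configs relative to its own earlier snapshot. The record keys are assumed to follow a `commit-<connector>` convention, only a single connector is tracked, and everything else is simplified.

```java
import java.util.List;

// Hypothetical illustration of the argument above; record keys are assumed to follow a
// "commit-<connector>" convention, and everything else is simplified.
class CompactionOrderingSketch {
    // One herder's in-memory view of a single connector's task config generation.
    static final class HerderView {
        private int generation = 0;

        void replay(List<String> records) {
            for (String key : records) {
                if (key.startsWith("commit-")) {
                    generation++; // incremented once per task commit record seen
                }
            }
        }

        int generation() {
            return generation;
        }
    }

    // The only question the herder asks: have newer task configs appeared since the snapshot?
    static boolean newerTaskConfigsSince(HerderView herder, int snapshot) {
        return herder.generation() > snapshot;
    }

    public static void main(String[] args) {
        HerderView longRunning = new HerderView();
        longRunning.replay(List.of("commit-my-source", "commit-my-source", "commit-my-source"));
        // A herder started after compaction only sees the latest commit record for the key.
        HerderView restarted = new HerderView();
        restarted.replay(List.of("commit-my-source"));

        int snapshotA = longRunning.generation();
        int snapshotB = restarted.generation();
        // Both herders then read the same new commit record.
        longRunning.replay(List.of("commit-my-source"));
        restarted.replay(List.of("commit-my-source"));

        System.out.println("absolute generations differ: " + (snapshotA != snapshotB));
        System.out.println("both detect newer configs: "
                + (newerTaskConfigsSince(longRunning, snapshotA)
                        && newerTaskConfigsSince(restarted, snapshotB)));
    }
}
```

   Comparisons are only ever made within a single herder's view, which is why the differing absolute numbers never matter.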





[GitHub] [kafka] C0urante commented on a diff in pull request #12490: KAFKA-14147: Prevent deferredTaskUpdates map from growing monotonically in KafkaConfigBackingStore

C0urante commented on code in PR #12490:
URL: https://github.com/apache/kafka/pull/12490#discussion_r948191499


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -853,6 +853,9 @@ private void processConnectorConfigRecord(String connectorName, SchemaAndValue v
                 connectorConfigs.remove(connectorName);
                 connectorTaskCounts.remove(connectorName);
                 taskConfigs.keySet().removeIf(taskId -> taskId.connector().equals(connectorName));
+                deferredTaskUpdates.remove(connectorName);
+                connectorTaskCountRecords.remove(connectorName);

Review Comment:
   👍 happy to help!
   
   RE testing: I wouldn't overthink it; you can just upgrade the visibility of `deferredTaskUpdates` from private to package-private (possibly with a `// visible for testing` comment above the field declaration) and then probe its contents in the `KafkaConfigBackingStoreTest` suite, either within existing tests or with a new test.
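   A minimal sketch of the suggested testing approach, assuming simplified types (connector names, task IDs, and configs as plain strings) and a plain `AssertionError` in place of JUnit so the example runs without dependencies. The field is package-private with a `// visible for testing` comment, and the check compares the whole map, as suggested in the earlier review comment, so a failure reports the leftover entries rather than just a size. All class and method names here are hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical test harness; in the real suite this would be a JUnit test in
// KafkaConfigBackingStoreTest using assertEquals(Collections.emptyMap(), ...).
class DeferredUpdatesVisibilitySketchTest {
    static void testDeletionClearsDeferredTaskUpdates() {
        DeferredUpdatesVisibilitySketch store = new DeferredUpdatesVisibilitySketch();
        store.deferTaskUpdate("connector1", "connector1-0", "task-config");
        store.onConnectorDeleted("connector1");
        // Make sure the deleted connector is removed from the map in order to prevent unbounded growth
        Map<String, Map<String, String>> expected = Collections.emptyMap();
        if (!expected.equals(store.deferredTaskUpdates)) {
            // Comparing whole maps lets the failure message show the leftover entries.
            throw new AssertionError(
                    "expected: <" + expected + "> but was: <" + store.deferredTaskUpdates + ">");
        }
    }

    public static void main(String[] args) {
        testDeletionClearsDeferredTaskUpdates();
        System.out.println("deferredTaskUpdates cleared on deletion");
    }
}

// Simplified stand-in for the store: task IDs and configs are plain strings here.
class DeferredUpdatesVisibilitySketch {
    // visible for testing
    final Map<String, Map<String, String>> deferredTaskUpdates = new HashMap<>();

    void deferTaskUpdate(String connector, String taskId, String config) {
        deferredTaskUpdates.computeIfAbsent(connector, c -> new HashMap<>()).put(taskId, config);
    }

    void onConnectorDeleted(String connector) {
        deferredTaskUpdates.remove(connector);
    }
}
```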





[GitHub] [kafka] yashmayya commented on pull request #12490: KAFKA-14147: Prevent maps from growing monotonically in KafkaConfigBackingStore

yashmayya commented on PR #12490:
URL: https://github.com/apache/kafka/pull/12490#issuecomment-1207958508

   @C0urante would you be able to take a look at this whenever possible?




[GitHub] [kafka] C0urante commented on a diff in pull request #12490: KAFKA-14147: Prevent maps from growing monotonically in KafkaConfigBackingStore

C0urante commented on code in PR #12490:
URL: https://github.com/apache/kafka/pull/12490#discussion_r940409780


##########
connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigBackingStore.java:
##########
@@ -853,6 +853,9 @@ private void processConnectorConfigRecord(String connectorName, SchemaAndValue v
                 connectorConfigs.remove(connectorName);
                 connectorTaskCounts.remove(connectorName);
                 taskConfigs.keySet().removeIf(taskId -> taskId.connector().equals(connectorName));
+                deferredTaskUpdates.remove(connectorName);
+                connectorTaskCountRecords.remove(connectorName);

Review Comment:
   I think we could possibly keep the deletion logic for task config generations, but we'd have to make the herder resilient against null values being returned from `ClusterConfigState::taskConfigGeneration` in case a connector is deleted while there's an ongoing round of zombie fencing. Up to you if you'd like to tackle that, and if so, if you'd prefer to do that here or in a follow-up.
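   One way the herder could tolerate a missing generation is sketched below. `SnapshotSketch.taskConfigGeneration` is a hypothetical stand-in for `ClusterConfigState::taskConfigGeneration` that returns `null` once a connector's entry has been removed; the `FencingOutcome` enum and `finishFencing` method are likewise illustrative, not Kafka APIs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of null-resilient handling at the end of a zombie fencing round.
class NullSafeFencingSketch {
    // Stand-in for ClusterConfigState; returns null once the connector's entry is deleted.
    static final class SnapshotSketch {
        final Map<String, Integer> generations = new HashMap<>();

        Integer taskConfigGeneration(String connector) {
            return generations.get(connector);
        }
    }

    enum FencingOutcome { WRITE_FENCING_RECORD, CONFLICT_NEWER_TASKS, CONNECTOR_DELETED }

    static FencingOutcome finishFencing(SnapshotSketch snapshot, String connector, int generationAtStart) {
        Integer current = snapshot.taskConfigGeneration(connector);
        if (current == null) {
            // Connector deleted mid-fencing; without this check, unboxing would throw an NPE.
            return FencingOutcome.CONNECTOR_DELETED;
        }
        if (current > generationAtStart) {
            // New tasks were brought up while fencing; the endpoint would answer with a 409.
            return FencingOutcome.CONFLICT_NEWER_TASKS;
        }
        return FencingOutcome.WRITE_FENCING_RECORD;
    }

    public static void main(String[] args) {
        SnapshotSketch snapshot = new SnapshotSketch();
        snapshot.generations.put("my-source", 2);
        int generationAtStart = 2;
        // The connector is deleted while the fencing round is still in progress.
        snapshot.generations.remove("my-source");
        System.out.println(finishFencing(snapshot, "my-source", generationAtStart));
    }
}
```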


