You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ce...@apache.org on 2022/08/04 15:48:24 UTC

[kafka] branch 3.3 updated: MINOR: Update comment on verifyTaskGenerationAndOwnership method in DistributedHerder

This is an automated email from the ASF dual-hosted git repository.

cegerton pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 953a3a52c4 MINOR: Update comment on verifyTaskGenerationAndOwnership method in DistributedHerder
953a3a52c4 is described below

commit 953a3a52c463ecfd2f1c3db46dc44b4c8dbb06d8
Author: Yash Mayya <ya...@gmail.com>
AuthorDate: Fri Jul 29 02:48:35 2022 +0530

    MINOR: Update comment on verifyTaskGenerationAndOwnership method in DistributedHerder
    
    Reviewers: Chris Egerton <fe...@gmail.com>
---
 .../kafka/connect/runtime/distributed/DistributedHerder.java      | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
index ded833da59..388bfa4218 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
@@ -1732,9 +1732,7 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
                                     throw ConnectUtils.maybeWrap(cause, "Failed to perform round of zombie fencing");
                                 }
                             },
-                            () -> {
-                                verifyTaskGenerationAndOwnership(taskId, taskGeneration);
-                            }
+                            () -> verifyTaskGenerationAndOwnership(taskId, taskGeneration)
                     );
                 } else {
                     return worker.startSourceTask(
@@ -1941,8 +1939,8 @@ public class DistributedHerder extends AbstractHerder implements Runnable {
         }
     }
 
-    // Currently unused, but will be invoked by exactly-once source tasks after they have successfully
-    // initialized their transactional producer
+    // Invoked by exactly-once worker source tasks after they have successfully initialized their transactional
+    // producer to ensure that it is still safe to bring up the task
     private void verifyTaskGenerationAndOwnership(ConnectorTaskId id, int initialTaskGen) {
         log.debug("Reading to end of config topic to ensure it is still safe to bring up source task {} with exactly-once support", id);
         if (!refreshConfigSnapshot(Long.MAX_VALUE)) {