Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/09 16:30:56 UTC

[GitHub] [kafka] kkonstantine commented on a change in pull request #8805: KAFKA-9848: Avoid triggering scheduled rebalance delay when task assignment fails but Connect workers remain in the group

kkonstantine commented on a change in pull request #8805:
URL: https://github.com/apache/kafka/pull/8805#discussion_r436950305



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -159,6 +163,13 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) {
         // Base set: The previous assignment of connectors-and-tasks is a standalone snapshot that
         // can be used to calculate derived sets
         log.debug("Previous assignments: {}", previousAssignment);
+        int lastCompletedGenerationId = coordinator.lastCompletedGenerationId();
+        if (previousGenerationId != lastCompletedGenerationId) {
+            log.debug("Emptying previous assignments due to generation mismatch between previous "
+                    + "generation ID {} and last completed generation ID {} since the last assignment: {}",
+                    previousGenerationId, lastCompletedGenerationId, previousAssignment);

Review comment:
       This is related to the leader's internal bookkeeping when it calculates a new assignment. It's not related to the tasks that a worker (even the leader) is actually running. 
   
   Emptying (clearing) the previous assignment might cause some tasks to shuffle around, because the leader will calculate an assignment from scratch, but it doesn't affect running tasks. The newly computed assignment will include assignments and/or revocations as needed, based on a) which tasks each worker has reported running in this round and b) which tasks are configured in the config topic. Another way to say this is that the leader won't bother detecting lost tasks in this round: every unassigned task will be treated as a new task.
   
   You are right that the log message doesn't convey that meaning exactly. How about:
   ```
   log.debug("Clearing the view of previous assignments due to generation mismatch between "
           + "previous generation ID {} and last completed generation ID {}.",
           previousGenerationId, lastCompletedGenerationId);
   log.debug("This can happen if the leader fails to sync the assignment within a "
           + "rebalancing round. The following view of previous assignments might be "
           + "outdated and will be ignored by the leader in the current computation of "
           + "new assignments. Possibly outdated previous assignments: {}", previousAssignment);
   ```
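   To make the bookkeeping point above concrete, here is a minimal sketch in a hypothetical, heavily simplified model: tasks are plain string IDs, and `LeaderBookkeepingSketch` and its methods are illustrative names, not Connect classes. It assumes the leader ignores its view when `previousGenerationId` differs from `lastCompletedGenerationId`, so no task is detected as lost and every configured task that no worker reports running is treated as new.

   ```java
   import java.util.*;

   // Hypothetical, simplified model of the leader's bookkeeping; these are not Kafka Connect classes.
   class LeaderBookkeepingSketch {

       // Returns the view of previous assignments the leader uses in this round. If the generation
       // recorded with the view differs from the last completed generation, the view may be
       // outdated, so it is ignored (treated as empty).
       static Set<String> effectivePreviousAssignment(Set<String> previousAssignment,
                                                      int previousGenerationId,
                                                      int lastCompletedGenerationId) {
           if (previousGenerationId != lastCompletedGenerationId) {
               return Collections.emptySet();
           }
           return previousAssignment;
       }

       // Tasks in the previous view that no worker reports running in this round.
       static Set<String> lostTasks(Set<String> previousView, Set<String> running) {
           Set<String> lost = new TreeSet<>(previousView);
           lost.removeAll(running);
           return lost;
       }

       // Configured tasks that are neither running nor lost are handed out as new tasks.
       static Set<String> newTasks(Set<String> configured, Set<String> running, Set<String> lost) {
           Set<String> fresh = new TreeSet<>(configured);
           fresh.removeAll(running);
           fresh.removeAll(lost);
           return fresh;
       }

       public static void main(String[] args) {
           Set<String> configured = Set.of("t0", "t1", "t2"); // from the config topic
           Set<String> running = Set.of("t0");                // reported by workers this round
           Set<String> previous = Set.of("t0", "t1");         // leader's view from its last assignment

           // Generation mismatch (4 vs 5): the view is cleared, so nothing is detected as lost
           Set<String> view = effectivePreviousAssignment(previous, 4, 5);
           Set<String> lost = lostTasks(view, running);
           System.out.println("lost=" + lost + " new=" + newTasks(configured, running, lost));
           // prints "lost=[] new=[t1, t2]"
       }
   }
   ```

   With matching generation IDs (for example 5 and 5), the same inputs would instead report `t1` as lost and only `t2` as new.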
   
   
   

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -159,6 +163,13 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) {
         // Base set: The previous assignment of connectors-and-tasks is a standalone snapshot that
         // can be used to calculate derived sets
         log.debug("Previous assignments: {}", previousAssignment);
+        int lastCompletedGenerationId = coordinator.lastCompletedGenerationId();
+        if (previousGenerationId != lastCompletedGenerationId) {
+            log.debug("Emptying previous assignments due to generation mismatch between previous "
+                    + "generation ID {} and last completed generation ID {} since the last assignment: {}",
+                    previousGenerationId, lastCompletedGenerationId, previousAssignment);

Review comment:
       Also, given that the previous assignments are printed in `debug`, I think it makes sense to keep these log messages in debug as well. 

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -361,6 +369,14 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
         log.debug("Found the following connectors and tasks missing from previous assignments: "
                 + lostAssignments);
 
+        if (previousMembers.size() == memberConfigs.size() && scheduledRebalance <= 0) {
+            log.debug("Group size is same between rebalances. Lost assignments are probably due to lost SyncGroup "
+                    + "responses. Treating lost tasks as new tasks");

Review comment:
       How about: 
   
   ```
   log.debug("The number of workers remained the same between rebalances. The missing "
           + "assignments that the leader is detecting are probably due to some workers "
           + "failing to receive the new assignments in the previous rebalance. Will "
           + "reassign missing tasks as new tasks");
   ```
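   The condition in this hunk can be sketched as a small decision function. This is a simplified, hypothetical model: it takes the previous and current group sizes plus the `scheduledRebalance` value from the diff, and the class and enum names are made up for illustration. When the result is not an immediate reassignment, the real assignor continues with its existing delayed-rebalance handling, which is not modeled here.

   ```java
   // Hypothetical, simplified model of the check added in handleLostAssignments.
   class LostAssignmentHeuristicSketch {

       enum Decision { REASSIGN_NOW, USE_SCHEDULED_DELAY }

       // previousMemberCount / currentMemberCount: group size in the previous and current rebalance.
       // scheduledRebalance: time at which a delayed rebalance is due, or <= 0 if none is scheduled.
       static Decision decide(int previousMemberCount, int currentMemberCount, long scheduledRebalance) {
           if (previousMemberCount == currentMemberCount && scheduledRebalance <= 0) {
               // Same number of workers and no delay pending: the missing assignments most likely
               // come from workers that never received them, so reassign them as new tasks now.
               return Decision.REASSIGN_NOW;
           }
           // Otherwise a worker may really have left; fall back to the delayed-rebalance path.
           return Decision.USE_SCHEDULED_DELAY;
       }

       public static void main(String[] args) {
           System.out.println(decide(3, 3, 0L));     // REASSIGN_NOW
           System.out.println(decide(3, 2, 0L));     // USE_SCHEDULED_DELAY
           System.out.println(decide(3, 3, 1000L));  // USE_SCHEDULED_DELAY
       }
   }
   ```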

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -159,6 +163,13 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) {
         // Base set: The previous assignment of connectors-and-tasks is a standalone snapshot that
         // can be used to calculate derived sets
         log.debug("Previous assignments: {}", previousAssignment);
+        int lastCompletedGenerationId = coordinator.lastCompletedGenerationId();
+        if (previousGenerationId != lastCompletedGenerationId) {
+            log.debug("Emptying previous assignments due to generation mismatch between previous "
+                    + "generation ID {} and last completed generation ID {} since the last assignment: {}",
+                    previousGenerationId, lastCompletedGenerationId, previousAssignment);

Review comment:
       I agree. The length will be very similar anyway. 
   I'm pushing a commit to address your comments. 

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -361,6 +373,16 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
         log.debug("Found the following connectors and tasks missing from previous assignments: "
                 + lostAssignments);
 
+        if (previousMembers.size() == memberConfigs.size() && scheduledRebalance <= 0) {

Review comment:
       I like the idea of checking the member configs. It could probably allow us to avoid duplicate tasks, even in the rare scenario where a departing node is replaced within the rebalance round itself. 
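   The replacement scenario mentioned here is one where comparing only group sizes cannot tell the difference. A hypothetical sketch, assuming `memberConfigs` is a map keyed by worker ID and `previousMembers` is a set of worker IDs (the helper names are illustrative, not from the PR), contrasts the size check from the diff with a comparison of the member identities themselves:

   ```java
   import java.util.*;

   // Hypothetical sketch: memberConfigs is modeled as a map keyed by worker ID whose values
   // are irrelevant here; previousMembers holds the worker IDs from the previous rebalance.
   class MembershipCheckSketch {

       // Size-only comparison, mirroring the condition in the diff.
       static boolean sameSize(Set<String> previousMembers, Map<String, ?> memberConfigs) {
           return previousMembers.size() == memberConfigs.size();
       }

       // Identity comparison: true only if exactly the same worker IDs are present.
       static boolean sameMembers(Set<String> previousMembers, Map<String, ?> memberConfigs) {
           return previousMembers.equals(memberConfigs.keySet());
       }

       public static void main(String[] args) {
           Set<String> previousMembers = Set.of("worker-1", "worker-2", "worker-3");
           // worker-3 left and worker-4 joined within the same rebalance round
           Map<String, String> memberConfigs = Map.of(
                   "worker-1", "config", "worker-2", "config", "worker-4", "config");
           System.out.println(sameSize(previousMembers, memberConfigs));    // true
           System.out.println(sameMembers(previousMembers, memberConfigs)); // false
       }
   }
   ```

   When one worker is replaced by another in the same round, the size check still passes while the identity check does not.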




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org