You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/05 06:58:39 UTC

[GitHub] [kafka] kkonstantine opened a new pull request #8805: KAFKA-9848: Avoid triggering scheduled rebalance delay when task assignment fails but Connect workers remain in the group

kkonstantine opened a new pull request #8805:
URL: https://github.com/apache/kafka/pull/8805


   tbd
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] rhauch commented on a change in pull request #8805: KAFKA-9848: Avoid triggering scheduled rebalance delay when task assignment fails but Connect workers remain in the group

Posted by GitBox <gi...@apache.org>.
rhauch commented on a change in pull request #8805:
URL: https://github.com/apache/kafka/pull/8805#discussion_r436925315



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -159,6 +163,13 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) {
         // Base set: The previous assignment of connectors-and-tasks is a standalone snapshot that
         // can be used to calculate derived sets
         log.debug("Previous assignments: {}", previousAssignment);
+        int lastCompletedGenerationId = coordinator.lastCompletedGenerationId();
+        if (previousGenerationId != lastCompletedGenerationId) {
+            log.debug("Emptying previous assignments due to generation mismatch between previous "
+                    + "generation ID {} and last completed generation ID {} since the last assignment: {}",
+                    previousGenerationId, lastCompletedGenerationId, previousAssignment);

Review comment:
       Can we make this a little easier to understand for most users? I think it might be sufficient to add some combination of:
   * what this means (e.g., the worker was partitioned and missed at least one rebalance rounds, likely due to a network issue), and 
   * what resulted (e.g., the workers gave up its tasks in case the cluster had reassigned them to another worker).
   
   And, should this be debug or info or warn? Warn seems wrong, since the user shouldn't do anything, but excessive #s of these could signal the need for additional tuning. WDYT?

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -361,6 +369,14 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
         log.debug("Found the following connectors and tasks missing from previous assignments: "
                 + lostAssignments);
 
+        if (previousMembers.size() == memberConfigs.size() && scheduledRebalance <= 0) {
+            log.debug("Group size is same between rebalances. Lost assignments are probably due to lost SyncGroup "
+                    + "responses. Treating lost tasks as new tasks");

Review comment:
       Similar comment to that above.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -159,6 +163,13 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) {
         // Base set: The previous assignment of connectors-and-tasks is a standalone snapshot that
         // can be used to calculate derived sets
         log.debug("Previous assignments: {}", previousAssignment);
+        int lastCompletedGenerationId = coordinator.lastCompletedGenerationId();
+        if (previousGenerationId != lastCompletedGenerationId) {
+            log.debug("Emptying previous assignments due to generation mismatch between previous "
+                    + "generation ID {} and last completed generation ID {} since the last assignment: {}",
+                    previousGenerationId, lastCompletedGenerationId, previousAssignment);

Review comment:
       Sounds good, though it'd be better to have a single (long) log message to prevent them from being separated by other log messages from other threads.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -361,6 +369,14 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
         log.debug("Found the following connectors and tasks missing from previous assignments: "
                 + lostAssignments);
 
+        if (previousMembers.size() == memberConfigs.size() && scheduledRebalance <= 0) {
+            log.debug("Group size is same between rebalances. Lost assignments are probably due to lost SyncGroup "
+                    + "responses. Treating lost tasks as new tasks");

Review comment:
       Sounds good.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -361,6 +373,16 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
         log.debug("Found the following connectors and tasks missing from previous assignments: "
                 + lostAssignments);
 
+        if (previousMembers.size() == memberConfigs.size() && scheduledRebalance <= 0) {

Review comment:
       Is it enough to trust that the # of workers has not changed, or should we compare the members, via something like:
   ```
   if (previousMembers.equals(memberConfigs.keySet()) && scheduledRebalance <= 0) {
   ```
   IOW, what happens if one worker disappeared about the same time that an operator added a new worker?
   
   IIUC from the integration tests, this logic actually doesn't care which of these is the case -- all of the task assignments that were lost will be reassigned anyway, so it doesn't matter if the worker that gets those new assignments is the old worker that came back or a new worker. Is that right?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kkonstantine commented on pull request #8805: KAFKA-9848: Avoid triggering scheduled rebalance delay when task assignment fails but Connect workers remain in the group

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on pull request #8805:
URL: https://github.com/apache/kafka/pull/8805#issuecomment-641423090


   Thanks for the reviews @rhauch !
   2/3 builds were green here too and the 2 failures on JDK8 were unrelated. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kkonstantine commented on pull request #8805: KAFKA-9848: Avoid triggering scheduled rebalance delay when task assignment fails but Connect workers remain in the group

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on pull request #8805:
URL: https://github.com/apache/kafka/pull/8805#issuecomment-640156075


   Recent build history: 
   jdk8 and jdk14 green
   jdk11 two on `org.apache.kafka.streams.integration.QueryableStateIntegrationTest` which is unrelated. 
   
   Adding another test and the expectation that generations increase across all the tests. 


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kkonstantine commented on a change in pull request #8805: KAFKA-9848: Avoid triggering scheduled rebalance delay when task assignment fails but Connect workers remain in the group

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on a change in pull request #8805:
URL: https://github.com/apache/kafka/pull/8805#discussion_r436950305



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -159,6 +163,13 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) {
         // Base set: The previous assignment of connectors-and-tasks is a standalone snapshot that
         // can be used to calculate derived sets
         log.debug("Previous assignments: {}", previousAssignment);
+        int lastCompletedGenerationId = coordinator.lastCompletedGenerationId();
+        if (previousGenerationId != lastCompletedGenerationId) {
+            log.debug("Emptying previous assignments due to generation mismatch between previous "
+                    + "generation ID {} and last completed generation ID {} since the last assignment: {}",
+                    previousGenerationId, lastCompletedGenerationId, previousAssignment);

Review comment:
       This is related to the leader's internal bookkeeping when it calculates a new assignment. It's not related to the tasks that a worker (even the leader) is actually running. 
   
   Emptying/clearing the previous assignment might result in some tasks shuffling around, because the leader will calculate an assignment from scratch, but it doesn't affect running tasks. The new computed assignment will send assignment and/or revocations as needed based on a) what tasks each worker has reported running in this round and which tasks are configured in the config topic. Another way to say this is that the leader won't bother detecting lost tasks in this round. Every unassigned task will be treated as a new task. 
   
   You are right on the log message not conveying that meaning exactly. How about: 
   ```
   log.debug("Clearing the view of previous assignments due to generation mismatch between "
                       + "previous generation ID {} and last completed generation ID {}. ",
                       previousGenerationId, lastCompletedGenerationId);
   log.debug("This can happen if the leader fails to sync the assignment within a " 
                       + "rebalancing round. The following view of previous assignments might be "
                       + "outdated and will be ignored by the leader in the current computation of " 
                       + "new assignments. Possibly outdated previous assignments: {}", previousAssignment);
   ```
   
   
   

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -159,6 +163,13 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) {
         // Base set: The previous assignment of connectors-and-tasks is a standalone snapshot that
         // can be used to calculate derived sets
         log.debug("Previous assignments: {}", previousAssignment);
+        int lastCompletedGenerationId = coordinator.lastCompletedGenerationId();
+        if (previousGenerationId != lastCompletedGenerationId) {
+            log.debug("Emptying previous assignments due to generation mismatch between previous "
+                    + "generation ID {} and last completed generation ID {} since the last assignment: {}",
+                    previousGenerationId, lastCompletedGenerationId, previousAssignment);

Review comment:
       Also, given that the previous assignments are printed in `debug`, I think it makes sense to keep these log messages in debug as well. 

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -361,6 +369,14 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
         log.debug("Found the following connectors and tasks missing from previous assignments: "
                 + lostAssignments);
 
+        if (previousMembers.size() == memberConfigs.size() && scheduledRebalance <= 0) {
+            log.debug("Group size is same between rebalances. Lost assignments are probably due to lost SyncGroup "
+                    + "responses. Treating lost tasks as new tasks");

Review comment:
       How about: 
   
   ```
   log.debug("The number of workers remained the same between rebalances. The missing " 
                       + "assignments that the leader is detecting are probably due to some workers " 
                       + "failing to receive the new assignments in the previous rebalance. Will "
                       + "reassign missing tasks as new tasks");
   ```

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -159,6 +163,13 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) {
         // Base set: The previous assignment of connectors-and-tasks is a standalone snapshot that
         // can be used to calculate derived sets
         log.debug("Previous assignments: {}", previousAssignment);
+        int lastCompletedGenerationId = coordinator.lastCompletedGenerationId();
+        if (previousGenerationId != lastCompletedGenerationId) {
+            log.debug("Emptying previous assignments due to generation mismatch between previous "
+                    + "generation ID {} and last completed generation ID {} since the last assignment: {}",
+                    previousGenerationId, lastCompletedGenerationId, previousAssignment);

Review comment:
       I agree. The length will be very similar anyways. 
   I'm pushing a commit to address your comments. 

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -361,6 +373,16 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
         log.debug("Found the following connectors and tasks missing from previous assignments: "
                 + lostAssignments);
 
+        if (previousMembers.size() == memberConfigs.size() && scheduledRebalance <= 0) {

Review comment:
       I like the idea of checking the member configs. Could probably allow us to avoid duplicate tasks, even in this rare scenario of replacement of a departing node within the rebalance round itself. 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kkonstantine commented on pull request #8805: KAFKA-9848: Avoid triggering scheduled rebalance delay when task assignment fails but Connect workers remain in the group

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on pull request #8805:
URL: https://github.com/apache/kafka/pull/8805#issuecomment-640853227






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kkonstantine merged pull request #8805: KAFKA-9848: Avoid triggering scheduled rebalance delay when task assignment fails but Connect workers remain in the group

Posted by GitBox <gi...@apache.org>.
kkonstantine merged pull request #8805:
URL: https://github.com/apache/kafka/pull/8805


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] kkonstantine commented on pull request #8805: KAFKA-9848: Avoid triggering scheduled rebalance delay when task assignment fails but Connect workers remain in the group

Posted by GitBox <gi...@apache.org>.
kkonstantine commented on pull request #8805:
URL: https://github.com/apache/kafka/pull/8805#issuecomment-640268366


   Latest build:
   jdk8 and jdk11 green
   jdk14 one unrelated failure at: `kafka.api.PlaintextAdminIntegrationTest.testAlterReplicaLogDirs`
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org