Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/09 16:12:50 UTC

[GitHub] [kafka] rhauch commented on a change in pull request #8805: KAFKA-9848: Avoid triggering scheduled rebalance delay when task assignment fails but Connect workers remain in the group

rhauch commented on a change in pull request #8805:
URL: https://github.com/apache/kafka/pull/8805#discussion_r436925315



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -159,6 +163,13 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) {
         // Base set: The previous assignment of connectors-and-tasks is a standalone snapshot that
         // can be used to calculate derived sets
         log.debug("Previous assignments: {}", previousAssignment);
+        int lastCompletedGenerationId = coordinator.lastCompletedGenerationId();
+        if (previousGenerationId != lastCompletedGenerationId) {
+            log.debug("Emptying previous assignments due to generation mismatch between previous "
+                    + "generation ID {} and last completed generation ID {} since the last assignment: {}",
+                    previousGenerationId, lastCompletedGenerationId, previousAssignment);

Review comment:
       Can we make this a little easier to understand for most users? I think it might be sufficient to add some combination of:
   * what this means (e.g., the worker was partitioned and missed at least one rebalance round, likely due to a network issue), and 
   * what resulted (e.g., the worker gave up its tasks in case the cluster had reassigned them to another worker).
   
   And, should this be debug, info, or warn? Warn seems wrong, since the user shouldn't need to do anything, but an excessive number of these messages could signal the need for additional tuning. WDYT?
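   For illustration only, here is a rough sketch of what a single message carrying that context might look like. The wording, the method name `mismatchMessage`, and the sample generation IDs are all hypothetical, not the actual patch:
   ```java
   // Hypothetical sketch: building the suggested context into one log message.
   // The phrasing and the sample values below are illustrative only.
   public class GenerationMismatchLogSketch {
       static String mismatchMessage(int previousGenerationId,
                                     int lastCompletedGenerationId,
                                     String previousAssignment) {
           return String.format(
                   "Clearing the view of previous assignments due to a mismatch between "
                   + "previous generation ID %d and last completed generation ID %d. This can "
                   + "happen when a worker is partitioned and misses at least one rebalance "
                   + "round (likely a network issue); the worker gives up its tasks in case "
                   + "the cluster has reassigned them to another worker. "
                   + "Previous assignments: %s",
                   previousGenerationId, lastCompletedGenerationId, previousAssignment);
       }

       public static void main(String[] args) {
           // Print the message with made-up generation IDs and assignment string.
           System.out.println(mismatchMessage(5, 7, "Assignment{connectors=[c1], tasks=[c1-0]}"));
       }
   }
   ```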

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -361,6 +369,14 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
         log.debug("Found the following connectors and tasks missing from previous assignments: "
                 + lostAssignments);
 
+        if (previousMembers.size() == memberConfigs.size() && scheduledRebalance <= 0) {
+            log.debug("Group size is same between rebalances. Lost assignments are probably due to lost SyncGroup "
+                    + "responses. Treating lost tasks as new tasks");

Review comment:
       Similar comment to that above.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -159,6 +163,13 @@ private Long ensureLeaderConfig(long maxOffset, WorkerCoordinator coordinator) {
         // Base set: The previous assignment of connectors-and-tasks is a standalone snapshot that
         // can be used to calculate derived sets
         log.debug("Previous assignments: {}", previousAssignment);
+        int lastCompletedGenerationId = coordinator.lastCompletedGenerationId();
+        if (previousGenerationId != lastCompletedGenerationId) {
+            log.debug("Emptying previous assignments due to generation mismatch between previous "
+                    + "generation ID {} and last completed generation ID {} since the last assignment: {}",
+                    previousGenerationId, lastCompletedGenerationId, previousAssignment);

Review comment:
       Sounds good, though it'd be better to have a single (long) log message to prevent them from being separated by other log messages from other threads.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -361,6 +369,14 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
         log.debug("Found the following connectors and tasks missing from previous assignments: "
                 + lostAssignments);
 
+        if (previousMembers.size() == memberConfigs.size() && scheduledRebalance <= 0) {
+            log.debug("Group size is same between rebalances. Lost assignments are probably due to lost SyncGroup "
+                    + "responses. Treating lost tasks as new tasks");

Review comment:
       Sounds good.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/IncrementalCooperativeAssignor.java
##########
@@ -361,6 +373,16 @@ protected void handleLostAssignments(ConnectorsAndTasks lostAssignments,
         log.debug("Found the following connectors and tasks missing from previous assignments: "
                 + lostAssignments);
 
+        if (previousMembers.size() == memberConfigs.size() && scheduledRebalance <= 0) {

Review comment:
       Is it enough to trust that the # of workers has not changed, or should we compare the members, via something like:
   ```
   if (previousMembers.equals(memberConfigs.keySet()) && scheduledRebalance <= 0) {
   ```
   IOW, what happens if one worker disappeared about the same time that an operator added a new worker?
   
   IIUC from the integration tests, this logic actually doesn't care which of these is the case -- all of the task assignments that were lost will be reassigned anyway, so it doesn't matter if the worker that gets those new assignments is the old worker that came back or a new worker. Is that right?
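   
   To make the difference concrete, here is a tiny standalone sketch of the swapped-worker scenario; the worker IDs are made up, and the two literal sets stand in for `previousMembers` and `memberConfigs.keySet()`:
   ```java
   import java.util.Set;

   // Illustrative only: why a size check and a membership check can disagree.
   public class MemberComparisonSketch {
       public static void main(String[] args) {
           Set<String> previousMembers = Set.of("worker-1", "worker-2", "worker-3");
           // worker-3 left and worker-4 joined at about the same time:
           Set<String> currentMembers = Set.of("worker-1", "worker-2", "worker-4");

           // A size comparison alone cannot distinguish this from an unchanged group...
           boolean sameSize = previousMembers.size() == currentMembers.size();
           // ...whereas comparing the sets themselves detects the swapped worker.
           boolean sameMembers = previousMembers.equals(currentMembers);

           System.out.println("sameSize=" + sameSize + ", sameMembers=" + sameMembers);
       }
   }
   ```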




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org