You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/04/27 14:43:17 UTC

[GitHub] [kafka] cadonna commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

cadonna commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r415797071



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -713,23 +713,18 @@ private boolean assignTasksToClients(final Set<String> allSourceTopics,
             allTasks, clientStates, numStandbyReplicas());
 
         final TaskAssignor taskAssignor;
-        if (highAvailabilityEnabled) {
-            if (lagComputationSuccessful) {
-                taskAssignor = new HighAvailabilityTaskAssignor(
-                    clientStates,
-                    allTasks,
-                    statefulTasks,
-                    assignmentConfigs);
-            } else {
-                log.info("Failed to fetch end offsets for changelogs, will return previous assignment to clients and "
-                             + "trigger another rebalance to retry.");
-                setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
-                taskAssignor = new StickyTaskAssignor(clientStates, allTasks, statefulTasks, assignmentConfigs, true);
-            }
+        if (!lagComputationSuccessful) {
+            log.info("Failed to fetch end offsets for changelogs, will return previous assignment to clients and "
+                         + "trigger another rebalance to retry.");
+            setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
+            taskAssignor = new PriorTaskAssignor();
         } else {
-            taskAssignor = new StickyTaskAssignor(clientStates, allTasks, statefulTasks, assignmentConfigs, false);
+            taskAssignor = this.taskAssignor.get();
         }

Review comment:
       prop:
   Could we package this logic into a factory method to make the code more readable?
   
   ```
   final TaskAssignor taskAssignor = createTaskAssignor(boolean lagComputationSuccessful);
   ```

##########
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##########
@@ -1146,4 +1146,13 @@ private static byte checkRange(final byte i) {
             }
         };
     }
+
+    @SafeVarargs
+    public static <E> Set<E> union(final Supplier<Set<E>> constructor, final Set<E>... set) {

Review comment:
       req: Please add unit tests for this method

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##########
@@ -16,49 +16,50 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClientOrNoCaughtUpClientsExist;
-import static org.apache.kafka.streams.processor.internals.assignment.RankedClient.buildClientRankingsByTask;
-import static org.apache.kafka.streams.processor.internals.assignment.RankedClient.tasksToCaughtUpClients;
-import static org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignTaskMovements;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.UUID;
 import java.util.stream.Collectors;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-import java.util.Set;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClientOrNoCaughtUpClientsExist;
+import static org.apache.kafka.streams.processor.internals.assignment.RankedClient.buildClientRankingsByTask;
+import static org.apache.kafka.streams.processor.internals.assignment.RankedClient.tasksToCaughtUpClients;
+import static org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignTaskMovements;

Review comment:
       You are an exemplary boy scout!

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########
@@ -86,6 +90,22 @@ private ClientState(final Set<TaskId> activeTasks,
         this.capacity = capacity;
     }
 
+    public ClientState(final Set<TaskId> previousActiveTasks,

Review comment:
       req: Please add a unit test.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -713,23 +713,18 @@ private boolean assignTasksToClients(final Set<String> allSourceTopics,
             allTasks, clientStates, numStandbyReplicas());
 
         final TaskAssignor taskAssignor;
-        if (highAvailabilityEnabled) {
-            if (lagComputationSuccessful) {
-                taskAssignor = new HighAvailabilityTaskAssignor(
-                    clientStates,
-                    allTasks,
-                    statefulTasks,
-                    assignmentConfigs);
-            } else {
-                log.info("Failed to fetch end offsets for changelogs, will return previous assignment to clients and "
-                             + "trigger another rebalance to retry.");
-                setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
-                taskAssignor = new StickyTaskAssignor(clientStates, allTasks, statefulTasks, assignmentConfigs, true);
-            }
+        if (!lagComputationSuccessful) {

Review comment:
       I see your point. What I do not like so much is that it is not very intuitive to require successful lag computation for sticky assignor. I understand that if lag computation is not successful other parts of Streams will fail, but it is not the responsibility of this class to avoid that. I think what I am trying to say is that the verifications should be done where they are required to make the code easily comprehensible. I am just imagining me coming back to this code and trying to understand why the lag computation must be successful for the sticky assignor.     

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/StickyTaskAssignorTest.java
##########
@@ -350,20 +350,20 @@ public void shouldAssignMoreTasksToClientWithMoreCapacity() {
         createClient(UUID_2, 2);
         createClient(UUID_1, 1);
 
-        final StickyTaskAssignor taskAssignor = createTaskAssignor(TASK_0_0,
-                                                                            TASK_0_1,
-                                                                            TASK_0_2,
-                                                                            new TaskId(1, 0),
-                                                                            new TaskId(1, 1),
-                                                                            new TaskId(1, 2),
-                                                                            new TaskId(2, 0),
-                                                                            new TaskId(2, 1),
-                                                                            new TaskId(2, 2),
-                                                                            new TaskId(3, 0),
-                                                                            new TaskId(3, 1),
-                                                                            new TaskId(3, 2));
-
-        taskAssignor.assign();
+        final boolean followupRebalanceNeeded = assign(TASK_0_0,
+                                                       TASK_0_1,
+                                                       TASK_0_2,
+                                                       new TaskId(1, 0),
+                                                       new TaskId(1, 1),
+                                                       new TaskId(1, 2),
+                                                       new TaskId(2, 0),
+                                                       new TaskId(2, 1),
+                                                       new TaskId(2, 2),
+                                                       new TaskId(3, 0),
+                                                       new TaskId(3, 1),
+                                                       new TaskId(3, 2));

Review comment:
       prop:
   ```suggestion
           final boolean followupRebalanceNeeded = assign(
               TASK_0_0,
               TASK_0_1,
               TASK_0_2,
               new TaskId(1, 0),
               new TaskId(1, 1),
               new TaskId(1, 2),
               new TaskId(2, 0),
               new TaskId(2, 1),
               new TaskId(2, 2),
               new TaskId(3, 0),
               new TaskId(3, 1),
               new TaskId(3, 2)
           );
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/TaskAssignorConvergenceTest.java
##########
@@ -416,11 +416,10 @@ private static void testForConvergence(final Harness harness,
             iteration++;
             harness.prepareForNextRebalance();
             harness.recordBefore(iteration);
-            rebalancePending = new HighAvailabilityTaskAssignor(
-                harness.clientStates, allTasks,
-                harness.statefulTaskEndOffsetSums.keySet(),
-                configs
-            ).assign();
+            rebalancePending = new HighAvailabilityTaskAssignor().assign(harness.clientStates,
+                                                                         allTasks,
+                                                                         harness.statefulTaskEndOffsetSums.keySet(),
+                                                                         configs);

Review comment:
       prop:
   ```suggestion
               rebalancePending = new HighAvailabilityTaskAssignor().assign(
                   harness.clientStates,
                   allTasks,
                   harness.statefulTaskEndOffsetSums.keySet(),
                   configs
               );
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org