Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/22 17:13:31 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #8716: KAFKA-6145: KIP-441: Fix assignor config passthough

vvcephei commented on a change in pull request #8716:
URL: https://github.com/apache/kafka/pull/8716#discussion_r429352759



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########
@@ -334,6 +334,7 @@ public String toString() {
             ") prevStandbyTasks: (" + prevStandbyTasks +
             ") prevOwnedPartitionsByConsumerId: (" + ownedPartitions.keySet() +
             ") changelogOffsetTotalsByTask: (" + taskOffsetSums.entrySet() +
+            ") taskLagTotals: (" + taskLagTotals.entrySet() +

Review comment:
       I found this useful while debugging the system test.

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1148,6 +1148,9 @@ private void verifyMaxInFlightRequestPerConnection(final Object maxInFlightReque
         consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
         consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
         consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
+        consumerProps.put(ACCEPTABLE_RECOVERY_LAG_CONFIG, getLong(ACCEPTABLE_RECOVERY_LAG_CONFIG));
+        consumerProps.put(MAX_WARMUP_REPLICAS_CONFIG, getInt(MAX_WARMUP_REPLICAS_CONFIG));
+        consumerProps.put(PROBING_REBALANCE_INTERVAL_MS_CONFIG, getLong(PROBING_REBALANCE_INTERVAL_MS_CONFIG));

Review comment:
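       This is where we forgot to copy the KIP-441 assignor configs (`acceptable.recovery.lag`, `max.warmup.replicas`, and `probing.rebalance.interval.ms`) into the consumer configs, so they never reached the assignor.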
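
   To show why the missing `put` calls mattered, here is a minimal sketch that assumes the assignor only sees what is in the consumer properties and falls back to a default for anything absent. The class, its methods, and the stand-in default are hypothetical; only the three config keys are real.

```java
import java.util.HashMap;
import java.util.Map;

class AssignorConfigPassthrough {
    // Real KIP-441 config keys; everything else here is a simplified stand-in.
    static final String ACCEPTABLE_RECOVERY_LAG = "acceptable.recovery.lag";
    static final String MAX_WARMUP_REPLICAS = "max.warmup.replicas";
    static final String PROBING_REBALANCE_INTERVAL_MS = "probing.rebalance.interval.ms";

    // Hypothetical default the assignor falls back to when a key never arrives.
    static final long ASSIGNOR_DEFAULT_LAG = 10_000L;

    // Before the fix: the KIP-441 keys were never copied into the consumer props.
    static Map<String, Object> consumerPropsBeforeFix(final Map<String, Object> streamsConfig) {
        return new HashMap<>();
    }

    // After the fix: the three keys are passed through explicitly.
    static Map<String, Object> consumerPropsAfterFix(final Map<String, Object> streamsConfig) {
        final Map<String, Object> consumerProps = new HashMap<>();
        for (final String key : new String[] {ACCEPTABLE_RECOVERY_LAG, MAX_WARMUP_REPLICAS, PROBING_REBALANCE_INTERVAL_MS}) {
            consumerProps.put(key, streamsConfig.get(key));
        }
        return consumerProps;
    }

    // What the assignor sees: the configured value if present, otherwise its default.
    static long effectiveRecoveryLag(final Map<String, Object> consumerProps) {
        final Object value = consumerProps.get(ACCEPTABLE_RECOVERY_LAG);
        return value == null ? ASSIGNOR_DEFAULT_LAG : (Long) value;
    }

    public static void main(final String[] args) {
        final Map<String, Object> streamsConfig = new HashMap<>();
        streamsConfig.put(ACCEPTABLE_RECOVERY_LAG, 42L);
        streamsConfig.put(MAX_WARMUP_REPLICAS, 3);
        streamsConfig.put(PROBING_REBALANCE_INTERVAL_MS, 60_000L);
        System.out.println(effectiveRecoveryLag(consumerPropsBeforeFix(streamsConfig))); // 10000
        System.out.println(effectiveRecoveryLag(consumerPropsAfterFix(streamsConfig)));  // 42
    }
}
```

   The user's setting is silently replaced by the default whenever the key is not passed through, which is why the bug was easy to miss.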

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
##########
@@ -359,6 +359,10 @@ private AssignmentConfigs(final StreamsConfig configs) {
                           final Integer maxWarmupReplicas,
                           final Integer numStandbyReplicas,
                           final Long probingRebalanceIntervalMs) {
+            if (maxWarmupReplicas < 1) {
+                throw new IllegalArgumentException("must configure at least one warmup replica");
+            }
+

Review comment:
       I added this constraint to mirror the one we already apply in StreamsConfig. It's not critical, but I was disappointed to find that I had written a bunch of tests using a technically invalid configuration.
   
   I'll write a test for this...
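
   A minimal sketch of the check and the kind of test it calls for, assuming a hypothetical stand-in class that keeps only the validated field (the real `AssignmentConfigs` constructor also takes the lag, standby, and probing-interval arguments):

```java
// Hypothetical stand-in for AssignmentConfigs that keeps only the validated field.
final class WarmupConfigSketch {
    final int maxWarmupReplicas;

    WarmupConfigSketch(final int maxWarmupReplicas) {
        // Mirrors the StreamsConfig-level rule: at least one warmup replica.
        if (maxWarmupReplicas < 1) {
            throw new IllegalArgumentException("must configure at least one warmup replica");
        }
        this.maxWarmupReplicas = maxWarmupReplicas;
    }

    // Test-style helper: returns true if construction is rejected.
    static boolean rejects(final int maxWarmupReplicas) {
        try {
            new WarmupConfigSketch(maxWarmupReplicas);
            return false;
        } catch (final IllegalArgumentException expected) {
            return true;
        }
    }

    public static void main(final String[] args) {
        System.out.println(rejects(0)); // true
        System.out.println(rejects(1)); // false
    }
}
```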

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##########
@@ -90,15 +91,12 @@ public boolean assign(final Map<UUID, ClientState> clients,
 
         assignStatelessActiveTasks(clientStates, diff(TreeSet::new, allTaskIds, statefulTasks));
 
-        // We shouldn't plan a probing rebalance if we _needed_ task movements, but couldn't do any
-        // due to being configured for no warmups.
-        final boolean probingRebalanceNeeded =
-            configs.maxWarmupReplicas > 0 && neededActiveTaskMovements + neededStandbyTaskMovements > 0;
+        final boolean probingRebalanceNeeded = neededActiveTaskMovements + neededStandbyTaskMovements > 0;
 
         log.info("Decided on assignment: " +
                      clientStates +
-                     " with " +
-                     (probingRebalanceNeeded ? "" : "no") +
+                     " with" +
+                     (probingRebalanceNeeded ? "" : " no") +

Review comment:
       Fixing a double space we printed when a followup was planned (the log would say `with  followup`).
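
   The spacing bug is easiest to see with the two concatenations side by side. This sketch assumes the fragment appended after the conditional is `" followup"` (the actual continuation of the log line is not shown in the diff) and drops the leading `clientStates` text.

```java
// Reproduces the log-message spacing bug; the trailing " followup" fragment is an assumption.
class FollowupLogSketch {
    // Old form: the space lives after "with", so the empty branch leaves two spaces.
    static String before(final boolean probingRebalanceNeeded) {
        return "with " + (probingRebalanceNeeded ? "" : "no") + " followup";
    }

    // New form: the space moves into the "no" branch, so each case has single spaces.
    static String after(final boolean probingRebalanceNeeded) {
        return "with" + (probingRebalanceNeeded ? "" : " no") + " followup";
    }

    public static void main(final String[] args) {
        System.out.println("[" + before(true) + "]"); // [with  followup]
        System.out.println("[" + after(true) + "]");  // [with followup]
        System.out.println("[" + after(false) + "]"); // [with no followup]
    }
}
```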

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##########
@@ -90,15 +91,12 @@ public boolean assign(final Map<UUID, ClientState> clients,
 
         assignStatelessActiveTasks(clientStates, diff(TreeSet::new, allTaskIds, statefulTasks));
 
-        // We shouldn't plan a probing rebalance if we _needed_ task movements, but couldn't do any
-        // due to being configured for no warmups.
-        final boolean probingRebalanceNeeded =
-            configs.maxWarmupReplicas > 0 && neededActiveTaskMovements + neededStandbyTaskMovements > 0;
+        final boolean probingRebalanceNeeded = neededActiveTaskMovements + neededStandbyTaskMovements > 0;

Review comment:
       Since we can no longer have zero warmups, we don't need this condition (which I added in https://github.com/apache/kafka/pull/8696).
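
   Once the constructor guarantees `maxWarmupReplicas >= 1`, the dropped `configs.maxWarmupReplicas > 0` conjunct is always true, so the old and new expressions agree on every valid configuration. A small exhaustive check of that claim, with hypothetical method names:

```java
class ProbingConditionSketch {
    // Condition from #8696: don't plan a probing rebalance when warmups are disabled.
    static boolean oldCondition(final int maxWarmupReplicas, final int neededActive, final int neededStandby) {
        return maxWarmupReplicas > 0 && neededActive + neededStandby > 0;
    }

    // Simplified condition from this PR.
    static boolean newCondition(final int neededActive, final int neededStandby) {
        return neededActive + neededStandby > 0;
    }

    // Compare both over a small grid, restricted to configs that pass validation (>= 1 warmup).
    static boolean equivalentWhenWarmupsAtLeastOne(final int bound) {
        for (int warmups = 1; warmups <= bound; warmups++) {
            for (int active = 0; active <= bound; active++) {
                for (int standby = 0; standby <= bound; standby++) {
                    if (oldCondition(warmups, active, standby) != newCondition(active, standby)) {
                        return false;
                    }
                }
            }
        }
        return true;
    }

    public static void main(final String[] args) {
        System.out.println(equivalentWhenWarmupsAtLeastOne(5)); // true
        // They only differ at zero warmups, which validation now rejects.
        System.out.println(oldCondition(0, 1, 0) + " vs " + newCondition(1, 0)); // false vs true
    }
}
```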

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##########
@@ -241,16 +239,29 @@ private static void assignStatelessActiveTasks(final TreeMap<UUID, ClientState>
         final Map<TaskId, SortedSet<UUID>> taskToCaughtUpClients = new HashMap<>();
 
         for (final TaskId task : statefulTasks) {
-
+            final TreeSet<UUID> caughtUpClients = new TreeSet<>();

Review comment:
       A short-lived, empty TreeSet costs practically nothing, and I found the other logic (with null meaning empty) a bit confusing during debugging.
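
   A sketch of the "always allocate the set" pattern, assuming simplified String IDs in place of `TaskId`/`UUID` and lags passed in directly rather than read from `ClientState`. A task with no caught-up clients maps to an empty set instead of being absent from the map, so callers never have to treat `null` as "empty".

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

class CaughtUpClientsMapSketch {
    // lagsByTask: task -> (client -> lag). Returns an entry for EVERY task.
    static Map<String, SortedSet<String>> caughtUpClientsByTask(final Map<String, Map<String, Long>> lagsByTask,
                                                                final long acceptableRecoveryLag) {
        final Map<String, SortedSet<String>> taskToCaughtUpClients = new HashMap<>();
        for (final Map.Entry<String, Map<String, Long>> taskEntry : lagsByTask.entrySet()) {
            // Allocate the set up front, even if it stays empty.
            final SortedSet<String> caughtUpClients = new TreeSet<>();
            for (final Map.Entry<String, Long> clientEntry : taskEntry.getValue().entrySet()) {
                final long lag = clientEntry.getValue();
                // Simplified check: negative sentinel lags are simply excluded here.
                if (lag >= 0 && lag <= acceptableRecoveryLag) {
                    caughtUpClients.add(clientEntry.getKey());
                }
            }
            taskToCaughtUpClients.put(taskEntry.getKey(), caughtUpClients);
        }
        return taskToCaughtUpClients;
    }

    public static void main(final String[] args) {
        final Map<String, Long> lags00 = new HashMap<>();
        lags00.put("A", 0L);
        lags00.put("B", 500L);
        final Map<String, Long> lags01 = new HashMap<>();
        lags01.put("A", 900L);
        final Map<String, Map<String, Long>> lagsByTask = new HashMap<>();
        lagsByTask.put("0_0", lags00);
        lagsByTask.put("0_1", lags01);

        final Map<String, SortedSet<String>> result = caughtUpClientsByTask(lagsByTask, 100L);
        System.out.println(result.get("0_0")); // [A]
        System.out.println(result.get("0_1")); // [] rather than null
    }
}
```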

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##########
@@ -56,14 +56,17 @@ private int numCaughtUpClients() {
         return caughtUpClients.size();
     }
 
-    /**
-     * @return true if this client is caught-up for this task, or the task has no caught-up clients
-     */
+    private static boolean taskIsNotCaughtUpOnClientAndOtherCaughtUpClientsExist(final TaskId task,
+                                                                                 final UUID client,
+                                                                                 final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients) {
+        return !taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients);
+    }

Review comment:
       Expanding De Morgan's law (`!(A || B)` is `!A && !B`) into its own well-named method at @cadonna's request (which I also appreciated).
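
   A sketch of the two predicates, assuming String IDs instead of `TaskId`/`UUID`. The body of the positive predicate is my reconstruction from the removed Javadoc ("true if this client is caught-up for this task, or the task has no caught-up clients"); `expandedForm` is a hypothetical helper that writes the `!A && !B` side out term by term so the two can be compared.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

class CaughtUpPredicateSketch {
    // A || B: the client is caught up on the task, or nobody is caught up on it.
    static boolean taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(final String task,
                                                                  final String client,
                                                                  final Map<String, SortedSet<String>> tasksToCaughtUpClients) {
        final SortedSet<String> caughtUpClients = tasksToCaughtUpClients.get(task);
        return caughtUpClients == null || caughtUpClients.isEmpty() || caughtUpClients.contains(client);
    }

    // !(A || B), named for what it means after applying De Morgan's law.
    static boolean taskIsNotCaughtUpOnClientAndOtherCaughtUpClientsExist(final String task,
                                                                         final String client,
                                                                         final Map<String, SortedSet<String>> tasksToCaughtUpClients) {
        return !taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients);
    }

    // The same predicate written out as !A && !B, for comparison.
    static boolean expandedForm(final String task,
                                final String client,
                                final Map<String, SortedSet<String>> tasksToCaughtUpClients) {
        final SortedSet<String> caughtUpClients = tasksToCaughtUpClients.get(task);
        final boolean anyCaughtUpClients = caughtUpClients != null && !caughtUpClients.isEmpty();
        return anyCaughtUpClients && !caughtUpClients.contains(client);
    }

    public static void main(final String[] args) {
        final Map<String, SortedSet<String>> caughtUp = new HashMap<>();
        final SortedSet<String> clientsFor00 = new TreeSet<>();
        clientsFor00.add("A");
        caughtUp.put("0_0", clientsFor00);
        caughtUp.put("0_1", new TreeSet<>());
        System.out.println(taskIsNotCaughtUpOnClientAndOtherCaughtUpClientsExist("0_0", "B", caughtUp)); // true
        System.out.println(taskIsNotCaughtUpOnClientAndOtherCaughtUpClientsExist("0_0", "A", caughtUp)); // false
        System.out.println(taskIsNotCaughtUpOnClientAndOtherCaughtUpClientsExist("0_1", "B", caughtUp)); // false
    }
}
```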

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##########
@@ -241,16 +239,29 @@ private static void assignStatelessActiveTasks(final TreeMap<UUID, ClientState>
         final Map<TaskId, SortedSet<UUID>> taskToCaughtUpClients = new HashMap<>();
 
         for (final TaskId task : statefulTasks) {
-
+            final TreeSet<UUID> caughtUpClients = new TreeSet<>();
             for (final Map.Entry<UUID, ClientState> clientEntry : clientStates.entrySet()) {
                 final UUID client = clientEntry.getKey();
                 final long taskLag = clientEntry.getValue().lagFor(task);
-                if (taskLag == Task.LATEST_OFFSET || taskLag <= acceptableRecoveryLag) {
-                    taskToCaughtUpClients.computeIfAbsent(task, ignored -> new TreeSet<>()).add(client);
+                if (active(taskLag) || unbounded(acceptableRecoveryLag) || acceptable(acceptableRecoveryLag, taskLag)) {

Review comment:
       I realized that our condition here was actually wrong. In addition to all the non-negative lags, there are two negative sentinel lags: one meaning "unknown" (-1) and one meaning "latest" (-2). Because both sentinels are negative and `acceptableRecoveryLag` is not, `taskLag <= acceptableRecoveryLag` unintentionally matched the sentinels as well. Even if we want a sentinel to be considered "caught up" (as with "latest"), we should say so explicitly, not rely on mathematical coincidence.
   
   I also added a special case for when acceptableRecoveryLag is set to Long.MAX_VALUE: every client is then considered caught up on every task, regardless of its lag (even if the lag is a sentinel).
   
   I also found the boolean expression with all its conditionals a little hard to read, so I extracted some well-named helper methods (`active`, `unbounded`, and `acceptable`).
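
   A sketch of what the three helpers could look like, based only on the behavior described above. The sentinel values (-1 for "unknown", -2 for "latest") come from the comment; the constant names and helper bodies are assumptions, and `active` assumes, as its name suggests, that the "latest" sentinel marks a task the client currently runs as active. `oldIsCaughtUp` reproduces the original condition for comparison.

```java
class CaughtUpConditionSketch {
    // Sentinel lags as described in the review comment; these names are hypothetical.
    static final long UNKNOWN_LAG = -1L;
    static final long LATEST_LAG = -2L;

    // "latest" sentinel: assumed to mark a task the client is running as active.
    static boolean active(final long taskLag) {
        return taskLag == LATEST_LAG;
    }

    // Long.MAX_VALUE means "treat every client as caught up, whatever its lag".
    static boolean unbounded(final long acceptableRecoveryLag) {
        return acceptableRecoveryLag == Long.MAX_VALUE;
    }

    // Only real (non-negative) lags are compared numerically.
    static boolean acceptable(final long acceptableRecoveryLag, final long taskLag) {
        return taskLag >= 0 && taskLag <= acceptableRecoveryLag;
    }

    static boolean isCaughtUp(final long taskLag, final long acceptableRecoveryLag) {
        return active(taskLag) || unbounded(acceptableRecoveryLag) || acceptable(acceptableRecoveryLag, taskLag);
    }

    // Old condition: "unknown" (-1) slips through because -1 <= any non-negative bound.
    static boolean oldIsCaughtUp(final long taskLag, final long acceptableRecoveryLag) {
        return taskLag == LATEST_LAG || taskLag <= acceptableRecoveryLag;
    }

    public static void main(final String[] args) {
        System.out.println(oldIsCaughtUp(UNKNOWN_LAG, 100L));        // true (by coincidence)
        System.out.println(isCaughtUp(UNKNOWN_LAG, 100L));           // false
        System.out.println(isCaughtUp(UNKNOWN_LAG, Long.MAX_VALUE)); // true (unbounded)
        System.out.println(isCaughtUp(LATEST_LAG, 0L));              // true (explicitly active)
        System.out.println(isCaughtUp(50L, 100L));                   // true
    }
}
```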

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
##########
@@ -150,7 +147,7 @@ public void shouldAssignActiveStatefulTasksEvenlyOverClientsWhereNumberOfClients
             clientStates,
             allTaskIds,
             allTaskIds,
-            new AssignmentConfigs(0L, 0, 0, 0L)
+            new AssignmentConfigs(0L, 1, 0, 0L)

Review comment:
       All these tests erroneously set max warmups (the second `AssignmentConfigs` argument) to zero.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
##########
@@ -81,7 +81,7 @@
     );
 
     @Test
-    public void shouldBeStickyForActiveAndStandbyTasksEvenIfNoWarmups() {
+    public void shouldBeStickyForActiveAndStandbyTasksWhileWarmingUp() {

Review comment:
       The diff is misaligned: I didn't rename this test. I removed `shouldBeStickyForActiveAndStandbyTasksEvenIfNoWarmups` and added `shouldSkipWarmupsWhenAcceptableLagIsMax`.

##########
File path: tests/kafkatest/services/streams.py
##########
@@ -44,6 +44,9 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
     CLEAN_NODE_ENABLED = True
 
     logs = {
+        "streams_config": {
+            "path": CONFIG_FILE,
+            "collect_default": True},

Review comment:
       It was handy to be able to see the config file that was actually used while debugging.

##########
File path: tests/kafkatest/services/streams.py
##########
@@ -465,6 +468,9 @@ def prop_file(self):
         properties['reduce.topic'] = self.REDUCE_TOPIC
         properties['join.topic'] = self.JOIN_TOPIC
 
+        # Long.MAX_VALUE lets us do the assignment without a warmup
+        properties['acceptable.recovery.lag'] = "9223372036854775807"
+

Review comment:
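       Added this configuration to fix the flaky `StreamsOptimizedTest.test_upgrade_optimized_topology`. With `acceptable.recovery.lag` at Long.MAX_VALUE, every client counts as caught up, so the assignment is done without warmups.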
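
   The string literal in `prop_file()` is exactly `Long.MAX_VALUE`, which is the value the "unbounded" special case described in the earlier comment keys on. A quick check using `java.util.Properties`; the class and method names are hypothetical, and only the config key and value come from the diff.

```java
import java.util.Properties;

class RecoveryLagPropertySketch {
    // Builds the same key/value pair the system test writes into its properties file.
    static Properties testProperties() {
        final Properties props = new Properties();
        props.setProperty("acceptable.recovery.lag", Long.toString(Long.MAX_VALUE));
        return props;
    }

    public static void main(final String[] args) {
        final String value = testProperties().getProperty("acceptable.recovery.lag");
        System.out.println(value);                                  // 9223372036854775807
        System.out.println(Long.parseLong(value) == Long.MAX_VALUE); // true
    }
}
```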




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org