Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/05/22 16:47:59 UTC

[GitHub] [kafka] vvcephei opened a new pull request #8716: KAFKA-6145: KIP-441: Fix assignor config passthough

vvcephei opened a new pull request #8716:
URL: https://github.com/apache/kafka/pull/8716


   Also fixes a system test by configuring the HATA (HighAvailabilityTaskAssignor) to perform a one-shot balanced assignment.
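
   For context, a minimal sketch (illustrative only) of the configuration that drives such a
   one-shot balanced assignment; the system-test change later in this thread sets the same
   property from Python:

       import java.util.Properties;
       import org.apache.kafka.streams.StreamsConfig;

       // acceptable.recovery.lag = Long.MAX_VALUE makes the HighAvailabilityTaskAssignor
       // treat every client as caught up, so it hands out a balanced assignment right away,
       // with no warmup replicas and no follow-up probing rebalances.
       final Properties props = new Properties();
       props.put(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, Long.toString(Long.MAX_VALUE));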
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8716: KAFKA-6145: KIP-441: Fix assignor config passthough

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8716:
URL: https://github.com/apache/kafka/pull/8716#issuecomment-634820186


   The only failures were:
       org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]
       org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]





[GitHub] [kafka] ableegoldman commented on a change in pull request #8716: KAFKA-6145: KIP-441: Fix assignor config passthough

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8716:
URL: https://github.com/apache/kafka/pull/8716#discussion_r429490753



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
##########
@@ -359,6 +359,10 @@ private AssignmentConfigs(final StreamsConfig configs) {
                           final Integer maxWarmupReplicas,
                           final Integer numStandbyReplicas,
                           final Long probingRebalanceIntervalMs) {
+            if (maxWarmupReplicas < 1) {
+                throw new IllegalArgumentException("must configure at least one warmup replica");
+            }
+

Review comment:
       Should we add the same for all the other configs?







[GitHub] [kafka] ableegoldman commented on a change in pull request #8716: KAFKA-6145: KIP-441: Fix assignor config passthough

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8716:
URL: https://github.com/apache/kafka/pull/8716#discussion_r429490483



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1148,6 +1148,9 @@ private void verifyMaxInFlightRequestPerConnection(final Object maxInFlightReque
         consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
         consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
         consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
+        consumerProps.put(ACCEPTABLE_RECOVERY_LAG_CONFIG, getLong(ACCEPTABLE_RECOVERY_LAG_CONFIG));
+        consumerProps.put(MAX_WARMUP_REPLICAS_CONFIG, getInt(MAX_WARMUP_REPLICAS_CONFIG));
+        consumerProps.put(PROBING_REBALANCE_INTERVAL_MS_CONFIG, getLong(PROBING_REBALANCE_INTERVAL_MS_CONFIG));

Review comment:
       Can we add a note to `AssignorConfiguration` explaining that you have to do this any time you add a new Streams config?










[GitHub] [kafka] vvcephei commented on a change in pull request #8716: KAFKA-6145: KIP-441: Fix assignor config passthough

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8716:
URL: https://github.com/apache/kafka/pull/8716#discussion_r431342011



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.sameInstance;
+
+@Category(IntegrationTest.class)
+public class TaskAssignorIntegrationTest {
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldProperlyConfigureTheAssignor() throws NoSuchFieldException, IllegalAccessException {
+        // This test uses reflection to check and make sure that all the expected configurations really
+        // make it all the way to configure the task assignor. There's no other use case for being able
+        // to extract all these fields, so reflection is a good choice until we find that the maintenance
+        // burden is too high.
+        //
+        // Also note that this is an integration test because so many components have to come together to
+        // ensure these configurations wind up where they belong, and any number of future code changes
+        // could break this change.
+
+        final String testId = safeUniqueTestName(getClass(), testName);
+        final String appId = "appId_" + testId;
+
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, "input");
+
+        // Maybe I'm paranoid, but I don't want the compiler deciding that my lambdas are equal to the identity
+        // function and defeating my identity check
+        final AtomicInteger compilerDefeatingReference = new AtomicInteger(0);
+
+        // the implementation doesn't matter, we're just going to verify the reference.
+        final AssignorConfiguration.AssignmentListener configuredAssignmentListener =
+            stable -> compilerDefeatingReference.incrementAndGet();
+
+        final Properties properties = mkObjectProperties(
+            mkMap(
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+                mkEntry(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "5"),
+                mkEntry(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, "6"),
+                mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "7"),
+                mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, "480000"),
+                mkEntry(StreamsConfig.InternalConfig.ASSIGNMENT_LISTENER, configuredAssignmentListener),

Review comment:
       Good idea; I'm not sure why I didn't think to do this already. With my newfound understanding of this config translation logic, I'm confident that it gets copied over right now, because it's not a registered config, but a regression test would be nice.
   
   I'll quickly follow up with a separate PR so that I can merge this one.
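
    For illustration, such a regression test might assert that the unregistered internal
    config survives the translation into the main consumer configs, along these lines
    (a sketch only: the getMainConsumerConfigs call and its arguments are assumptions here,
    not the contents of the follow-up PR):

        final StreamsConfig config = new StreamsConfig(properties);
        final Map<String, Object> mainConsumerConfigs =
            config.getMainConsumerConfigs("appId", "clientId", 0);
        // the unregistered internal config should be passed through by reference
        assertThat(
            mainConsumerConfigs.get(StreamsConfig.InternalConfig.ASSIGNMENT_LISTENER),
            sameInstance(configuredAssignmentListener)
        );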







[GitHub] [kafka] vvcephei commented on a change in pull request #8716: KAFKA-6145: KIP-441: Fix assignor config passthough

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8716:
URL: https://github.com/apache/kafka/pull/8716#discussion_r429352759



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########
@@ -334,6 +334,7 @@ public String toString() {
             ") prevStandbyTasks: (" + prevStandbyTasks +
             ") prevOwnedPartitionsByConsumerId: (" + ownedPartitions.keySet() +
             ") changelogOffsetTotalsByTask: (" + taskOffsetSums.entrySet() +
+            ") taskLagTotals: (" + taskLagTotals.entrySet() +

Review comment:
       I found this useful while debugging the system test.

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1148,6 +1148,9 @@ private void verifyMaxInFlightRequestPerConnection(final Object maxInFlightReque
         consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
         consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
         consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
+        consumerProps.put(ACCEPTABLE_RECOVERY_LAG_CONFIG, getLong(ACCEPTABLE_RECOVERY_LAG_CONFIG));
+        consumerProps.put(MAX_WARMUP_REPLICAS_CONFIG, getInt(MAX_WARMUP_REPLICAS_CONFIG));
+        consumerProps.put(PROBING_REBALANCE_INTERVAL_MS_CONFIG, getLong(PROBING_REBALANCE_INTERVAL_MS_CONFIG));

Review comment:
       This is where we forgot to copy over the configs.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
##########
@@ -359,6 +359,10 @@ private AssignmentConfigs(final StreamsConfig configs) {
                           final Integer maxWarmupReplicas,
                           final Integer numStandbyReplicas,
                           final Long probingRebalanceIntervalMs) {
+            if (maxWarmupReplicas < 1) {
+                throw new IllegalArgumentException("must configure at least one warmup replica");
+            }
+

Review comment:
       I added this constraint to mirror the constraint we already apply in StreamsConfig. It's not critical, but I was disappointed that I had written a bunch of tests that included a technically invalid configuration.
   
   I'll write a test for this...
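
    For illustration, a sketch of what such a test might look like, using the four-argument
    AssignmentConfigs constructor as it appears elsewhere in this thread (acceptableRecoveryLag,
    maxWarmupReplicas, numStandbyReplicas, probingRebalanceIntervalMs); the values are
    placeholders, and this is not necessarily the test that was actually committed:

        @Test
        public void shouldRejectZeroWarmupReplicas() {
            // the new constructor check should reject maxWarmupReplicas = 0
            final IllegalArgumentException exception = assertThrows(
                IllegalArgumentException.class,
                () -> new AssignmentConfigs(100L, 0, 1, 60_000L)
            );
            assertThat(exception.getMessage(), containsString("warmup"));
        }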

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##########
@@ -90,15 +91,12 @@ public boolean assign(final Map<UUID, ClientState> clients,
 
         assignStatelessActiveTasks(clientStates, diff(TreeSet::new, allTaskIds, statefulTasks));
 
-        // We shouldn't plan a probing rebalance if we _needed_ task movements, but couldn't do any
-        // due to being configured for no warmups.
-        final boolean probingRebalanceNeeded =
-            configs.maxWarmupReplicas > 0 && neededActiveTaskMovements + neededStandbyTaskMovements > 0;
+        final boolean probingRebalanceNeeded = neededActiveTaskMovements + neededStandbyTaskMovements > 0;
 
         log.info("Decided on assignment: " +
                      clientStates +
-                     " with " +
-                     (probingRebalanceNeeded ? "" : "no") +
+                     " with" +
+                     (probingRebalanceNeeded ? "" : " no") +

Review comment:
       Fixing a double-space we were printing when there was a followup. (It would say `with  followup`)

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##########
@@ -90,15 +91,12 @@ public boolean assign(final Map<UUID, ClientState> clients,
 
         assignStatelessActiveTasks(clientStates, diff(TreeSet::new, allTaskIds, statefulTasks));
 
-        // We shouldn't plan a probing rebalance if we _needed_ task movements, but couldn't do any
-        // due to being configured for no warmups.
-        final boolean probingRebalanceNeeded =
-            configs.maxWarmupReplicas > 0 && neededActiveTaskMovements + neededStandbyTaskMovements > 0;
+        final boolean probingRebalanceNeeded = neededActiveTaskMovements + neededStandbyTaskMovements > 0;

Review comment:
       Since we can't have zero warmups, we don't need this condition (that I added in https://github.com/apache/kafka/pull/8696)

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##########
@@ -241,16 +239,29 @@ private static void assignStatelessActiveTasks(final TreeMap<UUID, ClientState>
         final Map<TaskId, SortedSet<UUID>> taskToCaughtUpClients = new HashMap<>();
 
         for (final TaskId task : statefulTasks) {
-
+            final TreeSet<UUID> caughtUpClients = new TreeSet<>();

Review comment:
       A short-lived, empty TreeSet costs practically nothing, and I found the other logic (with null meaning empty) a bit confusing during debugging.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/TaskMovement.java
##########
@@ -56,14 +56,17 @@ private int numCaughtUpClients() {
         return caughtUpClients.size();
     }
 
-    /**
-     * @return true if this client is caught-up for this task, or the task has no caught-up clients
-     */
+    private static boolean taskIsNotCaughtUpOnClientAndOtherCaughtUpClientsExist(final TaskId task,
+                                                                                 final UUID client,
+                                                                                 final Map<TaskId, SortedSet<UUID>> tasksToCaughtUpClients) {
+        return !taskIsCaughtUpOnClientOrNoCaughtUpClientsExist(task, client, tasksToCaughtUpClients);
+    }

Review comment:
       Expanding De Morgan's law at @cadonna's request (which I also appreciated).

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##########
@@ -241,16 +239,29 @@ private static void assignStatelessActiveTasks(final TreeMap<UUID, ClientState>
         final Map<TaskId, SortedSet<UUID>> taskToCaughtUpClients = new HashMap<>();
 
         for (final TaskId task : statefulTasks) {
-
+            final TreeSet<UUID> caughtUpClients = new TreeSet<>();
             for (final Map.Entry<UUID, ClientState> clientEntry : clientStates.entrySet()) {
                 final UUID client = clientEntry.getKey();
                 final long taskLag = clientEntry.getValue().lagFor(task);
-                if (taskLag == Task.LATEST_OFFSET || taskLag <= acceptableRecoveryLag) {
-                    taskToCaughtUpClients.computeIfAbsent(task, ignored -> new TreeSet<>()).add(client);
+                if (active(taskLag) || unbounded(acceptableRecoveryLag) || acceptable(acceptableRecoveryLag, taskLag)) {

Review comment:
       I realized that our condition was actually wrong here. In addition to all the zero-or-greater lags, there are two negative lags, one meaning "unknown" (-1), and one meaning "latest" (-2). When we said `taskLag <= acceptableRecoveryLag`, it was unintentionally encompassing the sentinel values as well. Even if we want a sentinel to be considered "caught up" (as with "Latest"), we should do so explicitly, not by mathematical coincidence.
   
   I also added a special case when acceptableRecoveryLag is set to MAX_VALUE to indicate that all tasks, regardless of their lag (even if it's a sentinel), are to be considered caught-up.
   
   I also found the boolean expression with all the conditionals a little hard to read, so I pulled out some semantic methods.
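
    For illustration, one way the helper predicates named in that condition could look, based
    on the description above (Task.LATEST_OFFSET being the -2 "latest" sentinel and -1 the
    "unknown" sentinel); this is a sketch, not necessarily the committed code:

        private static boolean active(final long taskLag) {
            // the client currently runs the task as active, signalled by the "latest" sentinel
            return taskLag == Task.LATEST_OFFSET;
        }

        private static boolean unbounded(final long acceptableRecoveryLag) {
            // acceptable.recovery.lag == Long.MAX_VALUE means every client counts as caught up
            return acceptableRecoveryLag == Long.MAX_VALUE;
        }

        private static boolean acceptable(final long acceptableRecoveryLag, final long taskLag) {
            // only genuine, non-negative lags are compared to the threshold, so the
            // negative sentinels can no longer slip through by coincidence
            return taskLag >= 0 && taskLag <= acceptableRecoveryLag;
        }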

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
##########
@@ -150,7 +147,7 @@ public void shouldAssignActiveStatefulTasksEvenlyOverClientsWhereNumberOfClients
             clientStates,
             allTaskIds,
             allTaskIds,
-            new AssignmentConfigs(0L, 0, 0, 0L)
+            new AssignmentConfigs(0L, 1, 0, 0L)

Review comment:
       All these tests erroneously set "max warmups" to zero.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
##########
@@ -81,7 +81,7 @@
     );
 
     @Test
-    public void shouldBeStickyForActiveAndStandbyTasksEvenIfNoWarmups() {
+    public void shouldBeStickyForActiveAndStandbyTasksWhileWarmingUp() {

Review comment:
       The diff is misaligned. I removed `shouldBeStickyForActiveAndStandbyTasksEvenIfNoWarmups` and added `shouldSkipWarmupsWhenAcceptableLagIsMax`. 

##########
File path: tests/kafkatest/services/streams.py
##########
@@ -44,6 +44,9 @@ class StreamsTestBaseService(KafkaPathResolverMixin, JmxMixin, Service):
     CLEAN_NODE_ENABLED = True
 
     logs = {
+        "streams_config": {
+            "path": CONFIG_FILE,
+            "collect_default": True},

Review comment:
       It was handy to be able to see the used config file while debugging.

##########
File path: tests/kafkatest/services/streams.py
##########
@@ -465,6 +468,9 @@ def prop_file(self):
         properties['reduce.topic'] = self.REDUCE_TOPIC
         properties['join.topic'] = self.JOIN_TOPIC
 
+        # Long.MAX_VALUE lets us do the assignment without a warmup
+        properties['acceptable.recovery.lag'] = "9223372036854775807"
+

Review comment:
       Added this configuration to fix the flaky `StreamsOptimizedTest.test_upgrade_optimized_topology`







[GitHub] [kafka] vvcephei commented on pull request #8716: KAFKA-6145: KIP-441: Fix assignor config passthough

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8716:
URL: https://github.com/apache/kafka/pull/8716#issuecomment-632850480


   Previously, it would always fail for me within the first one to three tests, the first one more often than not.





[GitHub] [kafka] ableegoldman commented on a change in pull request #8716: KAFKA-6145: KIP-441: Fix assignor config passthough

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8716:
URL: https://github.com/apache/kafka/pull/8716#discussion_r431349905



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.sameInstance;
+
+@Category(IntegrationTest.class)
+public class TaskAssignorIntegrationTest {
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldProperlyConfigureTheAssignor() throws NoSuchFieldException, IllegalAccessException {
+        // This test uses reflection to check and make sure that all the expected configurations really
+        // make it all the way to configure the task assignor. There's no other use case for being able
+        // to extract all these fields, so reflection is a good choice until we find that the maintenance
+        // burden is too high.
+        //
+        // Also note that this is an integration test because so many components have to come together to
+        // ensure these configurations wind up where they belong, and any number of future code changes
+        // could break this change.
+
+        final String testId = safeUniqueTestName(getClass(), testName);
+        final String appId = "appId_" + testId;
+
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, "input");
+
+        // Maybe I'm paranoid, but I don't want the compiler deciding that my lambdas are equal to the identity
+        // function and defeating my identity check
+        final AtomicInteger compilerDefeatingReference = new AtomicInteger(0);
+
+        // the implementation doesn't matter, we're just going to verify the reference.
+        final AssignorConfiguration.AssignmentListener configuredAssignmentListener =
+            stable -> compilerDefeatingReference.incrementAndGet();
+
+        final Properties properties = mkObjectProperties(
+            mkMap(
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+                mkEntry(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "5"),
+                mkEntry(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, "6"),
+                mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "7"),
+                mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, "480000"),
+                mkEntry(StreamsConfig.InternalConfig.ASSIGNMENT_LISTENER, configuredAssignmentListener),

Review comment:
       Sounds good. Thanks for the fix!







[GitHub] [kafka] ableegoldman commented on a change in pull request #8716: KAFKA-6145: KIP-441: Fix assignor config passthough

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8716:
URL: https://github.com/apache/kafka/pull/8716#discussion_r429490483



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1148,6 +1148,9 @@ private void verifyMaxInFlightRequestPerConnection(final Object maxInFlightReque
         consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
         consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
         consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
+        consumerProps.put(ACCEPTABLE_RECOVERY_LAG_CONFIG, getLong(ACCEPTABLE_RECOVERY_LAG_CONFIG));
+        consumerProps.put(MAX_WARMUP_REPLICAS_CONFIG, getInt(MAX_WARMUP_REPLICAS_CONFIG));
+        consumerProps.put(PROBING_REBALANCE_INTERVAL_MS_CONFIG, getLong(PROBING_REBALANCE_INTERVAL_MS_CONFIG));

Review comment:
       Can we add a note to `AssignorConfiguration` pointing to this for any new assignor-related configs that get added?







[GitHub] [kafka] ableegoldman commented on a change in pull request #8716: KAFKA-6145: KIP-441: Fix assignor config passthough

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8716:
URL: https://github.com/apache/kafka/pull/8716#discussion_r429491985



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1148,6 +1148,9 @@ private void verifyMaxInFlightRequestPerConnection(final Object maxInFlightReque
         consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
         consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
         consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
+        consumerProps.put(ACCEPTABLE_RECOVERY_LAG_CONFIG, getLong(ACCEPTABLE_RECOVERY_LAG_CONFIG));
+        consumerProps.put(MAX_WARMUP_REPLICAS_CONFIG, getInt(MAX_WARMUP_REPLICAS_CONFIG));
+        consumerProps.put(PROBING_REBALANCE_INTERVAL_MS_CONFIG, getLong(PROBING_REBALANCE_INTERVAL_MS_CONFIG));

Review comment:
       Do we automatically pass through the internal configs? I notice we don't copy over the task assignor class or the new assignment listener callback I added for the integration tests, yet both of them seem to get through.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertThrows;
+
+public class AssignorConfigurationTest {

Review comment:
       Should we move the handful of AssignorConfiguration related tests from StreamsPartitionAssignorTest to here?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##########
@@ -241,16 +239,29 @@ private static void assignStatelessActiveTasks(final TreeMap<UUID, ClientState>
         final Map<TaskId, SortedSet<UUID>> taskToCaughtUpClients = new HashMap<>();
 
         for (final TaskId task : statefulTasks) {
-
+            final TreeSet<UUID> caughtUpClients = new TreeSet<>();

Review comment:
       Fair enough. I don't think it was meant as a cost saving thing, just to make it easier to understand when something did or did not have caught-up clients. If you find this logic easier to follow, go for it

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##########
@@ -241,16 +239,29 @@ private static void assignStatelessActiveTasks(final TreeMap<UUID, ClientState>
         final Map<TaskId, SortedSet<UUID>> taskToCaughtUpClients = new HashMap<>();
 
         for (final TaskId task : statefulTasks) {
-
+            final TreeSet<UUID> caughtUpClients = new TreeSet<>();
             for (final Map.Entry<UUID, ClientState> clientEntry : clientStates.entrySet()) {
                 final UUID client = clientEntry.getKey();
                 final long taskLag = clientEntry.getValue().lagFor(task);
-                if (taskLag == Task.LATEST_OFFSET || taskLag <= acceptableRecoveryLag) {
-                    taskToCaughtUpClients.computeIfAbsent(task, ignored -> new TreeSet<>()).add(client);
+                if (active(taskLag) || unbounded(acceptableRecoveryLag) || acceptable(acceptableRecoveryLag, taskLag)) {

Review comment:
       Nice catch! One nit is that "active" alone is not sufficient for being considered caught-up. Can we rename the `active` condition to `running` or `activeRunning`, etc?

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1148,6 +1148,9 @@ private void verifyMaxInFlightRequestPerConnection(final Object maxInFlightReque
         consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
         consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
         consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
+        consumerProps.put(ACCEPTABLE_RECOVERY_LAG_CONFIG, getLong(ACCEPTABLE_RECOVERY_LAG_CONFIG));
+        consumerProps.put(MAX_WARMUP_REPLICAS_CONFIG, getInt(MAX_WARMUP_REPLICAS_CONFIG));
+        consumerProps.put(PROBING_REBALANCE_INTERVAL_MS_CONFIG, getLong(PROBING_REBALANCE_INTERVAL_MS_CONFIG));

Review comment:
       I know it's deprecated, but I think the PartitionGrouper config is missing as well.







[GitHub] [kafka] ableegoldman commented on a change in pull request #8716: KAFKA-6145: KIP-441: Fix assignor config passthough

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8716:
URL: https://github.com/apache/kafka/pull/8716#discussion_r429496605



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1148,6 +1148,9 @@ private void verifyMaxInFlightRequestPerConnection(final Object maxInFlightReque
         consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
         consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
         consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
+        consumerProps.put(ACCEPTABLE_RECOVERY_LAG_CONFIG, getLong(ACCEPTABLE_RECOVERY_LAG_CONFIG));

Review comment:
       Trying to avoid piling on even more unrelated questions to the thread below, but there's another config that would need to be passed in, the admin client timeout.
   That said, can we just remove it? It only gets the timeout the admin is configured with anyway







[GitHub] [kafka] vvcephei commented on a change in pull request #8716: KAFKA-6145: KIP-441: Fix assignor config passthough

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8716:
URL: https://github.com/apache/kafka/pull/8716#discussion_r438441698



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1148,6 +1148,9 @@ private void verifyMaxInFlightRequestPerConnection(final Object maxInFlightReque
         consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
         consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
         consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
+        consumerProps.put(ACCEPTABLE_RECOVERY_LAG_CONFIG, getLong(ACCEPTABLE_RECOVERY_LAG_CONFIG));
+        consumerProps.put(MAX_WARMUP_REPLICAS_CONFIG, getInt(MAX_WARMUP_REPLICAS_CONFIG));
+        consumerProps.put(PROBING_REBALANCE_INTERVAL_MS_CONFIG, getLong(PROBING_REBALANCE_INTERVAL_MS_CONFIG));

Review comment:
       Thanks, @guozhangwang 
   
   @ableegoldman , it seems backwards to me also.







[GitHub] [kafka] guozhangwang commented on a change in pull request #8716: KAFKA-6145: KIP-441: Fix assignor config passthough

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #8716:
URL: https://github.com/apache/kafka/pull/8716#discussion_r431322682



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1148,6 +1148,9 @@ private void verifyMaxInFlightRequestPerConnection(final Object maxInFlightReque
         consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
         consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
         consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
+        consumerProps.put(ACCEPTABLE_RECOVERY_LAG_CONFIG, getLong(ACCEPTABLE_RECOVERY_LAG_CONFIG));
+        consumerProps.put(MAX_WARMUP_REPLICAS_CONFIG, getInt(MAX_WARMUP_REPLICAS_CONFIG));
+        consumerProps.put(PROBING_REBALANCE_INTERVAL_MS_CONFIG, getLong(PROBING_REBALANCE_INTERVAL_MS_CONFIG));

Review comment:
       Regarding KAFKA-10046, in current trunk we already have some logic that assumes the default partition grouper is always used, so I'd suggest we just bite the bullet and remove it in 2.6.







[GitHub] [kafka] ableegoldman commented on a change in pull request #8716: KAFKA-6145: KIP-441: Fix assignor config passthough

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8716:
URL: https://github.com/apache/kafka/pull/8716#discussion_r431326390



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -874,7 +874,7 @@
         public static final String TIME = "__time__";
 
         // This is settable in the main Streams config, but it's a private API for testing
-        public static final String ASSIGNMENT_LISTENER = "__asignment.listener__";

Review comment:
       🤦‍♀️ 

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1148,6 +1148,9 @@ private void verifyMaxInFlightRequestPerConnection(final Object maxInFlightReque
         consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
         consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
         consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
+        consumerProps.put(ACCEPTABLE_RECOVERY_LAG_CONFIG, getLong(ACCEPTABLE_RECOVERY_LAG_CONFIG));

Review comment:
       I'll open a separate PR to remove the extra timeout config

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.sameInstance;
+
+@Category(IntegrationTest.class)
+public class TaskAssignorIntegrationTest {
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldProperlyConfigureTheAssignor() throws NoSuchFieldException, IllegalAccessException {
+        // This test uses reflection to check and make sure that all the expected configurations really
+        // make it all the way to configure the task assignor. There's no other use case for being able
+        // to extract all these fields, so reflection is a good choice until we find that the maintenance
+        // burden is too high.
+        //
+        // Also note that this is an integration test because so many components have to come together to
+        // ensure these configurations wind up where they belong, and any number of future code changes
+        // could break this change.
+
+        final String testId = safeUniqueTestName(getClass(), testName);
+        final String appId = "appId_" + testId;
+
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, "input");
+
+        // Maybe I'm paranoid, but I don't want the compiler deciding that my lambdas are equal to the identity
+        // function and defeating my identity check
+        final AtomicInteger compilerDefeatingReference = new AtomicInteger(0);
+
+        // the implementation doesn't matter, we're just going to verify the reference.
+        final AssignorConfiguration.AssignmentListener configuredAssignmentListener =
+            stable -> compilerDefeatingReference.incrementAndGet();
+
+        final Properties properties = mkObjectProperties(
+            mkMap(
+                mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
+                mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
+                mkEntry(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "5"),
+                mkEntry(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG, "6"),
+                mkEntry(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, "7"),
+                mkEntry(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, "480000"),
+                mkEntry(StreamsConfig.InternalConfig.ASSIGNMENT_LISTENER, configuredAssignmentListener),

Review comment:
       Should we also validate that the task assignor gets passed through? We could even pass in a custom assignor and use that to verify the correct assignor configs got passed through.
   
   Of course, reflection black magic-ry is just more fun 🙂 







[GitHub] [kafka] cadonna commented on a change in pull request #8716: KAFKA-6145: KIP-441: Fix assignor config passthough

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #8716:
URL: https://github.com/apache/kafka/pull/8716#discussion_r429927490



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertThrows;
+
+public class AssignorConfigurationTest {

Review comment:
       Yes, please!

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
##########
@@ -359,6 +359,10 @@ private AssignmentConfigs(final StreamsConfig configs) {
                           final Integer maxWarmupReplicas,
                           final Integer numStandbyReplicas,
                           final Long probingRebalanceIntervalMs) {
+            if (maxWarmupReplicas < 1) {
+                throw new IllegalArgumentException("must configure at least one warmup replica");
+            }
+

Review comment:
       IMO, we should check the limits for all configs. However, I am not sure whether we should check `probingRebalanceIntervalMs` to be `>= 60 * 1000L` (as we do in `StreamsConfig`) or just `>= 0`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##########
@@ -241,16 +239,29 @@ private static void assignStatelessActiveTasks(final TreeMap<UUID, ClientState>
         final Map<TaskId, SortedSet<UUID>> taskToCaughtUpClients = new HashMap<>();
 
         for (final TaskId task : statefulTasks) {
-
+            final TreeSet<UUID> caughtUpClients = new TreeSet<>();
             for (final Map.Entry<UUID, ClientState> clientEntry : clientStates.entrySet()) {
                 final UUID client = clientEntry.getKey();
                 final long taskLag = clientEntry.getValue().lagFor(task);
-                if (taskLag == Task.LATEST_OFFSET || taskLag <= acceptableRecoveryLag) {
-                    taskToCaughtUpClients.computeIfAbsent(task, ignored -> new TreeSet<>()).add(client);
+                if (active(taskLag) || unbounded(acceptableRecoveryLag) || acceptable(acceptableRecoveryLag, taskLag)) {

Review comment:
       Nice catch indeed! I agree with @ableegoldman about the renaming. I am in favour of `activeRunning` or `activeAndRunning`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1148,6 +1148,9 @@ private void verifyMaxInFlightRequestPerConnection(final Object maxInFlightReque
         consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
         consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
         consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
+        consumerProps.put(ACCEPTABLE_RECOVERY_LAG_CONFIG, getLong(ACCEPTABLE_RECOVERY_LAG_CONFIG));
+        consumerProps.put(MAX_WARMUP_REPLICAS_CONFIG, getInt(MAX_WARMUP_REPLICAS_CONFIG));
+        consumerProps.put(PROBING_REBALANCE_INTERVAL_MS_CONFIG, getLong(PROBING_REBALANCE_INTERVAL_MS_CONFIG));

Review comment:
       req: Please add verifications to `StreamsConfigTest#consumerConfigMustContainStreamPartitionAssignorConfig()` 
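
    For illustration, the kind of assertions this asks for might look like the following, where
    returnedProps stands for the main consumer config map the existing test already obtains, and
    the literal values are placeholders for whatever the test configures (a sketch, not the
    committed test code):

        // the three KIP-441 configs set on the StreamsConfig must survive the
        // translation into the main consumer's properties
        assertThat(returnedProps.get(StreamsConfig.ACCEPTABLE_RECOVERY_LAG_CONFIG), is(100L));
        assertThat(returnedProps.get(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG), is(2));
        assertThat(returnedProps.get(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG), is(60_000L));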







[GitHub] [kafka] ableegoldman commented on a change in pull request #8716: KAFKA-6145: KIP-441: Fix assignor config passthough

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8716:
URL: https://github.com/apache/kafka/pull/8716#discussion_r430778603



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1148,6 +1148,9 @@ private void verifyMaxInFlightRequestPerConnection(final Object maxInFlightReque
         consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
         consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
         consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
+        consumerProps.put(ACCEPTABLE_RECOVERY_LAG_CONFIG, getLong(ACCEPTABLE_RECOVERY_LAG_CONFIG));
+        consumerProps.put(MAX_WARMUP_REPLICAS_CONFIG, getInt(MAX_WARMUP_REPLICAS_CONFIG));
+        consumerProps.put(PROBING_REBALANCE_INTERVAL_MS_CONFIG, getLong(PROBING_REBALANCE_INTERVAL_MS_CONFIG));

Review comment:
       > we automatically pass through any config that isn't registered
   
    I have to say, this seems totally backwards to me. So basically we just happen to pass in any number of configs that we may or may not need, but will filter out specific configs that we _do_ need unless explicitly told to include them? I understand the custom configs motivation, but then why not just pass through all the configs?
   
   What if I wanted to access the value of one of my registered Streams configs in my plugged-in component? I'd have to add the same value a second time, but with an unregistered config name. Huh?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
##########
@@ -347,22 +348,28 @@ public AssignmentListener assignmentListener() {
         public final long probingRebalanceIntervalMs;

Review comment:
       I think this comment got lost in a discussion thread, but can we add a note to AssignorConfiguration pointing out that any Streams configs added here will need to be explicitly passed through? It seems like it's too easy to fall into this same trap again







[GitHub] [kafka] vvcephei commented on pull request #8716: KAFKA-6145: KIP-441: Fix assignor config passthough

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8716:
URL: https://github.com/apache/kafka/pull/8716#issuecomment-634702382


   The test failures were the result of extra tests that were added on trunk after this branch. The branch builder does a merge with trunk before running the tests. I'm rebasing and fixing the tests.





[GitHub] [kafka] vvcephei commented on a change in pull request #8716: KAFKA-6145: KIP-441: Fix assignor config passthough

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8716:
URL: https://github.com/apache/kafka/pull/8716#discussion_r430668785



##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1148,6 +1148,9 @@ private void verifyMaxInFlightRequestPerConnection(final Object maxInFlightReque
         consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
         consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
         consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
+        consumerProps.put(ACCEPTABLE_RECOVERY_LAG_CONFIG, getLong(ACCEPTABLE_RECOVERY_LAG_CONFIG));
+        consumerProps.put(MAX_WARMUP_REPLICAS_CONFIG, getInt(MAX_WARMUP_REPLICAS_CONFIG));
+        consumerProps.put(PROBING_REBALANCE_INTERVAL_MS_CONFIG, getLong(PROBING_REBALANCE_INTERVAL_MS_CONFIG));

Review comment:
       Yes, we copy over the other internal configs in org.apache.kafka.streams.processor.internals.StreamThread#create
   
   I'll add your listener to the config test. I'm not sure about PartitionGrouper.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
##########
@@ -359,6 +359,10 @@ private AssignmentConfigs(final StreamsConfig configs) {
                           final Integer maxWarmupReplicas,
                           final Integer numStandbyReplicas,
                           final Long probingRebalanceIntervalMs) {
+            if (maxWarmupReplicas < 1) {
+                throw new IllegalArgumentException("must configure at least one warmup replica");
+            }
+

Review comment:
       We could; to be fair, the config parser already checks the limits, so it's not necessary for production code.
   
   It only comes up when we manually create these internal objects for tests. I added this particular bound specifically because I had previously passed in what I thought was a dummy value, which turned out to be a misconfiguration that actually affected the behavior I was testing.
   
   Offhand, it doesn't seem like the same thing would happen with probingRebalanceIntervalMs, so it doesn't seem like the check here would have the same benefit; WDYT?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##########
@@ -241,16 +239,29 @@ private static void assignStatelessActiveTasks(final TreeMap<UUID, ClientState>
         final Map<TaskId, SortedSet<UUID>> taskToCaughtUpClients = new HashMap<>();
 
         for (final TaskId task : statefulTasks) {
-
+            final TreeSet<UUID> caughtUpClients = new TreeSet<>();

Review comment:
       Cool; thanks!

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1148,6 +1148,9 @@ private void verifyMaxInFlightRequestPerConnection(final Object maxInFlightReque
         consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
         consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
         consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
+        consumerProps.put(ACCEPTABLE_RECOVERY_LAG_CONFIG, getLong(ACCEPTABLE_RECOVERY_LAG_CONFIG));

Review comment:
       Ack, I'll take a look at it.

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1148,6 +1148,9 @@ private void verifyMaxInFlightRequestPerConnection(final Object maxInFlightReque
         consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
         consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
         consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
+        consumerProps.put(ACCEPTABLE_RECOVERY_LAG_CONFIG, getLong(ACCEPTABLE_RECOVERY_LAG_CONFIG));

Review comment:
       Ack, I'll take a look at it.

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1148,6 +1148,9 @@ private void verifyMaxInFlightRequestPerConnection(final Object maxInFlightReque
         consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
         consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
         consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
+        consumerProps.put(ACCEPTABLE_RECOVERY_LAG_CONFIG, getLong(ACCEPTABLE_RECOVERY_LAG_CONFIG));
+        consumerProps.put(MAX_WARMUP_REPLICAS_CONFIG, getInt(MAX_WARMUP_REPLICAS_CONFIG));
+        consumerProps.put(PROBING_REBALANCE_INTERVAL_MS_CONFIG, getLong(PROBING_REBALANCE_INTERVAL_MS_CONFIG));

Review comment:
       The answer about your listener is that we automatically pass through any config that _isn't_ registered. The assumption is, I suppose, that users might want to pass through custom configs to their custom plugged-in components.
   
   The configs in this PR got filtered out because they are registered configs in the public API.
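
   A rough sketch of the rule described above, purely illustrative (the class, method, and parameter names are assumptions, not the actual StreamsConfig internals): unregistered keys are forwarded untouched, while registered Streams keys are dropped and must be copied explicitly, as the diff above does for the KIP-441 configs.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   import java.util.Set;

   // Illustrative only: unknown (unregistered) keys pass straight through so users can
   // reach their custom plugged-in components; registered Streams keys are filtered out
   // here and have to be re-added by hand, as the diff above does.
   final class ConfigPassThroughSketch {
       static Map<String, Object> consumerConfigs(final Map<String, Object> streamsConfigs,
                                                  final Set<String> registeredStreamsKeys) {
           final Map<String, Object> consumerProps = new HashMap<>();
           for (final Map.Entry<String, Object> entry : streamsConfigs.entrySet()) {
               if (!registeredStreamsKeys.contains(entry.getKey())) {
                   consumerProps.put(entry.getKey(), entry.getValue());
               }
           }
           return consumerProps;
       }
   }
   ```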

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1148,6 +1148,9 @@ private void verifyMaxInFlightRequestPerConnection(final Object maxInFlightReque
         consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
         consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
         consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
+        consumerProps.put(ACCEPTABLE_RECOVERY_LAG_CONFIG, getLong(ACCEPTABLE_RECOVERY_LAG_CONFIG));
+        consumerProps.put(MAX_WARMUP_REPLICAS_CONFIG, getInt(MAX_WARMUP_REPLICAS_CONFIG));
+        consumerProps.put(PROBING_REBALANCE_INTERVAL_MS_CONFIG, getLong(PROBING_REBALANCE_INTERVAL_MS_CONFIG));

Review comment:
       Turns out PartitionGrouper does _not_ get copied over. I'll create a jira to track this, so that we don't have to get sidetracked in this PR with the question of what we should do about this deprecated config.

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1148,6 +1148,9 @@ private void verifyMaxInFlightRequestPerConnection(final Object maxInFlightReque
         consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
         consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
         consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
+        consumerProps.put(ACCEPTABLE_RECOVERY_LAG_CONFIG, getLong(ACCEPTABLE_RECOVERY_LAG_CONFIG));
+        consumerProps.put(MAX_WARMUP_REPLICAS_CONFIG, getInt(MAX_WARMUP_REPLICAS_CONFIG));
+        consumerProps.put(PROBING_REBALANCE_INTERVAL_MS_CONFIG, getLong(PROBING_REBALANCE_INTERVAL_MS_CONFIG));

Review comment:
       https://issues.apache.org/jira/browse/KAFKA-10046

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
##########
@@ -359,6 +359,10 @@ private AssignmentConfigs(final StreamsConfig configs) {
                           final Integer maxWarmupReplicas,
                           final Integer numStandbyReplicas,
                           final Long probingRebalanceIntervalMs) {
+            if (maxWarmupReplicas < 1) {
+                throw new IllegalArgumentException("must configure at least one warmup replica");
+            }
+

Review comment:
       Nevermind; I refactored the class to use the same config validation for passed-in arguments.
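
   One way to share a single source of truth for such bounds, sketched here as an assumption about the approach rather than the actual refactor: reuse the Range validator that the public config already declares.

   ```java
   import org.apache.kafka.common.config.ConfigDef;

   // Illustrative sketch: validate a hand-passed constructor argument with the same Range
   // validator the public config declares, so tests and production code share one bound.
   // ConfigDef.Range.atLeast(1).ensureValid(...) throws a ConfigException below the bound.
   final class WarmupReplicasBound {
       static int validated(final int maxWarmupReplicas) {
           ConfigDef.Range.atLeast(1).ensureValid("max.warmup.replicas", maxWarmupReplicas);
           return maxWarmupReplicas;
       }
   }
   ```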

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.junit.Test;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.Assert.assertThrows;
+
+public class AssignorConfigurationTest {

Review comment:
       Ah, unfortunately, that test relies on mocking package-private fields from another package. I'll just leave it alone for now.

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1148,6 +1148,9 @@ private void verifyMaxInFlightRequestPerConnection(final Object maxInFlightReque
         consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
         consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
         consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
+        consumerProps.put(ACCEPTABLE_RECOVERY_LAG_CONFIG, getLong(ACCEPTABLE_RECOVERY_LAG_CONFIG));

Review comment:
       It does wind up "surviving" into the config that we use in the assignor, but I'm not sure if it's on purpose or just lucky.
   
   I'll postpone any existential questions, and just add it to the regression test.

##########
File path: streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
##########
@@ -1148,6 +1148,9 @@ private void verifyMaxInFlightRequestPerConnection(final Object maxInFlightReque
         consumerProps.put(REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
         consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
         consumerProps.put(NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
+        consumerProps.put(ACCEPTABLE_RECOVERY_LAG_CONFIG, getLong(ACCEPTABLE_RECOVERY_LAG_CONFIG));

Review comment:
       Ah, it was also tested here: org.apache.kafka.streams.StreamsConfigTest#consumerConfigShouldContainAdminClientConfigsForRetriesAndRetryBackOffMsWithAdminPrefix

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
##########
@@ -347,22 +348,28 @@ public AssignmentListener assignmentListener() {
         public final long probingRebalanceIntervalMs;

Review comment:
       Ah, right. Sure thing!




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on pull request #8716: KAFKA-6145: KIP-441: Fix assignor config passthough

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8716:
URL: https://github.com/apache/kafka/pull/8716#issuecomment-632849774


   FWIW:
   ```
   ================================================================================
   SESSION REPORT (ALL TESTS)
   ducktape version: 0.7.7
   session_id:       2020-05-22--007
   run time:         29 minutes 9.205 seconds
   tests run:        10
   passed:           10
   failed:           0
   ignored:          0
   ================================================================================
   test_id:    kafkatest.tests.streams.streams_optimized_test.StreamsOptimizedTest.test_upgrade_optimized_topology
   status:     PASS
   run time:   2 minutes 32.957 seconds
   --------------------------------------------------------------------------------
   test_id:    kafkatest.tests.streams.streams_optimized_test.StreamsOptimizedTest.test_upgrade_optimized_topology
   status:     PASS
   run time:   2 minutes 31.512 seconds
   --------------------------------------------------------------------------------
   test_id:    kafkatest.tests.streams.streams_optimized_test.StreamsOptimizedTest.test_upgrade_optimized_topology
   status:     PASS
   run time:   2 minutes 56.126 seconds
   --------------------------------------------------------------------------------
   test_id:    kafkatest.tests.streams.streams_optimized_test.StreamsOptimizedTest.test_upgrade_optimized_topology
   status:     PASS
   run time:   2 minutes 55.289 seconds
   --------------------------------------------------------------------------------
   test_id:    kafkatest.tests.streams.streams_optimized_test.StreamsOptimizedTest.test_upgrade_optimized_topology
   status:     PASS
   run time:   2 minutes 52.812 seconds
   --------------------------------------------------------------------------------
   test_id:    kafkatest.tests.streams.streams_optimized_test.StreamsOptimizedTest.test_upgrade_optimized_topology
   status:     PASS
   run time:   3 minutes 4.072 seconds
   --------------------------------------------------------------------------------
   test_id:    kafkatest.tests.streams.streams_optimized_test.StreamsOptimizedTest.test_upgrade_optimized_topology
   status:     PASS
   run time:   3 minutes 3.525 seconds
   --------------------------------------------------------------------------------
   test_id:    kafkatest.tests.streams.streams_optimized_test.StreamsOptimizedTest.test_upgrade_optimized_topology
   status:     PASS
   run time:   3 minutes 4.718 seconds
   --------------------------------------------------------------------------------
   test_id:    kafkatest.tests.streams.streams_optimized_test.StreamsOptimizedTest.test_upgrade_optimized_topology
   status:     PASS
   run time:   3 minutes 3.213 seconds
   --------------------------------------------------------------------------------
   test_id:    kafkatest.tests.streams.streams_optimized_test.StreamsOptimizedTest.test_upgrade_optimized_topology
   status:     PASS
   run time:   3 minutes 4.576 seconds
   --------------------------------------------------------------------------------
   ```


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei merged pull request #8716: KAFKA-6145: KIP-441: Fix assignor config passthough

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #8716:
URL: https://github.com/apache/kafka/pull/8716


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8716: KAFKA-6145: KIP-441: Fix assignor config passthough

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8716:
URL: https://github.com/apache/kafka/pull/8716#discussion_r429420017



##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
##########
@@ -0,0 +1,94 @@
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
+import org.apache.kafka.test.IntegrationTest;
+import org.hamcrest.Matchers;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import java.lang.reflect.Field;
+import java.util.List;
+import java.util.Properties;
+
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
+import static org.apache.kafka.common.utils.Utils.mkProperties;
+import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+
+@Category(IntegrationTest.class)
+public class TaskAssignorIntegrationTest {
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @Rule
+    public TestName testName = new TestName();
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldProperlyConfigureTheAssignor() throws NoSuchFieldException, IllegalAccessException {
+        // This test uses reflection to check and make sure that all the expected configurations really
+        // make it all the way to configure the task assignor. There's no other use case for being able
+        // to extract all these fields, so reflection is a good choice until we find that the maintenance
+        // burden is too high.
+        //
+        // Also note that this is an integration test because so many components have to come together to
+        // ensure these configurations wind up where they belong, and any number of future code changes
+        // could break this change.

Review comment:
       Added this test, and verified that it does indeed fail on trunk for the expected reason that the new configs were ignored and defaults were substituted instead.
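
   The reflection pattern that the quoted test comment describes amounts to reading private fields to assert on internal configuration. A minimal sketch, with placeholder names rather than the real Streams internals:

   ```java
   import java.lang.reflect.Field;

   // Minimal sketch of the reflection access used for this kind of assertion; the target
   // object and field name are placeholders, not the actual StreamThread/assignor fields.
   final class PrivateFieldReader {
       static Object read(final Object target, final String fieldName)
           throws NoSuchFieldException, IllegalAccessException {
           final Field field = target.getClass().getDeclaredField(fieldName);
           field.setAccessible(true); // needed for private and package-private fields
           return field.get(target);
       }
   }
   ```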




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8716: KAFKA-6145: KIP-441: Fix assignor config passthough

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8716:
URL: https://github.com/apache/kafka/pull/8716#discussion_r429352661



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
##########
@@ -359,6 +359,10 @@ private AssignmentConfigs(final StreamsConfig configs) {
                           final Integer maxWarmupReplicas,
                           final Integer numStandbyReplicas,
                           final Long probingRebalanceIntervalMs) {
+            if (maxWarmupReplicas < 1) {
+                throw new IllegalArgumentException("must configure at least one warmup replica");
+            }
+

Review comment:
       I added this constraint to mirror the constraint we already apply in StreamsConfig. It's not critical, but I was disappointed that I had written a bunch of tests that included a technically invalid configuration.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org