Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/18 02:15:30 UTC

[GitHub] [kafka] ableegoldman opened a new pull request #8892: KAFKA-10068: verify assignment performance with large cluster

ableegoldman opened a new pull request #8892:
URL: https://github.com/apache/kafka/pull/8892


   Because we do some nontrivial things in the StreamsPartitionAssignor, and will likely want to verify the scalability of new assignors we may add in the future, I added this to StreamsPartitionAssignorTest (rather than just testing the HighAvailabilityTaskAssignor, or HATA).
   
   With 10 topics at 1,000 partitions each and 100 consumers, the tests complete within seconds.
   
   If you increase the number of partitions per topic by a factor of 10, the assignment takes about 1 minute (for all three task assignors). Of course, the test is actually doing two full assignments, so the total time per assignment is still reasonable.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] vvcephei commented on a change in pull request #8892: KAFKA-10068: verify assignment performance with large cluster

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#discussion_r492302522



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########
@@ -288,17 +288,17 @@ public void computeTaskLags(final UUID uuid, final Map<TaskId, Long> allTaskEndO
             final Long endOffsetSum = taskEntry.getValue();
             final Long offsetSum = taskOffsetSums.getOrDefault(task, 0L);
 
-            if (endOffsetSum < offsetSum) {
+            if (offsetSum == Task.LATEST_OFFSET) {
+                taskLagTotals.put(task, Task.LATEST_OFFSET);
+            } else if (offsetSum == UNKNOWN_OFFSET_SUM) {
+                taskLagTotals.put(task, UNKNOWN_OFFSET_SUM);
+            } else if (endOffsetSum < offsetSum) {

Review comment:
       Huh, I thought we fixed this a while ago. Why is it showing up in the diff right now?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
##########
@@ -77,13 +86,8 @@
     public static final TaskId TASK_2_3 = new TaskId(2, 3);
 
     public static final Set<TaskId> EMPTY_TASKS = emptySet();
-    public static final List<TaskId> EMPTY_TASK_LIST = emptyList();
-    public static final Map<TaskId, Long> EMPTY_TASK_OFFSET_SUMS = emptyMap();
     public static final Map<TopicPartition, Long> EMPTY_CHANGELOG_END_OFFSETS = new HashMap<>();
 
-    private AssignmentTestUtils() {}

Review comment:
       Why remove this? Do we need to instantiate this class now? (I only see static members still).

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
##########
@@ -77,13 +86,8 @@
     public static final TaskId TASK_2_3 = new TaskId(2, 3);
 
     public static final Set<TaskId> EMPTY_TASKS = emptySet();
-    public static final List<TaskId> EMPTY_TASK_LIST = emptyList();
-    public static final Map<TaskId, Long> EMPTY_TASK_OFFSET_SUMS = emptyMap();
     public static final Map<TopicPartition, Long> EMPTY_CHANGELOG_END_OFFSETS = new HashMap<>();
 
-    private AssignmentTestUtils() {}

Review comment:
       Ah, note the `private`. The purpose of this constructor is to make it uninstantiable. I.e., it's "self-documenting" that it should only be a container for static members.
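
   As a minimal sketch of the idiom described above: a class that only holds static members declares a private constructor so that it cannot be instantiated from outside. The class name `ExampleTestUtils` and its members are hypothetical and only illustrate the pattern; they are not taken from `AssignmentTestUtils`.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.Set;

// Hypothetical utility class used only to illustrate the idiom.
final class ExampleTestUtils {
    // Static members are the only content of the class.
    static final Set<String> EMPTY_NAMES = Collections.emptySet();

    // Private constructor: no code outside this class can call `new ExampleTestUtils()`.
    // Without it, the compiler would generate a default constructor with the
    // same access level as the class.
    private ExampleTestUtils() {}

    // Example static helper (hypothetical).
    static boolean isNullOrEmpty(final Collection<?> values) {
        return values == null || values.isEmpty();
    }
}
```

   With the private constructor in place, `new ExampleTestUtils()` fails to compile anywhere outside the class, so the "static members only" intent is enforced by the compiler rather than by convention.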







[GitHub] [kafka] ableegoldman commented on a change in pull request #8892: KAFKA-10068: verify assignment performance with large cluster

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#discussion_r482678606



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########
@@ -288,17 +288,17 @@ public void computeTaskLags(final UUID uuid, final Map<TaskId, Long> allTaskEndO
             final Long endOffsetSum = taskEntry.getValue();
             final Long offsetSum = taskOffsetSums.getOrDefault(task, 0L);
 
-            if (endOffsetSum < offsetSum) {
+            if (offsetSum == Task.LATEST_OFFSET) {
+                taskLagTotals.put(task, Task.LATEST_OFFSET);
+            } else if (offsetSum == UNKNOWN_OFFSET_SUM) {
+                taskLagTotals.put(task, UNKNOWN_OFFSET_SUM);
+            } else if (endOffsetSum < offsetSum) {
                 LOG.warn("Task " + task + " had endOffsetSum=" + endOffsetSum + " smaller than offsetSum=" +
                              offsetSum + " on member " + uuid + ". This probably means the task is corrupted," +
                              " which in turn indicates that it will need to restore from scratch if it gets assigned." +
                              " The assignor will de-prioritize returning this task to this member in the hopes that" +
                              " some other member may be able to re-use its state.");
                 taskLagTotals.put(task, endOffsetSum);

Review comment:
       Noticed that we were logging this incorrectly if either of the negative sentinels came up
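
   The branch order in the diff above can be sketched in isolation as follows. This is only an illustration under stated assumptions: the sentinel values `-2L` and `-3L` are placeholders, the class and method names are hypothetical, and the final `else` branch (lag as `endOffsetSum - offsetSum`) is assumed because it is not shown in the quoted hunk.

```java
// Hypothetical stand-alone sketch of the sentinel-first branch order; not the real ClientState code.
final class TaskLagSketch {
    // Placeholder sentinel values (assumptions for this sketch).
    static final long LATEST_OFFSET = -2L;
    static final long UNKNOWN_OFFSET_SUM = -3L;

    private TaskLagSketch() {}

    static long computeLag(final long endOffsetSum, final long offsetSum) {
        if (offsetSum == LATEST_OFFSET) {
            // Sentinel is passed through unchanged, no warning.
            return LATEST_OFFSET;
        } else if (offsetSum == UNKNOWN_OFFSET_SUM) {
            // Sentinel is passed through unchanged, no warning.
            return UNKNOWN_OFFSET_SUM;
        } else if (endOffsetSum < offsetSum) {
            // Only a real offset sum can reach the "probably corrupted" warning.
            System.err.println("endOffsetSum=" + endOffsetSum + " smaller than offsetSum=" + offsetSum);
            return endOffsetSum;
        } else {
            // Assumed normal case: lag is the distance to the end offset.
            return endOffsetSum - offsetSum;
        }
    }
}
```

   Checking the sentinels first means the warning branch is only ever evaluated for real offset sums.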







[GitHub] [kafka] ableegoldman commented on pull request #8892: KAFKA-10068: verify assignment performance with large cluster

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#issuecomment-692979445


   Retest this please





[GitHub] [kafka] ableegoldman commented on a change in pull request #8892: KAFKA-10068: verify assignment performance with large cluster

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#discussion_r492447041



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
##########
@@ -77,13 +86,8 @@
     public static final TaskId TASK_2_3 = new TaskId(2, 3);
 
     public static final Set<TaskId> EMPTY_TASKS = emptySet();
-    public static final List<TaskId> EMPTY_TASK_LIST = emptyList();
-    public static final Map<TaskId, Long> EMPTY_TASK_OFFSET_SUMS = emptyMap();
     public static final Map<TopicPartition, Long> EMPTY_CHANGELOG_END_OFFSETS = new HashMap<>();
 
-    private AssignmentTestUtils() {}

Review comment:
       Ah yeah ok. I'll put it back







[GitHub] [kafka] ableegoldman commented on a change in pull request #8892: KAFKA-10068: verify assignment performance with large cluster

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#discussion_r492382332



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
##########
@@ -77,13 +86,8 @@
     public static final TaskId TASK_2_3 = new TaskId(2, 3);
 
     public static final Set<TaskId> EMPTY_TASKS = emptySet();
-    public static final List<TaskId> EMPTY_TASK_LIST = emptyList();
-    public static final Map<TaskId, Long> EMPTY_TASK_OFFSET_SUMS = emptyMap();
     public static final Map<TopicPartition, Long> EMPTY_CHANGELOG_END_OFFSETS = new HashMap<>();
 
-    private AssignmentTestUtils() {}

Review comment:
       > Do we need to instantiate this class now
   
   No...isn't that exactly the reason we don't need this?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########
@@ -288,17 +288,17 @@ public void computeTaskLags(final UUID uuid, final Map<TaskId, Long> allTaskEndO
             final Long endOffsetSum = taskEntry.getValue();
             final Long offsetSum = taskOffsetSums.getOrDefault(task, 0L);
 
-            if (endOffsetSum < offsetSum) {
+            if (offsetSum == Task.LATEST_OFFSET) {
+                taskLagTotals.put(task, Task.LATEST_OFFSET);
+            } else if (offsetSum == UNKNOWN_OFFSET_SUM) {
+                taskLagTotals.put(task, UNKNOWN_OFFSET_SUM);
+            } else if (endOffsetSum < offsetSum) {

Review comment:
       I don't know about fixing this before, but I found this while debugging these tests and fixed it on the side in this PR. To be fair, it wasn't hurting anything since we happen to put the right thing in the `taskLagTotals` map, but the warning logged was definitely incorrect.







[GitHub] [kafka] cadonna commented on a change in pull request #8892: KAFKA-10068: verify assignment performance with large cluster

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#discussion_r487867246



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.createMockAdminClientForAssignor;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getInfo;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.uuidForInt;
+import static org.easymock.EasyMock.expect;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
+import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
+import org.apache.kafka.test.MockKeyValueStoreBuilder;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(value = Parameterized.class)
+public class StreamsAssignmentScaleTest {
+    final static long MAX_ASSIGNMENT_DURATION = 60 * 1000L; //each individual assignment should complete within 20s
+    final static String APPLICATION_ID = "streams-assignment-scale-test";
+
+    private final Logger log = LoggerFactory.getLogger(StreamsAssignmentScaleTest.class);
+
+    private final Class<? extends TaskAssignor> taskAssignor;
+
+    @Parameterized.Parameters(name = "task assignor = {0}")
+    public static Collection<Object[]> parameters() {
+        return asList(
+            new Object[]{StickyTaskAssignor.class},
+            new Object[]{HighAvailabilityTaskAssignor.class},
+            new Object[]{FallbackPriorTaskAssignor.class}
+        );
+    }
+
+    public StreamsAssignmentScaleTest(final Class<? extends TaskAssignor> taskAssignor) {
+        this.taskAssignor = taskAssignor;
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testLargePartitionCount() {
+        shouldCompleteLargeAssignmentInReasonableTime(3_000, 1, 1, 1);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testLargeNumConsumers() {
+        shouldCompleteLargeAssignmentInReasonableTime(1_000, 1_000, 1, 1);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testManyStandbys() {
+        shouldCompleteLargeAssignmentInReasonableTime(1_000, 100, 1, 50);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testManyThreadsPerClient() {
+        shouldCompleteLargeAssignmentInReasonableTime(1_000, 10, 1000, 1);
+    }
+
+    private void shouldCompleteLargeAssignmentInReasonableTime(final int numPartitions,
+                                                               final int numClients,
+                                                               final int numThreadsPerClient,
+                                                               final int numStandbys) {
+        final List<String> topic = singletonList("topic");
+        final Map<TopicPartition, Long> changelogEndOffsets = new HashMap<>();
+        for (int p = 0; p < numPartitions; ++p) {
+            changelogEndOffsets.put(new TopicPartition(APPLICATION_ID + "-store-changelog", p), 100_000L);
+        }
+        final List<PartitionInfo> partitionInfos = getPartitionInfos(numPartitions);

Review comment:
       nit: You could inline this call on line 122

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.createMockAdminClientForAssignor;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getInfo;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.uuidForInt;
+import static org.easymock.EasyMock.expect;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
+import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
+import org.apache.kafka.test.MockKeyValueStoreBuilder;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(value = Parameterized.class)
+public class StreamsAssignmentScaleTest {
+    final static long MAX_ASSIGNMENT_DURATION = 60 * 1000L; //each individual assignment should complete within 20s
+    final static String APPLICATION_ID = "streams-assignment-scale-test";
+
+    private final Logger log = LoggerFactory.getLogger(StreamsAssignmentScaleTest.class);
+
+    private final Class<? extends TaskAssignor> taskAssignor;
+
+    @Parameterized.Parameters(name = "task assignor = {0}")
+    public static Collection<Object[]> parameters() {
+        return asList(
+            new Object[]{StickyTaskAssignor.class},
+            new Object[]{HighAvailabilityTaskAssignor.class},
+            new Object[]{FallbackPriorTaskAssignor.class}
+        );
+    }
+
+    public StreamsAssignmentScaleTest(final Class<? extends TaskAssignor> taskAssignor) {
+        this.taskAssignor = taskAssignor;
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testLargePartitionCount() {
+        shouldCompleteLargeAssignmentInReasonableTime(3_000, 1, 1, 1);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testLargeNumConsumers() {
+        shouldCompleteLargeAssignmentInReasonableTime(1_000, 1_000, 1, 1);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testManyStandbys() {
+        shouldCompleteLargeAssignmentInReasonableTime(1_000, 100, 1, 50);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testManyThreadsPerClient() {
+        shouldCompleteLargeAssignmentInReasonableTime(1_000, 10, 1000, 1);
+    }
+
+    private void shouldCompleteLargeAssignmentInReasonableTime(final int numPartitions,
+                                                               final int numClients,
+                                                               final int numThreadsPerClient,
+                                                               final int numStandbys) {
+        final List<String> topic = singletonList("topic");
+        final Map<TopicPartition, Long> changelogEndOffsets = new HashMap<>();
+        for (int p = 0; p < numPartitions; ++p) {
+            changelogEndOffsets.put(new TopicPartition(APPLICATION_ID + "-store-changelog", p), 100_000L);
+        }
+        final List<PartitionInfo> partitionInfos = getPartitionInfos(numPartitions);
+        final Cluster clusterMetadata = new Cluster(
+            "cluster",
+            Collections.singletonList(Node.noNode()),
+            partitionInfos,
+            emptySet(),
+            emptySet()
+        );
+
+        final InternalTopologyBuilder builder = new InternalTopologyBuilder();
+        builder.addSource(null, "source", null, null, null, "topic");
+        builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store", false), "processor");
+        builder.setApplicationId(APPLICATION_ID);
+        builder.buildTopology();
+
+        final Consumer<byte[], byte[]> mainConsumer = EasyMock.createNiceMock(Consumer.class);
+        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+        expect(taskManager.builder()).andReturn(builder).anyTimes();
+        expect(taskManager.mainConsumer()).andStubReturn(mainConsumer);
+        expect(mainConsumer.committed(new HashSet<>())).andStubReturn(Collections.emptyMap());
+        final AdminClient adminClient = createMockAdminClientForAssignor(changelogEndOffsets);
+
+        final StreamsPartitionAssignor partitionAssignor = new StreamsPartitionAssignor();
+
+        final Map<String, Object> configMap = new HashMap<>();
+        configMap.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
+        configMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8080");
+        configMap.put(InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager);
+        configMap.put(InternalConfig.STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR, EasyMock.createNiceMock(StreamsMetadataState.class));
+        configMap.put(InternalConfig.STREAMS_ADMIN_CLIENT, adminClient);
+        configMap.put(InternalConfig.ASSIGNMENT_ERROR_CODE, new AtomicInteger());
+        configMap.put(InternalConfig.NEXT_SCHEDULED_REBALANCE_MS, new AtomicLong(Long.MAX_VALUE));
+        configMap.put(InternalConfig.TIME, new MockTime());
+        configMap.put(InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, taskAssignor.getName());
+        configMap.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numStandbys);
+
+        final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
+            new MockTime(),
+            new StreamsConfig(configMap),
+            new MockClientSupplier().restoreConsumer,
+            false
+        );
+        partitionAssignor.configure(configMap);
+        EasyMock.replay(taskManager, adminClient, mainConsumer);
+
+        partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
+
+        final Map<String, Subscription> subscriptions = new HashMap<>();
+        for (int client = 0; client < numClients; ++client) {
+            for (int i = 0; i < numThreadsPerClient; ++i) {
+                subscriptions.put("consumer-" + client + "-" + i,

Review comment:
       Since you also use the string `"consumer"` in the second assignment, and it is important that the consumers have the same names in both, I would suggest defining a constant for it.
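
   A minimal sketch of that suggestion, assuming a hypothetical constant `CONSUMER_PREFIX` and helper class `ConsumerNames` (neither is part of the PR): both assignment rounds build their member names through one helper, so the names cannot drift apart.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper; illustrates sharing one constant between both assignment rounds.
final class ConsumerNames {
    // Single definition of the name prefix (hypothetical constant name).
    static final String CONSUMER_PREFIX = "consumer-";

    private ConsumerNames() {}

    // Builds a member name in the same "consumer-<client>-<thread>" shape as the test.
    static String memberId(final int client, final int thread) {
        return CONSUMER_PREFIX + client + "-" + thread;
    }

    // Lists all member names for the given cluster shape, in a stable order.
    static List<String> allMemberIds(final int numClients, final int numThreadsPerClient) {
        final List<String> ids = new ArrayList<>();
        for (int client = 0; client < numClients; ++client) {
            for (int thread = 0; thread < numThreadsPerClient; ++thread) {
                ids.add(memberId(client, thread));
            }
        }
        return ids;
    }
}
```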

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.createMockAdminClientForAssignor;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getInfo;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.uuidForInt;
+import static org.easymock.EasyMock.expect;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
+import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
+import org.apache.kafka.test.MockKeyValueStoreBuilder;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(value = Parameterized.class)
+public class StreamsAssignmentScaleTest {

Review comment:
       I do not remember exactly whether integration tests are defined by running an embedded Kafka or by having a runtime beyond a certain threshold. If the latter, I guess you should tag this class as an integration test.
   
   Also, I ran the test locally on my machine and got two failures due to timeouts for `FallbackPriorTaskAssignor`, namely in `testManyStandbys()` and `testLargePartitionCount()`. Admittedly, my machine was under high load at the time, but I guess that could also happen on Jenkins. Should we use lower parameter values?
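
   For reference, the pattern the test relies on (time one assignment, fail if it exceeds a budget) can be sketched in isolation. This is a minimal sketch under stated assumptions, not code from the PR: the class `TimedCheckSketch` and the helper `assertCompletesWithin` are hypothetical names, and the sketch measures with `System.nanoTime()` instead of the `System.currentTimeMillis()` calls used in the test.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the PR: times an action and fails if it exceeds a budget.
final class TimedCheckSketch {
    private TimedCheckSketch() { }

    // Runs the action once and returns its duration in milliseconds.
    // Throws AssertionError if the duration is strictly greater than maxDurationMs.
    static long assertCompletesWithin(final String label, final long maxDurationMs, final Runnable action) {
        final long startNs = System.nanoTime();
        action.run();
        final long durationMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs);
        if (durationMs > maxDurationMs) {
            throw new AssertionError(label + " took too long to complete at " + durationMs + "ms.");
        }
        return durationMs;
    }

    public static void main(final String[] args) {
        // Stand-in workload; in the test this would be a call to partitionAssignor.assign(...).
        final long ms = assertCompletesWithin("Example action", 60 * 1000L, () -> {
            long sum = 0;
            for (int i = 0; i < 1_000_000; ++i) {
                sum += i;
            }
        });
        System.out.println("Example action took " + ms + "ms.");
    }
}
```

   A wall-clock budget like this stays sensitive to machine load, which is the concern raised above; the helper only keeps the budget in one place so it is easy to adjust.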

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.createMockAdminClientForAssignor;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getInfo;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.uuidForInt;
+import static org.easymock.EasyMock.expect;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
+import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
+import org.apache.kafka.test.MockKeyValueStoreBuilder;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(value = Parameterized.class)
+public class StreamsAssignmentScaleTest {
+    final static long MAX_ASSIGNMENT_DURATION = 60 * 1000L; //each individual assignment should complete within 20s
+    final static String APPLICATION_ID = "streams-assignment-scale-test";
+
+    private final Logger log = LoggerFactory.getLogger(StreamsAssignmentScaleTest.class);
+
+    private final Class<? extends TaskAssignor> taskAssignor;
+
+    @Parameterized.Parameters(name = "task assignor = {0}")
+    public static Collection<Object[]> parameters() {
+        return asList(
+            new Object[]{StickyTaskAssignor.class},
+            new Object[]{HighAvailabilityTaskAssignor.class},
+            new Object[]{FallbackPriorTaskAssignor.class}
+        );
+    }
+
+    public StreamsAssignmentScaleTest(final Class<? extends TaskAssignor> taskAssignor) {
+        this.taskAssignor = taskAssignor;
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testLargePartitionCount() {
+        shouldCompleteLargeAssignmentInReasonableTime(3_000, 1, 1, 1);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testLargeNumConsumers() {
+        shouldCompleteLargeAssignmentInReasonableTime(1_000, 1_000, 1, 1);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testManyStandbys() {
+        shouldCompleteLargeAssignmentInReasonableTime(1_000, 100, 1, 50);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testManyThreadsPerClient() {
+        shouldCompleteLargeAssignmentInReasonableTime(1_000, 10, 1000, 1);
+    }
+
+    private void shouldCompleteLargeAssignmentInReasonableTime(final int numPartitions,
+                                                               final int numClients,
+                                                               final int numThreadsPerClient,
+                                                               final int numStandbys) {
+        final List<String> topic = singletonList("topic");
+        final Map<TopicPartition, Long> changelogEndOffsets = new HashMap<>();
+        for (int p = 0; p < numPartitions; ++p) {
+            changelogEndOffsets.put(new TopicPartition(APPLICATION_ID + "-store-changelog", p), 100_000L);
+        }
+        final List<PartitionInfo> partitionInfos = getPartitionInfos(numPartitions);
+        final Cluster clusterMetadata = new Cluster(
+            "cluster",
+            Collections.singletonList(Node.noNode()),
+            partitionInfos,
+            emptySet(),
+            emptySet()
+        );
+
+        final InternalTopologyBuilder builder = new InternalTopologyBuilder();
+        builder.addSource(null, "source", null, null, null, "topic");
+        builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store", false), "processor");
+        builder.setApplicationId(APPLICATION_ID);
+        builder.buildTopology();
+
+        final Consumer<byte[], byte[]> mainConsumer = EasyMock.createNiceMock(Consumer.class);
+        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+        expect(taskManager.builder()).andReturn(builder).anyTimes();
+        expect(taskManager.mainConsumer()).andStubReturn(mainConsumer);
+        expect(mainConsumer.committed(new HashSet<>())).andStubReturn(Collections.emptyMap());
+        final AdminClient adminClient = createMockAdminClientForAssignor(changelogEndOffsets);
+
+        final StreamsPartitionAssignor partitionAssignor = new StreamsPartitionAssignor();
+
+        final Map<String, Object> configMap = new HashMap<>();
+        configMap.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
+        configMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8080");
+        configMap.put(InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager);
+        configMap.put(InternalConfig.STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR, EasyMock.createNiceMock(StreamsMetadataState.class));
+        configMap.put(InternalConfig.STREAMS_ADMIN_CLIENT, adminClient);
+        configMap.put(InternalConfig.ASSIGNMENT_ERROR_CODE, new AtomicInteger());
+        configMap.put(InternalConfig.NEXT_SCHEDULED_REBALANCE_MS, new AtomicLong(Long.MAX_VALUE));
+        configMap.put(InternalConfig.TIME, new MockTime());
+        configMap.put(InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, taskAssignor.getName());
+        configMap.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numStandbys);
+
+        final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
+            new MockTime(),
+            new StreamsConfig(configMap),
+            new MockClientSupplier().restoreConsumer,
+            false
+        );
+        partitionAssignor.configure(configMap);
+        EasyMock.replay(taskManager, adminClient, mainConsumer);
+
+        partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
+
+        final Map<String, Subscription> subscriptions = new HashMap<>();
+        for (int client = 0; client < numClients; ++client) {
+            for (int i = 0; i < numThreadsPerClient; ++i) {
+                subscriptions.put("consumer-" + client + "-" + i,
+                                  new Subscription(
+                                      topic,
+                                      getInfo(uuidForInt(client), EMPTY_TASKS, EMPTY_TASKS).encode())
+                );
+            }
+        }
+
+        final long firstAssignmentStartMs = System.currentTimeMillis();
+        final Map<String, Assignment> firstAssignments = partitionAssignor.assign(clusterMetadata, new GroupSubscription(subscriptions)).groupAssignment();
+        final long firstAssignmentEndMs = System.currentTimeMillis();
+
+        final long firstAssignmentDuration = firstAssignmentEndMs - firstAssignmentStartMs;
+        if (firstAssignmentDuration > MAX_ASSIGNMENT_DURATION) {
+            throw new AssertionError("The first assignment took took too long to complete at " + firstAssignmentDuration + "ms.");

Review comment:
       I guess there is a `took` too too much. ;-)

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.createMockAdminClientForAssignor;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getInfo;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.uuidForInt;
+import static org.easymock.EasyMock.expect;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
+import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
+import org.apache.kafka.test.MockKeyValueStoreBuilder;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(value = Parameterized.class)
+public class StreamsAssignmentScaleTest {
+    final static long MAX_ASSIGNMENT_DURATION = 60 * 1000L; //each individual assignment should complete within 20s
+    final static String APPLICATION_ID = "streams-assignment-scale-test";
+
+    private final Logger log = LoggerFactory.getLogger(StreamsAssignmentScaleTest.class);
+
+    private final Class<? extends TaskAssignor> taskAssignor;
+
+    @Parameterized.Parameters(name = "task assignor = {0}")
+    public static Collection<Object[]> parameters() {
+        return asList(
+            new Object[]{StickyTaskAssignor.class},
+            new Object[]{HighAvailabilityTaskAssignor.class},
+            new Object[]{FallbackPriorTaskAssignor.class}
+        );
+    }
+
+    public StreamsAssignmentScaleTest(final Class<? extends TaskAssignor> taskAssignor) {
+        this.taskAssignor = taskAssignor;
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testLargePartitionCount() {
+        shouldCompleteLargeAssignmentInReasonableTime(3_000, 1, 1, 1);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testLargeNumConsumers() {
+        shouldCompleteLargeAssignmentInReasonableTime(1_000, 1_000, 1, 1);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testManyStandbys() {
+        shouldCompleteLargeAssignmentInReasonableTime(1_000, 100, 1, 50);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testManyThreadsPerClient() {
+        shouldCompleteLargeAssignmentInReasonableTime(1_000, 10, 1000, 1);
+    }
+
+    private void shouldCompleteLargeAssignmentInReasonableTime(final int numPartitions,
+                                                               final int numClients,
+                                                               final int numThreadsPerClient,
+                                                               final int numStandbys) {
+        final List<String> topic = singletonList("topic");
+        final Map<TopicPartition, Long> changelogEndOffsets = new HashMap<>();
+        for (int p = 0; p < numPartitions; ++p) {
+            changelogEndOffsets.put(new TopicPartition(APPLICATION_ID + "-store-changelog", p), 100_000L);
+        }
+        final List<PartitionInfo> partitionInfos = getPartitionInfos(numPartitions);
+        final Cluster clusterMetadata = new Cluster(
+            "cluster",
+            Collections.singletonList(Node.noNode()),
+            partitionInfos,
+            emptySet(),
+            emptySet()
+        );
+
+        final InternalTopologyBuilder builder = new InternalTopologyBuilder();
+        builder.addSource(null, "source", null, null, null, "topic");
+        builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store", false), "processor");
+        builder.setApplicationId(APPLICATION_ID);
+        builder.buildTopology();
+
+        final Consumer<byte[], byte[]> mainConsumer = EasyMock.createNiceMock(Consumer.class);
+        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+        expect(taskManager.builder()).andReturn(builder).anyTimes();
+        expect(taskManager.mainConsumer()).andStubReturn(mainConsumer);
+        expect(mainConsumer.committed(new HashSet<>())).andStubReturn(Collections.emptyMap());
+        final AdminClient adminClient = createMockAdminClientForAssignor(changelogEndOffsets);
+
+        final StreamsPartitionAssignor partitionAssignor = new StreamsPartitionAssignor();
+
+        final Map<String, Object> configMap = new HashMap<>();
+        configMap.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
+        configMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8080");
+        configMap.put(InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager);
+        configMap.put(InternalConfig.STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR, EasyMock.createNiceMock(StreamsMetadataState.class));
+        configMap.put(InternalConfig.STREAMS_ADMIN_CLIENT, adminClient);
+        configMap.put(InternalConfig.ASSIGNMENT_ERROR_CODE, new AtomicInteger());
+        configMap.put(InternalConfig.NEXT_SCHEDULED_REBALANCE_MS, new AtomicLong(Long.MAX_VALUE));
+        configMap.put(InternalConfig.TIME, new MockTime());
+        configMap.put(InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, taskAssignor.getName());
+        configMap.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numStandbys);
+
+        final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
+            new MockTime(),
+            new StreamsConfig(configMap),
+            new MockClientSupplier().restoreConsumer,
+            false
+        );
+        partitionAssignor.configure(configMap);
+        EasyMock.replay(taskManager, adminClient, mainConsumer);
+
+        partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
+
+        final Map<String, Subscription> subscriptions = new HashMap<>();
+        for (int client = 0; client < numClients; ++client) {
+            for (int i = 0; i < numThreadsPerClient; ++i) {
+                subscriptions.put("consumer-" + client + "-" + i,
+                                  new Subscription(
+                                      topic,
+                                      getInfo(uuidForInt(client), EMPTY_TASKS, EMPTY_TASKS).encode())
+                );
+            }
+        }
+
+        final long firstAssignmentStartMs = System.currentTimeMillis();
+        final Map<String, Assignment> firstAssignments = partitionAssignor.assign(clusterMetadata, new GroupSubscription(subscriptions)).groupAssignment();
+        final long firstAssignmentEndMs = System.currentTimeMillis();
+
+        final long firstAssignmentDuration = firstAssignmentEndMs - firstAssignmentStartMs;
+        if (firstAssignmentDuration > MAX_ASSIGNMENT_DURATION) {
+            throw new AssertionError("The first assignment took took too long to complete at " + firstAssignmentDuration + "ms.");
+        } else {
+            log.info("First assignment took {}ms.", firstAssignmentDuration);
+        }
+
+        // Use the assignment to generate the subscriptions' prev task data for the next rebalance
+        for (int client = 0; client < numClients; ++client) {
+            for (int i = 0; i < numThreadsPerClient; ++i) {
+                final String consumer = "consumer-" + client + "-" + i;
+                final Assignment assignment = firstAssignments.get(consumer);
+                final AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
+
+                subscriptions.put(consumer,
+                                  new Subscription(
+                                      topic,
+                                      getInfo(uuidForInt(i), new HashSet<>(info.activeTasks()), info.standbyTasks().keySet()).encode(),
+                                      assignment.partitions())
+                );
+            }
+        }
+
+        final long secondAssignmentStartMs = System.currentTimeMillis();
+        final Map<String, Assignment> secondAssignments = partitionAssignor.assign(clusterMetadata, new GroupSubscription(subscriptions)).groupAssignment();
+        final long secondAssignmentEndMs = System.currentTimeMillis();
+        final long secondAssignmentDuration = secondAssignmentEndMs - secondAssignmentStartMs;
+        if (secondAssignmentDuration > MAX_ASSIGNMENT_DURATION) {
+            throw new AssertionError("The second assignment took took too long to complete at " + secondAssignmentDuration + "ms.");

Review comment:
       I guess here too too.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.createMockAdminClientForAssignor;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getInfo;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.uuidForInt;
+import static org.easymock.EasyMock.expect;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
+import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
+import org.apache.kafka.test.MockKeyValueStoreBuilder;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(value = Parameterized.class)
+public class StreamsAssignmentScaleTest {
+    final static long MAX_ASSIGNMENT_DURATION = 60 * 1000L; //each individual assignment should complete within 20s
+    final static String APPLICATION_ID = "streams-assignment-scale-test";
+
+    private final Logger log = LoggerFactory.getLogger(StreamsAssignmentScaleTest.class);
+
+    private final Class<? extends TaskAssignor> taskAssignor;
+
+    @Parameterized.Parameters(name = "task assignor = {0}")
+    public static Collection<Object[]> parameters() {
+        return asList(
+            new Object[]{StickyTaskAssignor.class},
+            new Object[]{HighAvailabilityTaskAssignor.class},
+            new Object[]{FallbackPriorTaskAssignor.class}
+        );
+    }
+
+    public StreamsAssignmentScaleTest(final Class<? extends TaskAssignor> taskAssignor) {
+        this.taskAssignor = taskAssignor;
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testLargePartitionCount() {
+        shouldCompleteLargeAssignmentInReasonableTime(3_000, 1, 1, 1);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testLargeNumConsumers() {
+        shouldCompleteLargeAssignmentInReasonableTime(1_000, 1_000, 1, 1);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testManyStandbys() {
+        shouldCompleteLargeAssignmentInReasonableTime(1_000, 100, 1, 50);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testManyThreadsPerClient() {
+        shouldCompleteLargeAssignmentInReasonableTime(1_000, 10, 1000, 1);
+    }
+
+    private void shouldCompleteLargeAssignmentInReasonableTime(final int numPartitions,
+                                                               final int numClients,
+                                                               final int numThreadsPerClient,
+                                                               final int numStandbys) {
+        final List<String> topic = singletonList("topic");
+        final Map<TopicPartition, Long> changelogEndOffsets = new HashMap<>();
+        for (int p = 0; p < numPartitions; ++p) {
+            changelogEndOffsets.put(new TopicPartition(APPLICATION_ID + "-store-changelog", p), 100_000L);
+        }
+        final List<PartitionInfo> partitionInfos = getPartitionInfos(numPartitions);
+        final Cluster clusterMetadata = new Cluster(
+            "cluster",
+            Collections.singletonList(Node.noNode()),
+            partitionInfos,
+            emptySet(),
+            emptySet()
+        );
+
+        final InternalTopologyBuilder builder = new InternalTopologyBuilder();
+        builder.addSource(null, "source", null, null, null, "topic");
+        builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store", false), "processor");
+        builder.setApplicationId(APPLICATION_ID);
+        builder.buildTopology();
+
+        final Consumer<byte[], byte[]> mainConsumer = EasyMock.createNiceMock(Consumer.class);
+        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+        expect(taskManager.builder()).andReturn(builder).anyTimes();

Review comment:
       Why is this not `.andStubReturn()` too? For consistency, I would also use `.andStubReturn()` here.







[GitHub] [kafka] ableegoldman commented on a change in pull request #8892: KAFKA-10068: verify assignment performance with large cluster

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#discussion_r492383223



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########
@@ -288,17 +288,17 @@ public void computeTaskLags(final UUID uuid, final Map<TaskId, Long> allTaskEndO
             final Long endOffsetSum = taskEntry.getValue();
             final Long offsetSum = taskOffsetSums.getOrDefault(task, 0L);
 
-            if (endOffsetSum < offsetSum) {
+            if (offsetSum == Task.LATEST_OFFSET) {
+                taskLagTotals.put(task, Task.LATEST_OFFSET);
+            } else if (offsetSum == UNKNOWN_OFFSET_SUM) {
+                taskLagTotals.put(task, UNKNOWN_OFFSET_SUM);
+            } else if (endOffsetSum < offsetSum) {

Review comment:
       I don't know about fixing this before, but I found it while debugging these tests and fixed it on the side in this PR. To be fair, it wasn't hurting anything, since we happened to put the right value in the `taskLagTotals` map, but the warning that was logged was definitely incorrect.
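
   The reordering in this hunk can be sketched in isolation. The following is a minimal sketch, not the actual `ClientState` code: the class name `TaskLagSketch`, the method `lagFor`, the two sentinel values, and the bodies of the last two branches (clamping to zero when the end offset sum is smaller, and subtracting otherwise) are assumptions made for illustration. Only the order of the checks is taken from the diff above.

```java
// Hypothetical stand-in for the branch order shown in the diff; not the real ClientState.
final class TaskLagSketch {
    // Assumed sentinel values, chosen only for this sketch.
    static final long LATEST_OFFSET = -2L;
    static final long UNKNOWN_OFFSET_SUM = -3L;

    private TaskLagSketch() { }

    static long lagFor(final long endOffsetSum, final long offsetSum) {
        if (offsetSum == LATEST_OFFSET) {
            // Sentinel: pass it through unchanged, before any numeric comparison.
            return LATEST_OFFSET;
        } else if (offsetSum == UNKNOWN_OFFSET_SUM) {
            // Sentinel: pass it through unchanged as well.
            return UNKNOWN_OFFSET_SUM;
        } else if (endOffsetSum < offsetSum) {
            // Assumption: an end offset sum below the reported offset sum is treated as zero lag.
            return 0L;
        } else {
            // Assumption: the ordinary case is the difference between the two sums.
            return endOffsetSum - offsetSum;
        }
    }

    public static void main(final String[] args) {
        System.out.println(lagFor(100L, 30L));
        System.out.println(lagFor(100L, LATEST_OFFSET));
        System.out.println(lagFor(100L, UNKNOWN_OFFSET_SUM));
        System.out.println(lagFor(10L, 30L));
    }
}
```

   Checking the sentinels first means the numeric comparison, and any warning attached to it, is only reached for real offset sums.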







[GitHub] [kafka] ableegoldman commented on a change in pull request #8892: KAFKA-10068: verify assignment performance with large cluster

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#discussion_r492382332



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
##########
@@ -77,13 +86,8 @@
     public static final TaskId TASK_2_3 = new TaskId(2, 3);
 
     public static final Set<TaskId> EMPTY_TASKS = emptySet();
-    public static final List<TaskId> EMPTY_TASK_LIST = emptyList();
-    public static final Map<TaskId, Long> EMPTY_TASK_OFFSET_SUMS = emptyMap();
     public static final Map<TopicPartition, Long> EMPTY_CHANGELOG_END_OFFSETS = new HashMap<>();
 
-    private AssignmentTestUtils() {}

Review comment:
       > Do we need to instantiate this class now
   
   No...isn't that exactly the reason we don't need this?







[GitHub] [kafka] vvcephei commented on pull request #8892: KAFKA-10068: verify assignment performance with large cluster

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#issuecomment-696752069


   Thanks, @ableegoldman!
   
   There was only one unrelated test failure:
   `Build / JDK 11 / kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota`





[GitHub] [kafka] ableegoldman commented on pull request #8892: KAFKA-10068: verify assignment performance with large cluster

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#issuecomment-692166978


   Whoops, I forgot to undo the reversion of #9231 before pushing. It was giving me trouble compiling in my IDE, so I reverted it for the time being to unblock my testing.





[GitHub] [kafka] ableegoldman commented on a change in pull request #8892: KAFKA-10068: verify assignment performance with large cluster

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#discussion_r491087153



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.createMockAdminClientForAssignor;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getInfo;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.uuidForInt;
+import static org.easymock.EasyMock.expect;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
+import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
+import org.apache.kafka.test.MockKeyValueStoreBuilder;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({IntegrationTest.class})
+public class StreamsAssignmentScaleTest {
+    final static long MAX_ASSIGNMENT_DURATION = 60 * 1000L; // each individual assignment should complete within 60s
+    final static String APPLICATION_ID = "streams-assignment-scale-test";
+
+    private final Logger log = LoggerFactory.getLogger(StreamsAssignmentScaleTest.class);
+
+    /************ HighAvailabilityTaskAssignor tests ************/
+
+    @Test(timeout = 120 * 1000)
+    public void testHighAvailabilityTaskAssignorLargePartitionCount() {
+        completeLargeAssignment(6_000, 1, 1, 1, HighAvailabilityTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testHighAvailabilityTaskAssignorLargeNumConsumers() {
+        completeLargeAssignment(1_000, 1_000, 1, 1, HighAvailabilityTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testHighAvailabilityTaskAssignorManyStandbys() {
+        completeLargeAssignment(1_000, 100, 1, 50, HighAvailabilityTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testHighAvailabilityTaskAssignorManyThreadsPerClient() {
+        completeLargeAssignment(1_000, 10, 1000, 1, HighAvailabilityTaskAssignor.class);
+    }
+
+    /************ StickyTaskAssignor tests ************/
+
+    @Test(timeout = 120 * 1000)
+    public void testStickyTaskAssignorLargePartitionCount() {
+        completeLargeAssignment(2_000, 1, 1, 1, StickyTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testStickyTaskAssignorLargeNumConsumers() {
+        completeLargeAssignment(1_000, 1_000, 1, 1, StickyTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testStickyTaskAssignorManyStandbys() {
+        completeLargeAssignment(1_000, 100, 1, 20, StickyTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testStickyTaskAssignorManyThreadsPerClient() {
+        completeLargeAssignment(1_000, 10, 1000, 1, StickyTaskAssignor.class);
+    }
+
+    /************ FallbackPriorTaskAssignor tests ************/
+
+    @Test(timeout = 120 * 1000)
+    public void testFallbackPriorTaskAssignorLargePartitionCount() {
+        completeLargeAssignment(2_000, 1, 1, 1, FallbackPriorTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testFallbackPriorTaskAssignorLargeNumConsumers() {
+        completeLargeAssignment(1_000, 1_000, 1, 1, FallbackPriorTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testFallbackPriorTaskAssignorManyStandbys() {
+        completeLargeAssignment(1_000, 100, 1, 20, FallbackPriorTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testFallbackPriorTaskAssignorManyThreadsPerClient() {
+        completeLargeAssignment(1_000, 10, 1000, 1, FallbackPriorTaskAssignor.class);
+    }
+
+    private void completeLargeAssignment(final int numPartitions,
+                                         final int numClients,
+                                         final int numThreadsPerClient,
+                                         final int numStandbys,
+                                         final Class<? extends TaskAssignor> taskAssignor) {
+        final List<String> topic = singletonList("topic");
+
+        final Map<TopicPartition, Long> changelogEndOffsets = new HashMap<>();
+        for (int p = 0; p < numPartitions; ++p) {
+            changelogEndOffsets.put(new TopicPartition(APPLICATION_ID + "-store-changelog", p), 100_000L);
+        }
+
+        final List<PartitionInfo> partitionInfos = new ArrayList<>();
+        for (int p = 0; p < numPartitions; ++p) {
+            partitionInfos.add(new PartitionInfo("topic", p, Node.noNode(), new Node[0], new Node[0]));
+        }
+
+        final Cluster clusterMetadata = new Cluster(
+            "cluster",
+            Collections.singletonList(Node.noNode()),
+            partitionInfos,
+            emptySet(),
+            emptySet()
+        );
+
+        final InternalTopologyBuilder builder = new InternalTopologyBuilder();
+        builder.addSource(null, "source", null, null, null, "topic");
+        builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store", false), "processor");
+        builder.setApplicationId(APPLICATION_ID);
+        builder.buildTopology();
+
+        final Consumer<byte[], byte[]> mainConsumer = EasyMock.createNiceMock(Consumer.class);
+        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+        expect(taskManager.builder()).andStubReturn(builder);
+        expect(taskManager.mainConsumer()).andStubReturn(mainConsumer);
+        expect(mainConsumer.committed(new HashSet<>())).andStubReturn(Collections.emptyMap());
+        final AdminClient adminClient = createMockAdminClientForAssignor(changelogEndOffsets);
+
+        final StreamsPartitionAssignor partitionAssignor = new StreamsPartitionAssignor();
+
+        final Map<String, Object> configMap = new HashMap<>();
+        configMap.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
+        configMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8080");
+        configMap.put(InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager);
+        configMap.put(InternalConfig.STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR, EasyMock.createNiceMock(StreamsMetadataState.class));
+        configMap.put(InternalConfig.STREAMS_ADMIN_CLIENT, adminClient);
+        configMap.put(InternalConfig.ASSIGNMENT_ERROR_CODE, new AtomicInteger());
+        configMap.put(InternalConfig.NEXT_SCHEDULED_REBALANCE_MS, new AtomicLong(Long.MAX_VALUE));
+        configMap.put(InternalConfig.TIME, new MockTime());
+        configMap.put(InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, taskAssignor.getName());
+        configMap.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numStandbys);
+
+        final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
+            new MockTime(),
+            new StreamsConfig(configMap),
+            new MockClientSupplier().restoreConsumer,
+            false
+        );
+        partitionAssignor.configure(configMap);
+        EasyMock.replay(taskManager, adminClient, mainConsumer);
+
+        partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
+
+        final Map<String, Subscription> subscriptions = new HashMap<>();
+        for (int client = 0; client < numClients; ++client) {
+            for (int i = 0; i < numThreadsPerClient; ++i) {
+                subscriptions.put(getConsumerName(i, client),
+                                  new Subscription(
+                                      topic,
+                                      getInfo(uuidForInt(client), EMPTY_TASKS, EMPTY_TASKS).encode())
+                );
+            }
+        }
+
+        final long firstAssignmentStartMs = System.currentTimeMillis();
+        final Map<String, Assignment> firstAssignments = partitionAssignor.assign(clusterMetadata, new GroupSubscription(subscriptions)).groupAssignment();
+        final long firstAssignmentEndMs = System.currentTimeMillis();
+
+        final long firstAssignmentDuration = firstAssignmentEndMs - firstAssignmentStartMs;
+        if (firstAssignmentDuration > MAX_ASSIGNMENT_DURATION) {
+            throw new AssertionError("The first assignment took too long to complete at " + firstAssignmentDuration + "ms.");
+        } else {
+            log.info("First assignment took {}ms.", firstAssignmentDuration);
+        }
+
+        // Use the assignment to generate the subscriptions' prev task data for the next rebalance
+        for (int client = 0; client < numClients; ++client) {
+            for (int i = 0; i < numThreadsPerClient; ++i) {
+                final String consumer = getConsumerName(i, client);
+                final Assignment assignment = firstAssignments.get(consumer);
+                final AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
+
+                subscriptions.put(consumer,
+                                  new Subscription(
+                                      topic,
+                                      getInfo(uuidForInt(i), new HashSet<>(info.activeTasks()), info.standbyTasks().keySet()).encode(),

Review comment:
       Whoops, good catch, thanks!
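
The line under review passes `uuidForInt(i)`, the thread index, whereas the first round of subscriptions used `uuidForInt(client)`; that mismatch appears to be what was caught. A minimal sketch of the effect, assuming the helper maps an int to a fixed UUID (the `uuidForInt` body below is a stand-in written for this example, not the real `AssignmentTestUtils` code, and `ProcessIdSketch` is a hypothetical name):

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

// Hypothetical illustration; not code from the PR.
public final class ProcessIdSketch {

    // Assumed stand-in for the test helper: a deterministic UUID per int.
    static UUID uuidForInt(final int n) {
        return new UUID(0L, n);
    }

    // Distinct process ids when every thread is tagged with its client index.
    static int distinctIdsByClient(final int numClients, final int numThreadsPerClient) {
        final Set<UUID> ids = new HashSet<>();
        for (int client = 0; client < numClients; ++client) {
            for (int i = 0; i < numThreadsPerClient; ++i) {
                ids.add(uuidForInt(client));
            }
        }
        return ids.size();
    }

    // Distinct process ids when the thread index is used by mistake.
    static int distinctIdsByThread(final int numClients, final int numThreadsPerClient) {
        final Set<UUID> ids = new HashSet<>();
        for (int client = 0; client < numClients; ++client) {
            for (int i = 0; i < numThreadsPerClient; ++i) {
                ids.add(uuidForInt(i));
            }
        }
        return ids.size();
    }

    public static void main(final String[] args) {
        // Same shape as the ManyThreadsPerClient tests: 10 clients, 1000 threads each.
        System.out.println(distinctIdsByClient(10, 1000));
        System.out.println(distinctIdsByThread(10, 1000));
    }
}
```

With 10 clients and 1000 threads each, tagging by client index yields 10 distinct process ids, while tagging by thread index yields 1000, so the second rebalance would see a different set of clients than the first.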







[GitHub] [kafka] vvcephei merged pull request #8892: KAFKA-10068: verify assignment performance with large cluster

Posted by GitBox <gi...@apache.org>.
vvcephei merged pull request #8892:
URL: https://github.com/apache/kafka/pull/8892


   








[GitHub] [kafka] cadonna commented on a change in pull request #8892: KAFKA-10068: verify assignment performance with large cluster

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#discussion_r490101377



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.createMockAdminClientForAssignor;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getInfo;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.uuidForInt;
+import static org.easymock.EasyMock.expect;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
+import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
+import org.apache.kafka.test.MockKeyValueStoreBuilder;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({IntegrationTest.class})
+public class StreamsAssignmentScaleTest {
+    final static long MAX_ASSIGNMENT_DURATION = 60 * 1000L; // each individual assignment should complete within 60s
+    final static String APPLICATION_ID = "streams-assignment-scale-test";
+
+    private final Logger log = LoggerFactory.getLogger(StreamsAssignmentScaleTest.class);
+
+    /************ HighAvailabilityTaskAssignor tests ************/
+
+    @Test(timeout = 120 * 1000)
+    public void testHighAvailabilityTaskAssignorLargePartitionCount() {
+        completeLargeAssignment(6_000, 1, 1, 1, HighAvailabilityTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testHighAvailabilityTaskAssignorLargeNumConsumers() {
+        completeLargeAssignment(1_000, 1_000, 1, 1, HighAvailabilityTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testHighAvailabilityTaskAssignorManyStandbys() {
+        completeLargeAssignment(1_000, 100, 1, 50, HighAvailabilityTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testHighAvailabilityTaskAssignorManyThreadsPerClient() {
+        completeLargeAssignment(1_000, 10, 1000, 1, HighAvailabilityTaskAssignor.class);
+    }
+
+    /************ StickyTaskAssignor tests ************/
+
+    @Test(timeout = 120 * 1000)
+    public void testStickyTaskAssignorLargePartitionCount() {
+        completeLargeAssignment(2_000, 1, 1, 1, StickyTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testStickyTaskAssignorLargeNumConsumers() {
+        completeLargeAssignment(1_000, 1_000, 1, 1, StickyTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testStickyTaskAssignorManyStandbys() {
+        completeLargeAssignment(1_000, 100, 1, 20, StickyTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testStickyTaskAssignorManyThreadsPerClient() {
+        completeLargeAssignment(1_000, 10, 1000, 1, StickyTaskAssignor.class);
+    }
+
+    /************ FallbackPriorTaskAssignor tests ************/
+
+    @Test(timeout = 120 * 1000)
+    public void testFallbackPriorTaskAssignorLargePartitionCount() {
+        completeLargeAssignment(2_000, 1, 1, 1, FallbackPriorTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testFallbackPriorTaskAssignorLargeNumConsumers() {
+        completeLargeAssignment(1_000, 1_000, 1, 1, FallbackPriorTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testFallbackPriorTaskAssignorManyStandbys() {
+        completeLargeAssignment(1_000, 100, 1, 20, FallbackPriorTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testFallbackPriorTaskAssignorManyThreadsPerClient() {
+        completeLargeAssignment(1_000, 10, 1000, 1, FallbackPriorTaskAssignor.class);
+    }
+
+    private void completeLargeAssignment(final int numPartitions,
+                                         final int numClients,
+                                         final int numThreadsPerClient,
+                                         final int numStandbys,
+                                         final Class<? extends TaskAssignor> taskAssignor) {
+        final List<String> topic = singletonList("topic");
+
+        final Map<TopicPartition, Long> changelogEndOffsets = new HashMap<>();
+        for (int p = 0; p < numPartitions; ++p) {
+            changelogEndOffsets.put(new TopicPartition(APPLICATION_ID + "-store-changelog", p), 100_000L);
+        }
+
+        final List<PartitionInfo> partitionInfos = new ArrayList<>();
+        for (int p = 0; p < numPartitions; ++p) {
+            partitionInfos.add(new PartitionInfo("topic", p, Node.noNode(), new Node[0], new Node[0]));
+        }
+
+        final Cluster clusterMetadata = new Cluster(
+            "cluster",
+            Collections.singletonList(Node.noNode()),
+            partitionInfos,
+            emptySet(),
+            emptySet()
+        );
+
+        final InternalTopologyBuilder builder = new InternalTopologyBuilder();
+        builder.addSource(null, "source", null, null, null, "topic");
+        builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store", false), "processor");
+        builder.setApplicationId(APPLICATION_ID);
+        builder.buildTopology();
+
+        final Consumer<byte[], byte[]> mainConsumer = EasyMock.createNiceMock(Consumer.class);
+        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+        expect(taskManager.builder()).andStubReturn(builder);
+        expect(taskManager.mainConsumer()).andStubReturn(mainConsumer);
+        expect(mainConsumer.committed(new HashSet<>())).andStubReturn(Collections.emptyMap());
+        final AdminClient adminClient = createMockAdminClientForAssignor(changelogEndOffsets);
+
+        final StreamsPartitionAssignor partitionAssignor = new StreamsPartitionAssignor();
+
+        final Map<String, Object> configMap = new HashMap<>();
+        configMap.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
+        configMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8080");
+        configMap.put(InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager);
+        configMap.put(InternalConfig.STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR, EasyMock.createNiceMock(StreamsMetadataState.class));
+        configMap.put(InternalConfig.STREAMS_ADMIN_CLIENT, adminClient);
+        configMap.put(InternalConfig.ASSIGNMENT_ERROR_CODE, new AtomicInteger());
+        configMap.put(InternalConfig.NEXT_SCHEDULED_REBALANCE_MS, new AtomicLong(Long.MAX_VALUE));
+        configMap.put(InternalConfig.TIME, new MockTime());
+        configMap.put(InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, taskAssignor.getName());
+        configMap.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numStandbys);
+
+        final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
+            new MockTime(),
+            new StreamsConfig(configMap),
+            new MockClientSupplier().restoreConsumer,
+            false
+        );
+        partitionAssignor.configure(configMap);
+        EasyMock.replay(taskManager, adminClient, mainConsumer);
+
+        partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
+
+        final Map<String, Subscription> subscriptions = new HashMap<>();
+        for (int client = 0; client < numClients; ++client) {
+            for (int i = 0; i < numThreadsPerClient; ++i) {
+                subscriptions.put(getConsumerName(i, client),
+                                  new Subscription(
+                                      topic,
+                                      getInfo(uuidForInt(client), EMPTY_TASKS, EMPTY_TASKS).encode())
+                );

Review comment:
       Code style suggestion:
   ```suggestion
                   subscriptions.put(
                       getConsumerName(i, client),
                       new Subscription(topic, getInfo(uuidForInt(client), EMPTY_TASKS, EMPTY_TASKS).encode())
                   );
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.createMockAdminClientForAssignor;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getInfo;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.uuidForInt;
+import static org.easymock.EasyMock.expect;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
+import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
+import org.apache.kafka.test.MockKeyValueStoreBuilder;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({IntegrationTest.class})
+public class StreamsAssignmentScaleTest {
+    final static long MAX_ASSIGNMENT_DURATION = 60 * 1000L; // each individual assignment should complete within 60s
+    final static String APPLICATION_ID = "streams-assignment-scale-test";
+
+    private final Logger log = LoggerFactory.getLogger(StreamsAssignmentScaleTest.class);
+
+    /************ HighAvailabilityTaskAssignor tests ************/
+
+    @Test(timeout = 120 * 1000)
+    public void testHighAvailabilityTaskAssignorLargePartitionCount() {
+        completeLargeAssignment(6_000, 1, 1, 1, HighAvailabilityTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testHighAvailabilityTaskAssignorLargeNumConsumers() {
+        completeLargeAssignment(1_000, 1_000, 1, 1, HighAvailabilityTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testHighAvailabilityTaskAssignorManyStandbys() {
+        completeLargeAssignment(1_000, 100, 1, 50, HighAvailabilityTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testHighAvailabilityTaskAssignorManyThreadsPerClient() {
+        completeLargeAssignment(1_000, 10, 1000, 1, HighAvailabilityTaskAssignor.class);
+    }
+
+    /************ StickyTaskAssignor tests ************/
+
+    @Test(timeout = 120 * 1000)
+    public void testStickyTaskAssignorLargePartitionCount() {
+        completeLargeAssignment(2_000, 1, 1, 1, StickyTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testStickyTaskAssignorLargeNumConsumers() {
+        completeLargeAssignment(1_000, 1_000, 1, 1, StickyTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testStickyTaskAssignorManyStandbys() {
+        completeLargeAssignment(1_000, 100, 1, 20, StickyTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testStickyTaskAssignorManyThreadsPerClient() {
+        completeLargeAssignment(1_000, 10, 1000, 1, StickyTaskAssignor.class);
+    }
+
+    /************ FallbackPriorTaskAssignor tests ************/
+
+    @Test(timeout = 120 * 1000)
+    public void testFallbackPriorTaskAssignorLargePartitionCount() {
+        completeLargeAssignment(2_000, 1, 1, 1, FallbackPriorTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testFallbackPriorTaskAssignorLargeNumConsumers() {
+        completeLargeAssignment(1_000, 1_000, 1, 1, FallbackPriorTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testFallbackPriorTaskAssignorManyStandbys() {
+        completeLargeAssignment(1_000, 100, 1, 20, FallbackPriorTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testFallbackPriorTaskAssignorManyThreadsPerClient() {
+        completeLargeAssignment(1_000, 10, 1000, 1, FallbackPriorTaskAssignor.class);
+    }
+
+    private void completeLargeAssignment(final int numPartitions,
+                                         final int numClients,
+                                         final int numThreadsPerClient,
+                                         final int numStandbys,
+                                         final Class<? extends TaskAssignor> taskAssignor) {
+        final List<String> topic = singletonList("topic");
+
+        final Map<TopicPartition, Long> changelogEndOffsets = new HashMap<>();
+        for (int p = 0; p < numPartitions; ++p) {
+            changelogEndOffsets.put(new TopicPartition(APPLICATION_ID + "-store-changelog", p), 100_000L);
+        }
+
+        final List<PartitionInfo> partitionInfos = new ArrayList<>();
+        for (int p = 0; p < numPartitions; ++p) {
+            partitionInfos.add(new PartitionInfo("topic", p, Node.noNode(), new Node[0], new Node[0]));
+        }
+
+        final Cluster clusterMetadata = new Cluster(
+            "cluster",
+            Collections.singletonList(Node.noNode()),
+            partitionInfos,
+            emptySet(),
+            emptySet()
+        );
+
+        final InternalTopologyBuilder builder = new InternalTopologyBuilder();
+        builder.addSource(null, "source", null, null, null, "topic");
+        builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store", false), "processor");
+        builder.setApplicationId(APPLICATION_ID);
+        builder.buildTopology();
+
+        final Consumer<byte[], byte[]> mainConsumer = EasyMock.createNiceMock(Consumer.class);
+        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+        expect(taskManager.builder()).andStubReturn(builder);
+        expect(taskManager.mainConsumer()).andStubReturn(mainConsumer);
+        expect(mainConsumer.committed(new HashSet<>())).andStubReturn(Collections.emptyMap());
+        final AdminClient adminClient = createMockAdminClientForAssignor(changelogEndOffsets);
+
+        final StreamsPartitionAssignor partitionAssignor = new StreamsPartitionAssignor();
+
+        final Map<String, Object> configMap = new HashMap<>();
+        configMap.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
+        configMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8080");
+        configMap.put(InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager);
+        configMap.put(InternalConfig.STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR, EasyMock.createNiceMock(StreamsMetadataState.class));
+        configMap.put(InternalConfig.STREAMS_ADMIN_CLIENT, adminClient);
+        configMap.put(InternalConfig.ASSIGNMENT_ERROR_CODE, new AtomicInteger());
+        configMap.put(InternalConfig.NEXT_SCHEDULED_REBALANCE_MS, new AtomicLong(Long.MAX_VALUE));
+        configMap.put(InternalConfig.TIME, new MockTime());
+        configMap.put(InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, taskAssignor.getName());
+        configMap.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numStandbys);
+
+        final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
+            new MockTime(),
+            new StreamsConfig(configMap),
+            new MockClientSupplier().restoreConsumer,
+            false
+        );
+        partitionAssignor.configure(configMap);
+        EasyMock.replay(taskManager, adminClient, mainConsumer);
+
+        partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
+
+        final Map<String, Subscription> subscriptions = new HashMap<>();
+        for (int client = 0; client < numClients; ++client) {
+            for (int i = 0; i < numThreadsPerClient; ++i) {
+                subscriptions.put(getConsumerName(i, client),
+                                  new Subscription(
+                                      topic,
+                                      getInfo(uuidForInt(client), EMPTY_TASKS, EMPTY_TASKS).encode())
+                );
+            }
+        }
+
+        final long firstAssignmentStartMs = System.currentTimeMillis();
+        final Map<String, Assignment> firstAssignments = partitionAssignor.assign(clusterMetadata, new GroupSubscription(subscriptions)).groupAssignment();
+        final long firstAssignmentEndMs = System.currentTimeMillis();
+
+        final long firstAssignmentDuration = firstAssignmentEndMs - firstAssignmentStartMs;
+        if (firstAssignmentDuration > MAX_ASSIGNMENT_DURATION) {
+            throw new AssertionError("The first assignment took too long to complete at " + firstAssignmentDuration + "ms.");
+        } else {
+            log.info("First assignment took {}ms.", firstAssignmentDuration);
+        }
+
+        // Use the assignment to generate the subscriptions' prev task data for the next rebalance
+        for (int client = 0; client < numClients; ++client) {
+            for (int i = 0; i < numThreadsPerClient; ++i) {
+                final String consumer = getConsumerName(i, client);
+                final Assignment assignment = firstAssignments.get(consumer);
+                final AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
+
+                subscriptions.put(consumer,
+                                  new Subscription(
+                                      topic,
+                                      getInfo(uuidForInt(i), new HashSet<>(info.activeTasks()), info.standbyTasks().keySet()).encode(),
+                                      assignment.partitions())
+                );

Review comment:
       Code style suggestion:
   ```suggestion
                   subscriptions.put(
                       consumer,
                       new Subscription(
                           topic,
                           getInfo(uuidForInt(i), new HashSet<>(info.activeTasks()), info.standbyTasks().keySet()).encode(),
                           assignment.partitions()
                       )
                   );
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.createMockAdminClientForAssignor;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getInfo;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.uuidForInt;
+import static org.easymock.EasyMock.expect;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
+import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
+import org.apache.kafka.test.MockKeyValueStoreBuilder;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({IntegrationTest.class})
+public class StreamsAssignmentScaleTest {
+    final static long MAX_ASSIGNMENT_DURATION = 60 * 1000L; // each individual assignment should complete within 60s
+    final static String APPLICATION_ID = "streams-assignment-scale-test";
+
+    private final Logger log = LoggerFactory.getLogger(StreamsAssignmentScaleTest.class);
+
+    /************ HighAvailabilityTaskAssignor tests ************/
+
+    @Test(timeout = 120 * 1000)
+    public void testHighAvailabilityTaskAssignorLargePartitionCount() {
+        completeLargeAssignment(6_000, 1, 1, 1, HighAvailabilityTaskAssignor.class);

Review comment:
       I get this warning when I run this test:
   
   ```
   WARN Unable to assign 1 of 1 standby tasks for task [0_0]. There is not enough available capacity ...
   ```
   
   Since the code has to break out of the loop in `assignStandbyReplicaTasks()` to log this warning, I think you should either:
   1. set the number of standbys to 0, or
   2. set the number of clients to 2
   
   Option 1 would not run the code in the loop at all. Option 2 would run all the code in the loop until all standbys are assigned. I would prefer option 2, since it exercises more code in the test, which is better for scale testing.
   
   The same is true for the corresponding tests for the other assignors.
   
   IMPORTANT: Before you fix this, you should read my comment on line 234, otherwise setting the number of clients to 2 will not work as expected for the second assignment.
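
As a rough illustration of the capacity constraint behind that warning, here is a minimal sketch. It assumes a simplified model in which every copy of a task (active or standby) must sit on a different client; the class and method names are hypothetical and this is not the actual logic of `assignStandbyReplicaTasks()`.

```java
public class StandbyCapacitySketch {

    // Simplified model (an assumption, not Kafka's code): each copy of a task,
    // active or standby, must be hosted by a different client.
    static int assignableStandbysPerTask(final int numClients, final int numStandbys) {
        if (numClients < 1 || numStandbys < 0) {
            throw new IllegalArgumentException("need at least one client and a non-negative standby count");
        }
        // One client hosts the active task; only the remaining clients can host standbys.
        return Math.min(numStandbys, numClients - 1);
    }

    public static void main(final String[] args) {
        // One client and one configured standby: no client is left to host it.
        System.out.println(assignableStandbysPerTask(1, 1));
        // Two clients and one configured standby: the standby fits on the second client.
        System.out.println(assignableStandbysPerTask(2, 1));
    }
}
```

Under this model, a single client can never host a standby, which is consistent with the warning appearing for the `(numClients = 1, numStandbys = 1)` configuration.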

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java
##########
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.createMockAdminClientForAssignor;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getInfo;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.uuidForInt;
+import static org.easymock.EasyMock.expect;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
+import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
+import org.apache.kafka.test.MockKeyValueStoreBuilder;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({IntegrationTest.class})
+public class StreamsAssignmentScaleTest {
+    final static long MAX_ASSIGNMENT_DURATION = 60 * 1000L; //each individual assignment should complete within 20s
+    final static String APPLICATION_ID = "streams-assignment-scale-test";
+
+    private final Logger log = LoggerFactory.getLogger(StreamsAssignmentScaleTest.class);
+
+    /************ HighAvailabilityTaskAssignor tests ************/
+
+    @Test(timeout = 120 * 1000)
+    public void testHighAvailabilityTaskAssignorLargePartitionCount() {
+        completeLargeAssignment(6_000, 1, 1, 1, HighAvailabilityTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testHighAvailabilityTaskAssignorLargeNumConsumers() {
+        completeLargeAssignment(1_000, 1_000, 1, 1, HighAvailabilityTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testHighAvailabilityTaskAssignorManyStandbys() {
+        completeLargeAssignment(1_000, 100, 1, 50, HighAvailabilityTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testHighAvailabilityTaskAssignorManyThreadsPerClient() {
+        completeLargeAssignment(1_000, 10, 1000, 1, HighAvailabilityTaskAssignor.class);
+    }
+
+    /************ StickyTaskAssignor tests ************/
+
+    @Test(timeout = 120 * 1000)
+    public void testStickyTaskAssignorLargePartitionCount() {
+        completeLargeAssignment(2_000, 1, 1, 1, StickyTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testStickyTaskAssignorLargeNumConsumers() {
+        completeLargeAssignment(1_000, 1_000, 1, 1, StickyTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testStickyTaskAssignorManyStandbys() {
+        completeLargeAssignment(1_000, 100, 1, 20, StickyTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testStickyTaskAssignorManyThreadsPerClient() {
+        completeLargeAssignment(1_000, 10, 1000, 1, StickyTaskAssignor.class);
+    }
+
+    /************ FallbackPriorTaskAssignor tests ************/
+
+    @Test(timeout = 120 * 1000)
+    public void testFallbackPriorTaskAssignorLargePartitionCount() {
+        completeLargeAssignment(2_000, 1, 1, 1, FallbackPriorTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testFallbackPriorTaskAssignorLargeNumConsumers() {
+        completeLargeAssignment(1_000, 1_000, 1, 1, FallbackPriorTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testFallbackPriorTaskAssignorManyStandbys() {
+        completeLargeAssignment(1_000, 100, 1, 20, FallbackPriorTaskAssignor.class);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testFallbackPriorTaskAssignorManyThreadsPerClient() {
+        completeLargeAssignment(1_000, 10, 1000, 1, FallbackPriorTaskAssignor.class);
+    }
+
+    private void completeLargeAssignment(final int numPartitions,
+                                         final int numClients,
+                                         final int numThreadsPerClient,
+                                         final int numStandbys,
+                                         final Class<? extends TaskAssignor> taskAssignor) {
+        final List<String> topic = singletonList("topic");
+
+        final Map<TopicPartition, Long> changelogEndOffsets = new HashMap<>();
+        for (int p = 0; p < numPartitions; ++p) {
+            changelogEndOffsets.put(new TopicPartition(APPLICATION_ID + "-store-changelog", p), 100_000L);
+        }
+
+        final List<PartitionInfo> partitionInfos = new ArrayList<>();
+        for (int p = 0; p < numPartitions; ++p) {
+            partitionInfos.add(new PartitionInfo("topic", p, Node.noNode(), new Node[0], new Node[0]));
+        }
+
+        final Cluster clusterMetadata = new Cluster(
+            "cluster",
+            Collections.singletonList(Node.noNode()),
+            partitionInfos,
+            emptySet(),
+            emptySet()
+        );
+
+        final InternalTopologyBuilder builder = new InternalTopologyBuilder();
+        builder.addSource(null, "source", null, null, null, "topic");
+        builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store", false), "processor");
+        builder.setApplicationId(APPLICATION_ID);
+        builder.buildTopology();
+
+        final Consumer<byte[], byte[]> mainConsumer = EasyMock.createNiceMock(Consumer.class);
+        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+        expect(taskManager.builder()).andStubReturn(builder);
+        expect(taskManager.mainConsumer()).andStubReturn(mainConsumer);
+        expect(mainConsumer.committed(new HashSet<>())).andStubReturn(Collections.emptyMap());
+        final AdminClient adminClient = createMockAdminClientForAssignor(changelogEndOffsets);
+
+        final StreamsPartitionAssignor partitionAssignor = new StreamsPartitionAssignor();
+
+        final Map<String, Object> configMap = new HashMap<>();
+        configMap.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
+        configMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:8080");
+        configMap.put(InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager);
+        configMap.put(InternalConfig.STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR, EasyMock.createNiceMock(StreamsMetadataState.class));
+        configMap.put(InternalConfig.STREAMS_ADMIN_CLIENT, adminClient);
+        configMap.put(InternalConfig.ASSIGNMENT_ERROR_CODE, new AtomicInteger());
+        configMap.put(InternalConfig.NEXT_SCHEDULED_REBALANCE_MS, new AtomicLong(Long.MAX_VALUE));
+        configMap.put(InternalConfig.TIME, new MockTime());
+        configMap.put(InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS, taskAssignor.getName());
+        configMap.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, numStandbys);
+
+        final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
+            new MockTime(),
+            new StreamsConfig(configMap),
+            new MockClientSupplier().restoreConsumer,
+            false
+        );
+        partitionAssignor.configure(configMap);
+        EasyMock.replay(taskManager, adminClient, mainConsumer);
+
+        partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
+
+        final Map<String, Subscription> subscriptions = new HashMap<>();
+        for (int client = 0; client < numClients; ++client) {
+            for (int i = 0; i < numThreadsPerClient; ++i) {
+                subscriptions.put(getConsumerName(i, client),
+                                  new Subscription(
+                                      topic,
+                                      getInfo(uuidForInt(client), EMPTY_TASKS, EMPTY_TASKS).encode())
+                );
+            }
+        }
+
+        final long firstAssignmentStartMs = System.currentTimeMillis();
+        final Map<String, Assignment> firstAssignments = partitionAssignor.assign(clusterMetadata, new GroupSubscription(subscriptions)).groupAssignment();
+        final long firstAssignmentEndMs = System.currentTimeMillis();
+
+        final long firstAssignmentDuration = firstAssignmentEndMs - firstAssignmentStartMs;
+        if (firstAssignmentDuration > MAX_ASSIGNMENT_DURATION) {
+            throw new AssertionError("The first assignment took too long to complete at " + firstAssignmentDuration + "ms.");
+        } else {
+            log.info("First assignment took {}ms.", firstAssignmentDuration);
+        }
+
+        // Use the assignment to generate the subscriptions' prev task data for the next rebalance
+        for (int client = 0; client < numClients; ++client) {
+            for (int i = 0; i < numThreadsPerClient; ++i) {
+                final String consumer = getConsumerName(i, client);
+                final Assignment assignment = firstAssignments.get(consumer);
+                final AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
+
+                subscriptions.put(consumer,
+                                  new Subscription(
+                                      topic,
+                                      getInfo(uuidForInt(i), new HashSet<>(info.activeTasks()), info.standbyTasks().keySet()).encode(),

Review comment:
       This should be `uuidForInt(client)`; otherwise, in the second assignment you have as many clients as stream threads, and each client has only one stream thread.
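
To make the effect of the index mix-up concrete, here is a small sketch that counts how many distinct process ids the subscription loop would produce. `processIdFor` is a hypothetical stand-in for the `uuidForInt` test helper (its real implementation is not shown in this thread), and the class name is likewise made up for illustration.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.UUID;

public class ProcessIdSketch {

    // Hypothetical stand-in for the uuidForInt test helper: one UUID per integer.
    static UUID processIdFor(final int n) {
        return new UUID(0L, n);
    }

    // Mirrors the nested client/thread loop of the test and counts distinct process ids.
    static int distinctProcessIds(final int numClients,
                                  final int numThreadsPerClient,
                                  final boolean keyByClient) {
        final Set<UUID> ids = new HashSet<>();
        for (int client = 0; client < numClients; ++client) {
            for (int i = 0; i < numThreadsPerClient; ++i) {
                // keyByClient == true corresponds to uuidForInt(client),
                // keyByClient == false to uuidForInt(i).
                ids.add(processIdFor(keyByClient ? client : i));
            }
        }
        return ids.size();
    }

    public static void main(final String[] args) {
        System.out.println(distinctProcessIds(10, 1000, true));
        System.out.println(distinctProcessIds(10, 1000, false));
    }
}
```

In this sketch, keying by the thread index makes the number of distinct process ids follow the thread count rather than the client count, so the second assignment would see a different cluster shape than the first.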







[GitHub] [kafka] vvcephei commented on pull request #8892: KAFKA-10068: verify assignment performance with large cluster

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#issuecomment-696752069


   Thanks, @ableegoldman!
   
   There was only one unrelated test failure:
   `Build / JDK 11 / kafka.network.DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota`





[GitHub] [kafka] vvcephei commented on pull request #8892: KAFKA-10068: verify assignment performance with large cluster

Posted by GitBox <gi...@apache.org>.
vvcephei commented on pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#issuecomment-646303241


   test this please





[GitHub] [kafka] ableegoldman commented on a change in pull request #8892: KAFKA-10068: verify assignment performance with large cluster

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#discussion_r492382332



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
##########
@@ -77,13 +86,8 @@
     public static final TaskId TASK_2_3 = new TaskId(2, 3);
 
     public static final Set<TaskId> EMPTY_TASKS = emptySet();
-    public static final List<TaskId> EMPTY_TASK_LIST = emptyList();
-    public static final Map<TaskId, Long> EMPTY_TASK_OFFSET_SUMS = emptyMap();
     public static final Map<TopicPartition, Long> EMPTY_CHANGELOG_END_OFFSETS = new HashMap<>();
 
-    private AssignmentTestUtils() {}

Review comment:
       > Do we need to instantiate this class now
   
   No...isn't that exactly the reason we don't need this?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########
@@ -288,17 +288,17 @@ public void computeTaskLags(final UUID uuid, final Map<TaskId, Long> allTaskEndO
             final Long endOffsetSum = taskEntry.getValue();
             final Long offsetSum = taskOffsetSums.getOrDefault(task, 0L);
 
-            if (endOffsetSum < offsetSum) {
+            if (offsetSum == Task.LATEST_OFFSET) {
+                taskLagTotals.put(task, Task.LATEST_OFFSET);
+            } else if (offsetSum == UNKNOWN_OFFSET_SUM) {
+                taskLagTotals.put(task, UNKNOWN_OFFSET_SUM);
+            } else if (endOffsetSum < offsetSum) {

Review comment:
       I don't know about fixing this before, but I found this while debugging these tests and fixed it on the side in this PR. To be fair, it wasn't hurting anything since we happen to put the right thing in the `taskLagTotals` map, but the warning logged was definitely incorrect.
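
For readers following along, the reordered checks in the hunk above can be sketched as a standalone function. This is only an illustration under stated assumptions: the sentinel values are placeholders chosen for the sketch, and the results of the last two branches (which are not visible in the quoted hunk) are assumed rather than taken from `ClientState`.

```java
public class TaskLagSketch {

    // Placeholder sentinel values, assumed for this sketch only.
    static final long LATEST_OFFSET = -2L;
    static final long UNKNOWN_OFFSET_SUM = -3L;

    // Sentinel offset sums are handled first, so they are never compared
    // against the end offset as if they were real offsets.
    static long lagFor(final long endOffsetSum, final long offsetSum) {
        if (offsetSum == LATEST_OFFSET) {
            return LATEST_OFFSET;
        } else if (offsetSum == UNKNOWN_OFFSET_SUM) {
            return UNKNOWN_OFFSET_SUM;
        } else if (endOffsetSum < offsetSum) {
            // Assumption: an offset sum beyond the end offset is treated as a
            // full restore, so the lag is the whole end offset sum.
            return endOffsetSum;
        } else {
            // Assumption: the ordinary case is the plain difference.
            return endOffsetSum - offsetSum;
        }
    }

    public static void main(final String[] args) {
        System.out.println(lagFor(100L, LATEST_OFFSET));
        System.out.println(lagFor(100L, 40L));
    }
}
```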

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
##########
@@ -77,13 +86,8 @@
     public static final TaskId TASK_2_3 = new TaskId(2, 3);
 
     public static final Set<TaskId> EMPTY_TASKS = emptySet();
-    public static final List<TaskId> EMPTY_TASK_LIST = emptyList();
-    public static final Map<TaskId, Long> EMPTY_TASK_OFFSET_SUMS = emptyMap();
     public static final Map<TopicPartition, Long> EMPTY_CHANGELOG_END_OFFSETS = new HashMap<>();
 
-    private AssignmentTestUtils() {}

Review comment:
       Ah yeah ok. I'll put it back







[GitHub] [kafka] ableegoldman removed a comment on pull request #8892: KAFKA-10068: verify assignment performance with large cluster

Posted by GitBox <gi...@apache.org>.
ableegoldman removed a comment on pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#issuecomment-692979445


   Retest this please





[GitHub] [kafka] ableegoldman commented on a change in pull request #8892: KAFKA-10068: verify assignment performance with large cluster

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#discussion_r488236067



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.createMockAdminClientForAssignor;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getInfo;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.uuidForInt;
+import static org.easymock.EasyMock.expect;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
+import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
+import org.apache.kafka.test.MockKeyValueStoreBuilder;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(value = Parameterized.class)
+public class StreamsAssignmentScaleTest {

Review comment:
       Thanks, the extra data points on the runtime are helpful. We should definitely give it a wide margin to avoid flakiness on Jenkins.







[GitHub] [kafka] ableegoldman commented on a change in pull request #8892: KAFKA-10068: verify assignment performance with large cluster

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#discussion_r488267594



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsAssignmentScaleTest.java
##########
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.createMockAdminClientForAssignor;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.getInfo;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.uuidForInt;
+import static org.easymock.EasyMock.expect;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
+import org.apache.kafka.streams.processor.internals.assignment.FallbackPriorTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.StickyTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.TaskAssignor;
+import org.apache.kafka.test.MockApiProcessorSupplier;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
+import org.apache.kafka.test.MockKeyValueStoreBuilder;
+import org.easymock.EasyMock;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@RunWith(value = Parameterized.class)
+public class StreamsAssignmentScaleTest {
+    final static long MAX_ASSIGNMENT_DURATION = 60 * 1000L; //each individual assignment should complete within 60s
+    final static String APPLICATION_ID = "streams-assignment-scale-test";
+
+    private final Logger log = LoggerFactory.getLogger(StreamsAssignmentScaleTest.class);
+
+    private final Class<? extends TaskAssignor> taskAssignor;
+
+    @Parameterized.Parameters(name = "task assignor = {0}")
+    public static Collection<Object[]> parameters() {
+        return asList(
+            new Object[]{StickyTaskAssignor.class},
+            new Object[]{HighAvailabilityTaskAssignor.class},
+            new Object[]{FallbackPriorTaskAssignor.class}
+        );
+    }
+
+    public StreamsAssignmentScaleTest(final Class<? extends TaskAssignor> taskAssignor) {
+        this.taskAssignor = taskAssignor;
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testLargePartitionCount() {
+        shouldCompleteLargeAssignmentInReasonableTime(3_000, 1, 1, 1);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testLargeNumConsumers() {
+        shouldCompleteLargeAssignmentInReasonableTime(1_000, 1_000, 1, 1);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testManyStandbys() {
+        shouldCompleteLargeAssignmentInReasonableTime(1_000, 100, 1, 50);
+    }
+
+    @Test(timeout = 120 * 1000)
+    public void testManyThreadsPerClient() {
+        shouldCompleteLargeAssignmentInReasonableTime(1_000, 10, 1000, 1);
+    }
+
+    private void shouldCompleteLargeAssignmentInReasonableTime(final int numPartitions,
+                                                               final int numClients,
+                                                               final int numThreadsPerClient,
+                                                               final int numStandbys) {
+        final List<String> topic = singletonList("topic");
+        final Map<TopicPartition, Long> changelogEndOffsets = new HashMap<>();
+        for (int p = 0; p < numPartitions; ++p) {
+            changelogEndOffsets.put(new TopicPartition(APPLICATION_ID + "-store-changelog", p), 100_000L);
+        }
+        final List<PartitionInfo> partitionInfos = getPartitionInfos(numPartitions);
+        final Cluster clusterMetadata = new Cluster(
+            "cluster",
+            Collections.singletonList(Node.noNode()),
+            partitionInfos,
+            emptySet(),
+            emptySet()
+        );
+
+        final InternalTopologyBuilder builder = new InternalTopologyBuilder();
+        builder.addSource(null, "source", null, null, null, "topic");
+        builder.addProcessor("processor", new MockApiProcessorSupplier<>(), "source");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store", false), "processor");
+        builder.setApplicationId(APPLICATION_ID);
+        builder.buildTopology();
+
+        final Consumer<byte[], byte[]> mainConsumer = EasyMock.createNiceMock(Consumer.class);
+        final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class);
+        expect(taskManager.builder()).andReturn(builder).anyTimes();

Review comment:
       I copied this over from StreamsPartitionAssignorTest; I'll fix it over there too.







[GitHub] [kafka] ableegoldman commented on pull request #8892: KAFKA-10068: verify assignment performance with large cluster

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#issuecomment-692350073


   Alright, I decided to un-parametrize the tests so we can run each task assignor with different inputs, instead of relaxing the tests until the slowest assignor passes. The HATA can handle significantly higher partition and standby counts than the other two assignors, and since it's now the default we should aim to test it at a reasonable scale.
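
As a rough illustration of what un-parametrizing buys, here is a minimal plain-Java sketch with one method per assignor, each choosing its own inputs. Everything in it is hypothetical: `runScenario` stands in for the shared scale-test helper, the task-count formula is a simplification, and the partition counts are borrowed from the timings reported elsewhere in this thread rather than taken from the final test.

```java
public final class PerAssignorScaleSketch {

    // Hypothetical stand-in for the shared helper: it only reports how many
    // tasks (one active plus numStandbys standbys per partition) a scenario implies.
    public static long runScenario(final String assignorName,
                                   final int numPartitions,
                                   final int numClients,
                                   final int numStandbys) {
        final long tasks = (long) numPartitions * (1 + numStandbys);
        System.out.println(assignorName + ": " + numClients + " clients, " + tasks + " tasks");
        return tasks;
    }

    // In a real JUnit class each of these would be a separate @Test method,
    // so the faster assignor can be exercised at a larger scale.
    public static long highAvailabilityLargePartitionCount() {
        return runScenario("HighAvailabilityTaskAssignor", 6_000, 1, 1);
    }

    public static long stickyLargePartitionCount() {
        return runScenario("StickyTaskAssignor", 3_000, 1, 1);
    }

    public static void main(final String[] args) {
        highAvailabilityLargePartitionCount();
        stickyLargePartitionCount();
    }
}
```

The trade-off is some repetition across test methods in exchange for per-assignor inputs, which a single parameterized method cannot express without extra parameters.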





[GitHub] [kafka] vvcephei commented on a change in pull request #8892: KAFKA-10068: verify assignment performance with large cluster

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#discussion_r492436680



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
##########
@@ -77,13 +86,8 @@
     public static final TaskId TASK_2_3 = new TaskId(2, 3);
 
     public static final Set<TaskId> EMPTY_TASKS = emptySet();
-    public static final List<TaskId> EMPTY_TASK_LIST = emptyList();
-    public static final Map<TaskId, Long> EMPTY_TASK_OFFSET_SUMS = emptyMap();
     public static final Map<TopicPartition, Long> EMPTY_CHANGELOG_END_OFFSETS = new HashMap<>();
 
-    private AssignmentTestUtils() {}

Review comment:
       Ah, note the `private`. The purpose of this constructor is to make it uninstantiable. I.e., it's "self-documenting" that it should only be a container for static members.
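
For readers unfamiliar with the idiom, a minimal sketch of a static-only holder class follows. The class name `StaticHolderSketch` and its members are hypothetical and only mirror the shape of a test-utility class; this is not the actual `AssignmentTestUtils` code.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Modifier;
import java.util.Collections;
import java.util.Set;

// Hypothetical static-only utility class, for illustration.
public final class StaticHolderSketch {

    public static final Set<String> EMPTY_TASKS = Collections.emptySet();

    // The private constructor prevents instantiation from other classes and
    // signals that this class is only a container for static members.
    private StaticHolderSketch() {}

    // Returns true if any declared constructor is accessible to other classes.
    public static boolean isInstantiableFromOutside() {
        for (final Constructor<?> constructor : StaticHolderSketch.class.getDeclaredConstructors()) {
            if (!Modifier.isPrivate(constructor.getModifiers())) {
                return true;
            }
        }
        return false;
    }

    public static void main(final String[] args) {
        System.out.println("instantiable from outside: " + isInstantiableFromOutside());
    }
}
```

Without the explicit private constructor, the compiler would generate a public default constructor, and `new StaticHolderSketch()` would compile from any caller.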







[GitHub] [kafka] ableegoldman commented on pull request #8892: KAFKA-10068: verify assignment performance with large cluster

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#issuecomment-691358663


   Alright, this should finally be ready for a final pass and merge @vvcephei @cadonna -- sorry for leaving this hanging for so long.
   
   After running a few different tests, it seems like the HATA (HighAvailabilityTaskAssignor) may actually be faster than the old StickyTaskAssignor (STA) in most cases. The HATA seems to scale slightly worse with an increasing number of clients, but significantly better with partition count and number of standbys.
   
   The `testLargeNumConsumers` with 1,000 clients and 1,000 partitions (and 1 standby) took the HATA 20s for the full test, but only ~1-2s for the STA and the FPTA (FallbackPriorTaskAssignor).
   
   The `testManyStandbys` with 100 clients, 1,000 partitions, and 50 standbys took the HATA roughly 10s, and the STA/FPTA just under a full minute.
   
   The `testLargePartitionCount` with 1 client, 6,000 partitions, and 1 standby took the HATA under 1s. The STA and FPTA both ran out of time, surprisingly on the first assignment alone (taking just over 1.5 minutes).
   
   I decided to reduce the number of partitions in the `testLargePartitionCount` test to 3,000 rather than increase the timeout for all tests, as it's already pretty high. Maybe we can drop the STA sooner or later and then tighten it up.





[GitHub] [kafka] vvcephei commented on a change in pull request #8892: KAFKA-10068: verify assignment performance with large cluster

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#discussion_r492302522



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########
@@ -288,17 +288,17 @@ public void computeTaskLags(final UUID uuid, final Map<TaskId, Long> allTaskEndO
             final Long endOffsetSum = taskEntry.getValue();
             final Long offsetSum = taskOffsetSums.getOrDefault(task, 0L);
 
-            if (endOffsetSum < offsetSum) {
+            if (offsetSum == Task.LATEST_OFFSET) {
+                taskLagTotals.put(task, Task.LATEST_OFFSET);
+            } else if (offsetSum == UNKNOWN_OFFSET_SUM) {
+                taskLagTotals.put(task, UNKNOWN_OFFSET_SUM);
+            } else if (endOffsetSum < offsetSum) {

Review comment:
       Huh, I thought we fixed this a while ago. Why is it showing up in the diff right now?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentTestUtils.java
##########
@@ -77,13 +86,8 @@
     public static final TaskId TASK_2_3 = new TaskId(2, 3);
 
     public static final Set<TaskId> EMPTY_TASKS = emptySet();
-    public static final List<TaskId> EMPTY_TASK_LIST = emptyList();
-    public static final Map<TaskId, Long> EMPTY_TASK_OFFSET_SUMS = emptyMap();
     public static final Map<TopicPartition, Long> EMPTY_CHANGELOG_END_OFFSETS = new HashMap<>();
 
-    private AssignmentTestUtils() {}

Review comment:
       Why remove this? Do we need to instantiate this class now? (I only see static members still).







[GitHub] [kafka] cadonna commented on a change in pull request #8892: KAFKA-10068: verify assignment performance with large cluster

Posted by GitBox <gi...@apache.org>.
cadonna commented on a change in pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#discussion_r444067825



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##########
@@ -1953,6 +1975,54 @@ public void shouldRequestCommittedOffsetsForPreexistingSourceChangelogs() {
         EasyMock.verify(consumerClient);
     }
 
+    @Test(timeout = 30 * 1000)
+    public void shouldCompleteLargeAssignmentInAReasonableAmountOfTime() {
+        builder.addSource(null, "source", null, null, null, TOPICS_LIST_XL.toArray(new String[0]));
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store", false), "processor");
+
+        for (int i = 0; i < NUM_CONSUMERS_XL; ++i) {
+            subscriptions.put("consumer-" + i,
+                              new Subscription(
+                                  TOPICS_LIST_XL,
+                                  getInfo(uuidForInt(i), EMPTY_TASKS, EMPTY_TASKS).encode())
+            );
+        }
+        createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS);
+        createMockAdminClient(CHANGELOG_END_OFFSETS_XL);
+        configurePartitionAssignorWith(singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 3));
+
+        final Map<String, Assignment> assignments =
+            partitionAssignor.assign(CLUSTER_METADATA_XL, new GroupSubscription(subscriptions)).groupAssignment();
+
+        // Use the assignment to generate the subscriptions' prev task data for the next rebalance
+        for (int i = 0; i < NUM_CONSUMERS_XL; ++i) {
+            final String consumer = "consumer-" + i;
+            final Assignment assignment = assignments.get(consumer);
+            final AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
+
+            subscriptions.put("consumer-" + i,
+                              new Subscription(
+                                  TOPICS_LIST_XL,
+                                  getInfo(uuidForInt(i), new HashSet<>(info.activeTasks()), info.standbyTasks().keySet()).encode(),
+                                  assignment.partitions())
+            );
+        }
+
+        final Map<String, Assignment> secondAssignments =
+            partitionAssignor.assign(CLUSTER_METADATA_XL, new GroupSubscription(subscriptions)).groupAssignment();
+    }
+
+    private static List<PartitionInfo> getPartitionInfos(final int numTopics, final int numPartitionsPerTopic) {
+        final List<PartitionInfo> partitionInfos = new ArrayList<>();
+        for (int t = 1; t <= numTopics; ++t) { // topic numbering starts from 1
+            for (int p = 0; p < numPartitionsPerTopic; ++p) {
+                partitionInfos.add(new PartitionInfo("topic" + t, p, Node.noNode(), new Node[0], new Node[0]));
+            }
+        }
+        return  partitionInfos;
+    }

Review comment:
       prop: Could you please move this method closer to where it is used, i.e., around line 200? 

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##########
@@ -1932,10 +1954,10 @@ public void shouldRequestCommittedOffsetsForPreexistingSourceChangelogs() {
         streamsBuilder.table("topic1", Materialized.as("store"));
 
         subscriptions.put("consumer10",
-            new Subscription(
-                singletonList("topic1"),
-                defaultSubscriptionInfo.encode()
-            ));
+                          new Subscription(
+                              singletonList("topic1"),
+                              defaultSubscriptionInfo.encode()
+                          ));

Review comment:
       prop: 
   
   ```
   subscriptions.put("consumer10", new Subscription(singletonList("topic1"), defaultSubscriptionInfo.encode()));
   
   ```
   

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##########
@@ -1953,6 +1975,54 @@ public void shouldRequestCommittedOffsetsForPreexistingSourceChangelogs() {
         EasyMock.verify(consumerClient);
     }
 
+    @Test(timeout = 30 * 1000)
+    public void shouldCompleteLargeAssignmentInAReasonableAmountOfTime() {
+        builder.addSource(null, "source", null, null, null, TOPICS_LIST_XL.toArray(new String[0]));
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store", false), "processor");
+
+        for (int i = 0; i < NUM_CONSUMERS_XL; ++i) {
+            subscriptions.put("consumer-" + i,
+                              new Subscription(
+                                  TOPICS_LIST_XL,
+                                  getInfo(uuidForInt(i), EMPTY_TASKS, EMPTY_TASKS).encode())
+            );
+        }
+        createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS);
+        createMockAdminClient(CHANGELOG_END_OFFSETS_XL);
+        configurePartitionAssignorWith(singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 3));
+
+        final Map<String, Assignment> assignments =
+            partitionAssignor.assign(CLUSTER_METADATA_XL, new GroupSubscription(subscriptions)).groupAssignment();
+
+        // Use the assignment to generate the subscriptions' prev task data for the next rebalance
+        for (int i = 0; i < NUM_CONSUMERS_XL; ++i) {
+            final String consumer = "consumer-" + i;
+            final Assignment assignment = assignments.get(consumer);
+            final AssignmentInfo info = AssignmentInfo.decode(assignment.userData());
+
+            subscriptions.put("consumer-" + i,
+                              new Subscription(
+                                  TOPICS_LIST_XL,
+                                  getInfo(uuidForInt(i), new HashSet<>(info.activeTasks()), info.standbyTasks().keySet()).encode(),
+                                  assignment.partitions())
+            );
+        }
+
+        final Map<String, Assignment> secondAssignments =

Review comment:
       req: Could you please remove variable `secondAssignments` since it is never used?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##########
@@ -1953,6 +1975,54 @@ public void shouldRequestCommittedOffsetsForPreexistingSourceChangelogs() {
         EasyMock.verify(consumerClient);
     }
 
+    @Test(timeout = 30 * 1000)
+    public void shouldCompleteLargeAssignmentInAReasonableAmountOfTime() {
+        builder.addSource(null, "source", null, null, null, TOPICS_LIST_XL.toArray(new String[0]));
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
+        builder.addStateStore(new MockKeyValueStoreBuilder("store", false), "processor");
+
+        for (int i = 0; i < NUM_CONSUMERS_XL; ++i) {
+            subscriptions.put("consumer-" + i,
+                              new Subscription(
+                                  TOPICS_LIST_XL,
+                                  getInfo(uuidForInt(i), EMPTY_TASKS, EMPTY_TASKS).encode())
+            );
+        }
+        createMockTaskManager(EMPTY_TASKS, EMPTY_TASKS);
+        createMockAdminClient(CHANGELOG_END_OFFSETS_XL);
+        configurePartitionAssignorWith(singletonMap(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 3));
+
+        final Map<String, Assignment> assignments =
+            partitionAssignor.assign(CLUSTER_METADATA_XL, new GroupSubscription(subscriptions)).groupAssignment();
+
+        // Use the assignment to generate the subscriptions' prev task data for the next rebalance
+        for (int i = 0; i < NUM_CONSUMERS_XL; ++i) {

Review comment:
       Q: I am wondering if it would be better to extract the two assignments to two scale tests, so that when one fails we immediately know whether the startup assignment or the intermediate assignment is slow. 
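
One way to get that per-phase signal, whether the phases stay in one test or are split into two, is to time each assignment on its own and name the phase in the failure message. Below is a minimal sketch in plain Java with no Kafka classes. The `timed` helper, the phase names, and the 60s budget are hypothetical, and the lambdas only stand in for the two `assign(...)` calls.

```java
import java.util.function.Supplier;

public final class PhaseTimingSketch {

    // Hypothetical helper: runs one phase and fails with the phase name
    // if it takes longer than its budget.
    public static <T> T timed(final String phase, final long maxMillis, final Supplier<T> work) {
        final long startNanos = System.nanoTime();
        final T result = work.get();
        final long elapsedMillis = (System.nanoTime() - startNanos) / 1_000_000L;
        if (elapsedMillis > maxMillis) {
            throw new AssertionError(phase + " took " + elapsedMillis + "ms, budget was " + maxMillis + "ms");
        }
        return result;
    }

    public static void main(final String[] args) {
        // Stand-ins for the initial assignment and the follow-up rebalance.
        final String first = timed("initial assignment", 60_000L, () -> "first");
        final String second = timed("follow-up assignment", 60_000L, () -> first + "+second");
        System.out.println(second);
    }
}
```

A failure then reports which of the two assignments exceeded its budget, without relying on the outer test timeout alone.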







[GitHub] [kafka] ableegoldman commented on pull request #8892: KAFKA-10068: verify assignment performance with large cluster

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#issuecomment-692980127


   One unrelated known-flaky test failure: `PurgeRepartitionTopicIntegrationTest.shouldRestoreState`





[GitHub] [kafka] ableegoldman commented on pull request #8892: KAFKA-10068: verify assignment performance with large cluster

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#issuecomment-645727513


   call for review @vvcephei @cadonna 





[GitHub] [kafka] vvcephei commented on a change in pull request #8892: KAFKA-10068: verify assignment performance with large cluster

Posted by GitBox <gi...@apache.org>.
vvcephei commented on a change in pull request #8892:
URL: https://github.com/apache/kafka/pull/8892#discussion_r442497609



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##########
@@ -148,27 +150,35 @@
     private final TopicPartition t3p2 = new TopicPartition("topic3", 2);
     private final TopicPartition t3p3 = new TopicPartition("topic3", 3);
 
-    private final List<PartitionInfo> infos = asList(
-        new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0])
-    );
-
-    private final SubscriptionInfo defaultSubscriptionInfo = getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS);
+    private final List<PartitionInfo> partitionInfos = getPartitionInfos(3, 3);
+    {
+        partitionInfos.add(new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0]));
+    }
 
     private final Cluster metadata = new Cluster(
         "cluster",
         Collections.singletonList(Node.noNode()),
-        infos,
+        partitionInfos,
         emptySet(),
-        emptySet());
+        emptySet()
+    );
+
+    /* Used by the scale test for large apps/clusters */
+    private static final int NUM_TOPICS_XL = 10;
+    private static final int NUM_PARTITIONS_PER_TOPIC_XL = 1_000;
+    private static final int NUM_CONSUMERS_XL = 100;
+    private static final List<String> TOPICS_LIST_XL = new ArrayList<>();
+    private static final Map<TopicPartition, Long> CHANGELOG_END_OFFSETS_XL = new HashMap<>();
+    private static final List<PartitionInfo> PARTITION_INFOS_XL = getPartitionInfos(NUM_TOPICS_XL, NUM_PARTITIONS_PER_TOPIC_XL);
+    private static final Cluster CLUSTER_METADATA_XL = new Cluster(
+        "cluster",
+        Collections.singletonList(Node.noNode()),
+        PARTITION_INFOS_XL,
+        emptySet(),
+        emptySet()
+    );

Review comment:
       If there's a whole set of constants only used by one test, one might wonder whether that test shouldn't just be in its own class...

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##########
@@ -148,27 +150,35 @@
     private final TopicPartition t3p2 = new TopicPartition("topic3", 2);
     private final TopicPartition t3p3 = new TopicPartition("topic3", 3);
 
-    private final List<PartitionInfo> infos = asList(
-        new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]),
-        new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0])
-    );
-
-    private final SubscriptionInfo defaultSubscriptionInfo = getInfo(UUID_1, EMPTY_TASKS, EMPTY_TASKS);
+    private final List<PartitionInfo> partitionInfos = getPartitionInfos(3, 3);
+    {
+        partitionInfos.add(new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0]));
+    }

Review comment:
       Can you pass this extra partition as an argument to `getPartitionInfos`, so that all the initialization happens in the assignment and we don't need an initialization block? Because this field is used in another field's initializer, the initialization block is kind of questionable: you have to read the Java Language Specification to know whether the block executes before or after that usage.
   
   Alternatively, maybe the prior code was actually better, because you can see exactly what data you're testing with, instead of having to go read another method to understand what `getPartitionInfos(3, 3)` might mean.
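
   For reference, a minimal sketch of the ordering in question. Java runs instance field initializers and instance initializer blocks in textual order, so a block placed between two fields executes before the later field is initialized. The class below is a hypothetical stand-in, not the test itself: it uses strings instead of `PartitionInfo`, and its `getPartitionInfos` is an assumed simplification of the helper in the PR.

```java
import java.util.ArrayList;
import java.util.List;

class InitOrderDemo {
    // Hypothetical stand-in for the test helper: builds "topicN-P" labels
    // for numTopics topics with numPartitionsPerTopic partitions each.
    static List<String> getPartitionInfos(final int numTopics, final int numPartitionsPerTopic) {
        final List<String> infos = new ArrayList<>();
        for (int t = 1; t <= numTopics; t++) {
            for (int p = 0; p < numPartitionsPerTopic; p++) {
                infos.add("topic" + t + "-" + p);
            }
        }
        return infos;
    }

    // 1. Field initializer runs first (textual order).
    private final List<String> partitionInfos = getPartitionInfos(3, 3);

    // 2. Instance initializer block runs next and adds the extra partition.
    {
        partitionInfos.add("topic3-3");
    }

    // 3. A later field initializer therefore already sees the extra entry.
    private final int sizeSeenByLaterField = partitionInfos.size();

    int sizeSeenByLaterField() {
        return sizeSeenByLaterField;
    }

    public static void main(final String[] args) {
        // 3 topics x 3 partitions + 1 extra partition
        System.out.println(new InitOrderDemo().sizeSeenByLaterField()); // prints 10
    }
}
```

   So the block does run before the later field sees the list, but a reader only knows that from the ordering rule, which is the readability concern raised above.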



