Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/04/24 00:28:23 UTC

[GitHub] [kafka] vvcephei commented on a change in pull request #8541: KAFKA-6145: KIP-441: Add TaskAssignor class config

vvcephei commented on a change in pull request #8541:
URL: https://github.com/apache/kafka/pull/8541#discussion_r414188592



##########
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##########
@@ -1146,4 +1146,13 @@ private static byte checkRange(final byte i) {
             }
         };
     }
+
+    @SafeVarargs
+    public static <E> Set<E> union(final Supplier<Set<E>> constructor, final Set<E>... set) {

Review comment:
       I've been wanting this for a while, so I just decided to add it.
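
    For reference, a minimal sketch of the semantics I have in mind (hedged: the method body is elided in the diff above, so the actual implementation may differ in detail):

    ```java
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;
    import java.util.TreeSet;
    import java.util.function.Supplier;

    public final class UnionSketch {
        // Copy every input set into a fresh set produced by the supplied constructor.
        @SafeVarargs
        public static <E> Set<E> union(final Supplier<Set<E>> constructor, final Set<E>... set) {
            final Set<E> result = constructor.get();
            for (final Set<E> s : set) {
                result.addAll(s);
            }
            return result;
        }

        public static void main(final String[] args) {
            // The caller controls the concrete Set type, e.g. a sorted union:
            final Set<String> merged = union(TreeSet::new,
                new HashSet<>(Arrays.asList("b")),
                new HashSet<>(Arrays.asList("a", "c")));
            System.out.println(merged); // [a, b, c]
        }
    }
    ```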

##########
File path: build.gradle
##########
@@ -236,8 +236,10 @@ subprojects {
     def logStreams = new HashMap<String, FileOutputStream>()
     beforeTest { TestDescriptor td ->
       def tid = testId(td)
+      // truncate the file name if it's too long
       def logFile = new File(
-          "${projectDir}/build/reports/testOutput/${tid}.test.stdout")
+              "${projectDir}/build/reports/testOutput/${tid.substring(0, Math.min(tid.size(),240))}.test.stdout"

Review comment:
       Necessary because the test name that JUnit generates for the parameterized StreamsPartitionAssignorTest is slightly too long. I can't shorten it, because what pushes it over the limit is the two package names embedded in the parameterized method name, and there's no control over the format of the test name itself. So I decided just to truncate the file name instead, which is almost certainly still unique for pretty much any test.

##########
File path: clients/src/test/java/org/apache/kafka/test/TestUtils.java
##########
@@ -361,9 +361,9 @@ public static void waitForCondition(final TestCondition testCondition, final lon
      * avoid transient failures due to slow or overloaded machines.
      */
     public static void waitForCondition(final TestCondition testCondition, final long maxWaitMs, Supplier<String> conditionDetailsSupplier) throws InterruptedException {
-        String conditionDetailsSupplied = conditionDetailsSupplier != null ? conditionDetailsSupplier.get() : null;
-        String conditionDetails = conditionDetailsSupplied != null ? conditionDetailsSupplied : "";
         retryOnExceptionWithTimeout(maxWaitMs, () -> {
+            String conditionDetailsSupplied = conditionDetailsSupplier != null ? conditionDetailsSupplier.get() : null;
+            String conditionDetails = conditionDetailsSupplied != null ? conditionDetailsSupplied : "";

Review comment:
       Supplying the condition details lazily is pointless unless we evaluate the supplier inside the lambda, so that it runs on each retry rather than once up front.
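
    To illustrate (a self-contained sketch, not the actual TestUtils code; `waitFor` is a hypothetical stand-in for `retryOnExceptionWithTimeout` plus the condition check):

    ```java
    import java.util.function.BooleanSupplier;
    import java.util.function.Supplier;

    public final class LazyDetailsSketch {
        // Evaluating the supplier *inside* the retried block means each attempt
        // renders the failure details from current state; evaluating it before
        // the block (as the old code did) captures a stale message exactly once.
        static void waitFor(final BooleanSupplier condition, final Supplier<String> detailsSupplier) {
            if (!condition.getAsBoolean()) {
                final String details = detailsSupplier != null ? detailsSupplier.get() : "";
                throw new AssertionError("Condition not met. " + details);
            }
        }

        public static void main(final String[] args) {
            waitFor(() -> true, () -> "only evaluated if the condition fails");
        }
    }
    ```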

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -713,23 +713,18 @@ private boolean assignTasksToClients(final Set<String> allSourceTopics,
             allTasks, clientStates, numStandbyReplicas());
 
         final TaskAssignor taskAssignor;
-        if (highAvailabilityEnabled) {
-            if (lagComputationSuccessful) {
-                taskAssignor = new HighAvailabilityTaskAssignor(
-                    clientStates,
-                    allTasks,
-                    statefulTasks,
-                    assignmentConfigs);
-            } else {
-                log.info("Failed to fetch end offsets for changelogs, will return previous assignment to clients and "
-                             + "trigger another rebalance to retry.");
-                setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
-                taskAssignor = new StickyTaskAssignor(clientStates, allTasks, statefulTasks, assignmentConfigs, true);
-            }
+        if (!lagComputationSuccessful) {
+            log.info("Failed to fetch end offsets for changelogs, will return previous assignment to clients and "
+                         + "trigger another rebalance to retry.");
+            setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
+            taskAssignor = new PriorTaskAssignor();

Review comment:
       Just to clarify everyone's roles, I added a new assignor whose only behavior is to return all previously owned tasks, and then assign any unowned tasks.
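
    In other words, roughly this (an illustrative sketch with plain strings standing in for the real UUID/TaskId types; not the actual class):

    ```java
    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;

    public final class PriorAssignmentSketch {
        // Assumes at least one client. Step 1: every previously owned task goes
        // back to its prior owner. Step 2: only tasks nobody owned get assigned,
        // round-robin here for simplicity.
        static Map<String, Set<String>> assign(final Map<String, Set<String>> previouslyOwned,
                                               final Set<String> allTasks) {
            final Map<String, Set<String>> assignment = new HashMap<>();
            final Set<String> unowned = new HashSet<>(allTasks);
            previouslyOwned.forEach((client, tasks) -> {
                assignment.put(client, new HashSet<>(tasks));
                unowned.removeAll(tasks);
            });
            final List<String> clients = new ArrayList<>(assignment.keySet());
            int i = 0;
            for (final String task : unowned) {
                assignment.get(clients.get(i++ % clients.size())).add(task);
            }
            return assignment;
        }
    }
    ```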

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -713,23 +713,18 @@ private boolean assignTasksToClients(final Set<String> allSourceTopics,
             allTasks, clientStates, numStandbyReplicas());
 
         final TaskAssignor taskAssignor;
-        if (highAvailabilityEnabled) {
-            if (lagComputationSuccessful) {
-                taskAssignor = new HighAvailabilityTaskAssignor(
-                    clientStates,
-                    allTasks,
-                    statefulTasks,
-                    assignmentConfigs);
-            } else {
-                log.info("Failed to fetch end offsets for changelogs, will return previous assignment to clients and "
-                             + "trigger another rebalance to retry.");
-                setAssignmentErrorCode(AssignorError.REBALANCE_NEEDED.code());
-                taskAssignor = new StickyTaskAssignor(clientStates, allTasks, statefulTasks, assignmentConfigs, true);
-            }
+        if (!lagComputationSuccessful) {

Review comment:
       This is a change. I decided that from the perspective of the TaskAssignor API, the lags are one of the inputs, so it doesn't make sense to invoke the assignor if the lags aren't present.
   
   This potentially harms assignors that don't care about the lag (like the StickyTaskAssignor), but it also seems like if we can't compute the lags, then we probably can't do lots of other stuff that Streams needs to do anyway, so maybe it's not the worst thing in the world to schedule a "retry" on the assignment, even if it's not strictly necessary.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##########
@@ -16,49 +16,50 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
-import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClientOrNoCaughtUpClientsExist;
-import static org.apache.kafka.streams.processor.internals.assignment.RankedClient.buildClientRankingsByTask;
-import static org.apache.kafka.streams.processor.internals.assignment.RankedClient.tasksToCaughtUpClients;
-import static org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignTaskMovements;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.UUID;
 import java.util.stream.Collectors;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.util.Map;
-import java.util.Set;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentUtils.taskIsCaughtUpOnClientOrNoCaughtUpClientsExist;
+import static org.apache.kafka.streams.processor.internals.assignment.RankedClient.buildClientRankingsByTask;
+import static org.apache.kafka.streams.processor.internals.assignment.RankedClient.tasksToCaughtUpClients;
+import static org.apache.kafka.streams.processor.internals.assignment.TaskMovement.assignTaskMovements;
 
 public class HighAvailabilityTaskAssignor implements TaskAssignor {
     private static final Logger log = LoggerFactory.getLogger(HighAvailabilityTaskAssignor.class);
 
-    private final Map<UUID, ClientState> clientStates;
-    private final Map<UUID, Integer> clientsToNumberOfThreads;
-    private final SortedSet<UUID> sortedClients;
+    private Map<UUID, ClientState> clientStates;

Review comment:
       All these fields have to be non-final now, because we're setting them in `assign` instead of the constructor.
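
    That is, the shape changes roughly like this (sketch only; the `ClientState` placeholder and the parameter list of `assign` are assumptions, not the real signature):

    ```java
    import java.util.Map;
    import java.util.UUID;

    public class StatefulAssignorSketch {
        static class ClientState { } // placeholder for the real type

        // Before: private final Map<UUID, ClientState> clientStates; (constructor-injected)
        private Map<UUID, ClientState> clientStates; // now populated on each assign() call

        public boolean assign(final Map<UUID, ClientState> clientStates) {
            this.clientStates = clientStates; // moved here from the constructor
            // ... compute the assignment ...
            return false; // sketch: no real logic
        }
    }
    ```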

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/PriorTaskAssignor.java
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+public class PriorTaskAssignor implements TaskAssignor {
+    private final StickyTaskAssignor delegate;
+
+    public PriorTaskAssignor() {
+        delegate = new StickyTaskAssignor(true);

Review comment:
       The StickyTaskAssignor is capable of satisfying the PriorTaskAssignor's contract, so we can just delegate to it. The important thing is that we now have two separately defined contracts:
    1. return all previous tasks and assign the rest (PriorTaskAssignor)
    2. trade off stickiness against overall balance (StickyTaskAssignor)
    
    The fact that the implementation is shared is an ... implementation detail.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
##########
@@ -86,6 +90,22 @@ private ClientState(final Set<TaskId> activeTasks,
         this.capacity = capacity;
     }
 
+    public ClientState(final Set<TaskId> previousActiveTasks,
+                       final Set<TaskId> previousStandbyTasks,
+                       final Map<TaskId, Long> taskLagTotals,
+                       final int capacity) {

Review comment:
       This constructor is currently only used in tests, but I'm planning a follow-on refactor that would actually use it from the StickyTaskAssignor as well.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##########
@@ -111,8 +115,9 @@
     private final String storeName = "store";
 
     private AtomicBoolean errorInjected;
-    private AtomicBoolean gcInjected;
-    private volatile boolean doGC = true;
+    private AtomicBoolean stallInjected;

Review comment:
       While debugging this test, I found the "garbage collection" nomenclature confusing (because it is possible to invoke the JVM GC, but that's not what we're doing here), so I transitioned to calling it a "stall", which was also a term used in the test comments.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignor.java
##########
@@ -95,6 +94,11 @@ public boolean assign() {
 
         assignStatelessActiveTasks();
 
+        log.info("Decided on assignment: " +

Review comment:
       I found this log useful while debugging the integration tests. WDYT about keeping it?

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
##########
@@ -41,132 +54,107 @@
 import static org.easymock.EasyMock.replay;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
-import org.easymock.EasyMock;
-import org.junit.Test;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 
 public class HighAvailabilityTaskAssignorTest {

Review comment:
       I made a bunch of changes to this test, because it was pretty brittle with respect to changes in the HighAvailabilityTaskAssignor.
    
    Namely:
    * Because ClientState was mocked rather than constructed with the desired state, any modification to the assignor that happened to query the ClientState a little differently than the tests anticipated (for example, the lags were not always mocked) would throw NPEs that could never happen in production. Now, we just construct a ClientState for each client representing the desired scenario, and make assertions on the resulting assignment (see the sketch after this list).
    * The heavy reliance on shared mutable fields, inserted into shared mutable collections to build a shared assignor, made it relatively hard to read the tests "cold" and figure out how they were configured. I minimized the shared fields and eliminated the mutability, so the tests are now pretty much self-contained. This is a bit opinionated, but I feel it's the right thing to do. The non-trivial ramp-up period in which I figured out how the tests were actually constructed would decay pretty rapidly, and if I had to come back to this test in a couple of months, I didn't want to have to start learning all over again.
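
    The sketch mentioned above (illustrative fragment; it assumes the test's existing static imports such as `TASK_0_0`, `expect`, `replay`, `singleton`, `emptySet`, and `singletonMap`, plus the four-argument `ClientState` constructor added in this PR):

    ```java
    // Before: a nice mock only answers the calls the test anticipated; any new
    // query inside the assignor (e.g. a lag lookup that was never stubbed)
    // returns a default/null and can NPE in ways production never would.
    final ClientState mockedClient = EasyMock.createNiceMock(ClientState.class);
    expect(mockedClient.prevActiveTasks()).andReturn(singleton(TASK_0_0));
    replay(mockedClient);

    // After: a real ClientState constructed with the scenario's data answers
    // every query consistently, however the assignor chooses to probe it.
    final ClientState constructedClient = new ClientState(
        singleton(TASK_0_0),          // previous active tasks
        emptySet(),                   // previous standby tasks
        singletonMap(TASK_0_0, 0L),   // task lag totals
        1                             // capacity
    );
    ```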

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##########
@@ -515,84 +520,114 @@ public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances() th
         // the app is supposed to copy all 60 records into the output topic
         // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes
         //
-        // a GC pause gets inject after 20 committed and 30 uncommitted records got received
-        // -> the GC pause only affects one thread and should trigger a rebalance
+        // a stall gets injected after 20 committed and 30 uncommitted records got received
+        // -> the stall only affects one thread and should trigger a rebalance
         // after rebalancing, we should read 40 committed records (even if 50 record got written)
         //
         // afterwards, the "stalling" thread resumes, and another rebalance should get triggered
         // we write the remaining 20 records and verify to read 60 result records
 
         try (
-            final KafkaStreams streams1 = getKafkaStreams(false, "appDir1", 1, eosConfig);
-            final KafkaStreams streams2 = getKafkaStreams(false, "appDir2", 1, eosConfig)
+            final KafkaStreams streams1 = getKafkaStreams("streams1", false, "appDir1", 1, eosConfig);
+            final KafkaStreams streams2 = getKafkaStreams("streams2", false, "appDir2", 1, eosConfig)
         ) {
             startKafkaStreamsAndWaitForRunningState(streams1, MAX_WAIT_TIME_MS);
             startKafkaStreamsAndWaitForRunningState(streams2, MAX_WAIT_TIME_MS);
 
-            final List<KeyValue<Long, Long>> committedDataBeforeGC = prepareData(0L, 10L, 0L, 1L);
-            final List<KeyValue<Long, Long>> uncommittedDataBeforeGC = prepareData(10L, 15L, 0L, 1L);
+            final List<KeyValue<Long, Long>> committedDataBeforeStall = prepareData(0L, 10L, 0L, 1L);
+            final List<KeyValue<Long, Long>> uncommittedDataBeforeStall = prepareData(10L, 15L, 0L, 1L);
 
-            final List<KeyValue<Long, Long>> dataBeforeGC = new ArrayList<>();
-            dataBeforeGC.addAll(committedDataBeforeGC);
-            dataBeforeGC.addAll(uncommittedDataBeforeGC);
+            final List<KeyValue<Long, Long>> dataBeforeStall = new ArrayList<>();
+            dataBeforeStall.addAll(committedDataBeforeStall);
+            dataBeforeStall.addAll(uncommittedDataBeforeStall);
 
             final List<KeyValue<Long, Long>> dataToTriggerFirstRebalance = prepareData(15L, 20L, 0L, 1L);
 
             final List<KeyValue<Long, Long>> dataAfterSecondRebalance = prepareData(20L, 30L, 0L, 1L);
 
-            writeInputData(committedDataBeforeGC);
+            writeInputData(committedDataBeforeStall);
 
             waitForCondition(
                 () -> commitRequested.get() == 2, MAX_WAIT_TIME_MS,
                 "SteamsTasks did not request commit.");
 
-            writeInputData(uncommittedDataBeforeGC);
+            writeInputData(uncommittedDataBeforeStall);
 
-            final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeGC.size(), null);
-            final List<KeyValue<Long, Long>> committedRecords = readResult(committedDataBeforeGC.size(), CONSUMER_GROUP_ID);
+            final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeStall.size(), null);
+            final List<KeyValue<Long, Long>> committedRecords = readResult(committedDataBeforeStall.size(), CONSUMER_GROUP_ID);
 
-            checkResultPerKey(committedRecords, committedDataBeforeGC);
-            checkResultPerKey(uncommittedRecords, dataBeforeGC);
+            checkResultPerKey(committedRecords, committedDataBeforeStall);
+            checkResultPerKey(uncommittedRecords, dataBeforeStall);
 
-            gcInjected.set(true);
+            LOG.info("Injecting Stall");
+            stallInjected.set(true);
             writeInputData(dataToTriggerFirstRebalance);
+            LOG.info("Input Data Written");
+            waitForCondition(
+                () -> stallingHost.get() != null,
+                MAX_WAIT_TIME_MS,
+                "Expected a host to start stalling"
+            );
+            final String observedStallingHost = stallingHost.get();
+            final KafkaStreams stallingInstance;
+            final KafkaStreams remainingInstance;
+            if ("streams1".equals(observedStallingHost)) {
+                stallingInstance = streams1;
+                remainingInstance = streams2;
+            } else if ("streams2".equals(observedStallingHost)) {
+                stallingInstance = streams2;
+                remainingInstance = streams1;
+            } else {
+                throw new IllegalArgumentException("unexpected host name: " + observedStallingHost);
+            }
 
+            // the stalling instance won't have an updated view, and it doesn't matter what it thinks
+            // the assignment is. We only really care that the remaining instance only sees one host
+            // that owns both partitions.
             waitForCondition(
-                () -> streams1.allMetadata().size() == 1
-                    && streams2.allMetadata().size() == 1
-                    && (streams1.allMetadata().iterator().next().topicPartitions().size() == 2
-                        || streams2.allMetadata().iterator().next().topicPartitions().size() == 2),
-                MAX_WAIT_TIME_MS, "Should have rebalanced.");
+                () -> stallingInstance.allMetadata().size() == 2
+                    && remainingInstance.allMetadata().size() == 1
+                    && remainingInstance.allMetadata().iterator().next().topicPartitions().size() == 2,

Review comment:
       The previous test was seemingly dependent on the non-stalling instance being the one to "win" and be present in the metadata map, which is why the metadata size was `1` for both instances before.
    
    Now, we're being a little more explicit, by actually finding out which instance is the stalled one. Then we can assert that the instance that _isn't_ stalled (the only one still in the group) no longer sees the stalled instance (since it has dropped out), and that it is now assigned both partitions.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##########
@@ -515,84 +520,114 @@ public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances() th
         // the app is supposed to copy all 60 records into the output topic
         // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes
         //
-        // a GC pause gets inject after 20 committed and 30 uncommitted records got received
-        // -> the GC pause only affects one thread and should trigger a rebalance
+        // a stall gets injected after 20 committed and 30 uncommitted records got received
+        // -> the stall only affects one thread and should trigger a rebalance
         // after rebalancing, we should read 40 committed records (even if 50 record got written)
         //
         // afterwards, the "stalling" thread resumes, and another rebalance should get triggered
         // we write the remaining 20 records and verify to read 60 result records
 
         try (
-            final KafkaStreams streams1 = getKafkaStreams(false, "appDir1", 1, eosConfig);
-            final KafkaStreams streams2 = getKafkaStreams(false, "appDir2", 1, eosConfig)
+            final KafkaStreams streams1 = getKafkaStreams("streams1", false, "appDir1", 1, eosConfig);
+            final KafkaStreams streams2 = getKafkaStreams("streams2", false, "appDir2", 1, eosConfig)
         ) {
             startKafkaStreamsAndWaitForRunningState(streams1, MAX_WAIT_TIME_MS);
             startKafkaStreamsAndWaitForRunningState(streams2, MAX_WAIT_TIME_MS);
 
-            final List<KeyValue<Long, Long>> committedDataBeforeGC = prepareData(0L, 10L, 0L, 1L);
-            final List<KeyValue<Long, Long>> uncommittedDataBeforeGC = prepareData(10L, 15L, 0L, 1L);
+            final List<KeyValue<Long, Long>> committedDataBeforeStall = prepareData(0L, 10L, 0L, 1L);
+            final List<KeyValue<Long, Long>> uncommittedDataBeforeStall = prepareData(10L, 15L, 0L, 1L);
 
-            final List<KeyValue<Long, Long>> dataBeforeGC = new ArrayList<>();
-            dataBeforeGC.addAll(committedDataBeforeGC);
-            dataBeforeGC.addAll(uncommittedDataBeforeGC);
+            final List<KeyValue<Long, Long>> dataBeforeStall = new ArrayList<>();
+            dataBeforeStall.addAll(committedDataBeforeStall);
+            dataBeforeStall.addAll(uncommittedDataBeforeStall);
 
             final List<KeyValue<Long, Long>> dataToTriggerFirstRebalance = prepareData(15L, 20L, 0L, 1L);
 
             final List<KeyValue<Long, Long>> dataAfterSecondRebalance = prepareData(20L, 30L, 0L, 1L);
 
-            writeInputData(committedDataBeforeGC);
+            writeInputData(committedDataBeforeStall);
 
             waitForCondition(
                 () -> commitRequested.get() == 2, MAX_WAIT_TIME_MS,
                 "SteamsTasks did not request commit.");
 
-            writeInputData(uncommittedDataBeforeGC);
+            writeInputData(uncommittedDataBeforeStall);
 
-            final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeGC.size(), null);
-            final List<KeyValue<Long, Long>> committedRecords = readResult(committedDataBeforeGC.size(), CONSUMER_GROUP_ID);
+            final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeStall.size(), null);
+            final List<KeyValue<Long, Long>> committedRecords = readResult(committedDataBeforeStall.size(), CONSUMER_GROUP_ID);
 
-            checkResultPerKey(committedRecords, committedDataBeforeGC);
-            checkResultPerKey(uncommittedRecords, dataBeforeGC);
+            checkResultPerKey(committedRecords, committedDataBeforeStall);
+            checkResultPerKey(uncommittedRecords, dataBeforeStall);
 
-            gcInjected.set(true);
+            LOG.info("Injecting Stall");
+            stallInjected.set(true);
             writeInputData(dataToTriggerFirstRebalance);
+            LOG.info("Input Data Written");
+            waitForCondition(
+                () -> stallingHost.get() != null,
+                MAX_WAIT_TIME_MS,
+                "Expected a host to start stalling"
+            );
+            final String observedStallingHost = stallingHost.get();
+            final KafkaStreams stallingInstance;
+            final KafkaStreams remainingInstance;
+            if ("streams1".equals(observedStallingHost)) {
+                stallingInstance = streams1;
+                remainingInstance = streams2;
+            } else if ("streams2".equals(observedStallingHost)) {
+                stallingInstance = streams2;
+                remainingInstance = streams1;
+            } else {
+                throw new IllegalArgumentException("unexpected host name: " + observedStallingHost);
+            }
 
+            // the stalling instance won't have an updated view, and it doesn't matter what it thinks
+            // the assignment is. We only really care that the remaining instance only sees one host
+            // that owns both partitions.
             waitForCondition(
-                () -> streams1.allMetadata().size() == 1
-                    && streams2.allMetadata().size() == 1
-                    && (streams1.allMetadata().iterator().next().topicPartitions().size() == 2
-                        || streams2.allMetadata().iterator().next().topicPartitions().size() == 2),
-                MAX_WAIT_TIME_MS, "Should have rebalanced.");
+                () -> stallingInstance.allMetadata().size() == 2
+                    && remainingInstance.allMetadata().size() == 1
+                    && remainingInstance.allMetadata().iterator().next().topicPartitions().size() == 2,
+                MAX_WAIT_TIME_MS,
+                () -> "Should have rebalanced.\n" +
+                    "Streams1[" + streams1.allMetadata() + "]\n" +
+                    "Streams2[" + streams2.allMetadata() + "]");
 
             final List<KeyValue<Long, Long>> committedRecordsAfterRebalance = readResult(
-                uncommittedDataBeforeGC.size() + dataToTriggerFirstRebalance.size(),
+                uncommittedDataBeforeStall.size() + dataToTriggerFirstRebalance.size(),
                 CONSUMER_GROUP_ID);
 
             final List<KeyValue<Long, Long>> expectedCommittedRecordsAfterRebalance = new ArrayList<>();
-            expectedCommittedRecordsAfterRebalance.addAll(uncommittedDataBeforeGC);
+            expectedCommittedRecordsAfterRebalance.addAll(uncommittedDataBeforeStall);
             expectedCommittedRecordsAfterRebalance.addAll(dataToTriggerFirstRebalance);
 
             checkResultPerKey(committedRecordsAfterRebalance, expectedCommittedRecordsAfterRebalance);
 
-            doGC = false;
+            LOG.info("Releasing Stall");
+            doStall = false;
+            // Once the stalling host rejoins the group, we expect both instances to see both instances.
+            // It doesn't really matter what the assignment is, but we might as well also assert that they
+            // both see both partitions assigned exactly once
             waitForCondition(
-                () -> streams1.allMetadata().size() == 1
-                    && streams2.allMetadata().size() == 1
-                    && streams1.allMetadata().iterator().next().topicPartitions().size() == 1
-                    && streams2.allMetadata().iterator().next().topicPartitions().size() == 1,
+                () -> streams1.allMetadata().size() == 2
+                    && streams2.allMetadata().size() == 2

Review comment:
       Again, `2` was always the right answer; we were just accidentally overwriting one instance's metadata with the other.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
##########
@@ -1832,102 +1834,6 @@ public void shouldThrowIllegalStateExceptionIfAnyTopicsMissingFromChangelogEndOf
         assertThrows(IllegalStateException.class, () -> partitionAssignor.assign(metadata, new GroupSubscription(subscriptions)));
     }
 
-    @Test
-    public void shouldReturnAllActiveTasksToPreviousOwnerRegardlessOfBalanceAndTriggerRebalanceIfEndOffsetFetchFailsAndHighAvailabilityEnabled() {
-        if (highAvailabilityEnabled) {

Review comment:
       This is why I moved these methods to their own test, so that this class can focus on verifying behavior that is invariant with respect to the parameterized assignor.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##########
@@ -515,84 +520,114 @@ public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances() th
         // the app is supposed to copy all 60 records into the output topic
         // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes
         //
-        // a GC pause gets inject after 20 committed and 30 uncommitted records got received
-        // -> the GC pause only affects one thread and should trigger a rebalance
+        // a stall gets injected after 20 committed and 30 uncommitted records got received
+        // -> the stall only affects one thread and should trigger a rebalance
         // after rebalancing, we should read 40 committed records (even if 50 record got written)
         //
         // afterwards, the "stalling" thread resumes, and another rebalance should get triggered
         // we write the remaining 20 records and verify to read 60 result records
 
         try (
-            final KafkaStreams streams1 = getKafkaStreams(false, "appDir1", 1, eosConfig);
-            final KafkaStreams streams2 = getKafkaStreams(false, "appDir2", 1, eosConfig)
+            final KafkaStreams streams1 = getKafkaStreams("streams1", false, "appDir1", 1, eosConfig);
+            final KafkaStreams streams2 = getKafkaStreams("streams2", false, "appDir2", 1, eosConfig)

Review comment:
       I added an argument to the KafkaStreams builder to set the dummy "host name". Previously, it was always "dummy" even though we had two instances, which resulted in the metadata map containing only one entry despite there being two nodes in the cluster. I'm not sure whether this was a cause of flakiness (since the overwrite seems non-deterministic), but it's definitely not _right_.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##########
@@ -515,84 +520,114 @@ public void shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances() th
         // the app is supposed to copy all 60 records into the output topic
         // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes
         //
-        // a GC pause gets inject after 20 committed and 30 uncommitted records got received
-        // -> the GC pause only affects one thread and should trigger a rebalance
+        // a stall gets injected after 20 committed and 30 uncommitted records got received
+        // -> the stall only affects one thread and should trigger a rebalance
         // after rebalancing, we should read 40 committed records (even if 50 record got written)
         //
         // afterwards, the "stalling" thread resumes, and another rebalance should get triggered
         // we write the remaining 20 records and verify to read 60 result records
 
         try (
-            final KafkaStreams streams1 = getKafkaStreams(false, "appDir1", 1, eosConfig);
-            final KafkaStreams streams2 = getKafkaStreams(false, "appDir2", 1, eosConfig)
+            final KafkaStreams streams1 = getKafkaStreams("streams1", false, "appDir1", 1, eosConfig);
+            final KafkaStreams streams2 = getKafkaStreams("streams2", false, "appDir2", 1, eosConfig)
         ) {
             startKafkaStreamsAndWaitForRunningState(streams1, MAX_WAIT_TIME_MS);
             startKafkaStreamsAndWaitForRunningState(streams2, MAX_WAIT_TIME_MS);
 
-            final List<KeyValue<Long, Long>> committedDataBeforeGC = prepareData(0L, 10L, 0L, 1L);
-            final List<KeyValue<Long, Long>> uncommittedDataBeforeGC = prepareData(10L, 15L, 0L, 1L);
+            final List<KeyValue<Long, Long>> committedDataBeforeStall = prepareData(0L, 10L, 0L, 1L);
+            final List<KeyValue<Long, Long>> uncommittedDataBeforeStall = prepareData(10L, 15L, 0L, 1L);
 
-            final List<KeyValue<Long, Long>> dataBeforeGC = new ArrayList<>();
-            dataBeforeGC.addAll(committedDataBeforeGC);
-            dataBeforeGC.addAll(uncommittedDataBeforeGC);
+            final List<KeyValue<Long, Long>> dataBeforeStall = new ArrayList<>();
+            dataBeforeStall.addAll(committedDataBeforeStall);
+            dataBeforeStall.addAll(uncommittedDataBeforeStall);
 
             final List<KeyValue<Long, Long>> dataToTriggerFirstRebalance = prepareData(15L, 20L, 0L, 1L);
 
             final List<KeyValue<Long, Long>> dataAfterSecondRebalance = prepareData(20L, 30L, 0L, 1L);
 
-            writeInputData(committedDataBeforeGC);
+            writeInputData(committedDataBeforeStall);
 
             waitForCondition(
                 () -> commitRequested.get() == 2, MAX_WAIT_TIME_MS,
                 "SteamsTasks did not request commit.");
 
-            writeInputData(uncommittedDataBeforeGC);
+            writeInputData(uncommittedDataBeforeStall);
 
-            final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeGC.size(), null);
-            final List<KeyValue<Long, Long>> committedRecords = readResult(committedDataBeforeGC.size(), CONSUMER_GROUP_ID);
+            final List<KeyValue<Long, Long>> uncommittedRecords = readResult(dataBeforeStall.size(), null);
+            final List<KeyValue<Long, Long>> committedRecords = readResult(committedDataBeforeStall.size(), CONSUMER_GROUP_ID);
 
-            checkResultPerKey(committedRecords, committedDataBeforeGC);
-            checkResultPerKey(uncommittedRecords, dataBeforeGC);
+            checkResultPerKey(committedRecords, committedDataBeforeStall);
+            checkResultPerKey(uncommittedRecords, dataBeforeStall);
 
-            gcInjected.set(true);
+            LOG.info("Injecting Stall");
+            stallInjected.set(true);
             writeInputData(dataToTriggerFirstRebalance);
+            LOG.info("Input Data Written");
+            waitForCondition(
+                () -> stallingHost.get() != null,
+                MAX_WAIT_TIME_MS,
+                "Expected a host to start stalling"
+            );
+            final String observedStallingHost = stallingHost.get();
+            final KafkaStreams stallingInstance;
+            final KafkaStreams remainingInstance;
+            if ("streams1".equals(observedStallingHost)) {
+                stallingInstance = streams1;
+                remainingInstance = streams2;
+            } else if ("streams2".equals(observedStallingHost)) {
+                stallingInstance = streams2;
+                remainingInstance = streams1;
+            } else {
+                throw new IllegalArgumentException("unexpected host name: " + observedStallingHost);
+            }
 
+            // the stalling instance won't have an updated view, and it doesn't matter what it thinks
+            // the assignment is. We only really care that the remaining instance only sees one host
+            // that owns both partitions.
             waitForCondition(
-                () -> streams1.allMetadata().size() == 1
-                    && streams2.allMetadata().size() == 1
-                    && (streams1.allMetadata().iterator().next().topicPartitions().size() == 2
-                        || streams2.allMetadata().iterator().next().topicPartitions().size() == 2),
-                MAX_WAIT_TIME_MS, "Should have rebalanced.");
+                () -> stallingInstance.allMetadata().size() == 2
+                    && remainingInstance.allMetadata().size() == 1
+                    && remainingInstance.allMetadata().iterator().next().topicPartitions().size() == 2,
+                MAX_WAIT_TIME_MS,
+                () -> "Should have rebalanced.\n" +
+                    "Streams1[" + streams1.allMetadata() + "]\n" +
+                    "Streams2[" + streams2.allMetadata() + "]");
 
             final List<KeyValue<Long, Long>> committedRecordsAfterRebalance = readResult(
-                uncommittedDataBeforeGC.size() + dataToTriggerFirstRebalance.size(),
+                uncommittedDataBeforeStall.size() + dataToTriggerFirstRebalance.size(),
                 CONSUMER_GROUP_ID);
 
             final List<KeyValue<Long, Long>> expectedCommittedRecordsAfterRebalance = new ArrayList<>();
-            expectedCommittedRecordsAfterRebalance.addAll(uncommittedDataBeforeGC);
+            expectedCommittedRecordsAfterRebalance.addAll(uncommittedDataBeforeStall);
             expectedCommittedRecordsAfterRebalance.addAll(dataToTriggerFirstRebalance);
 
             checkResultPerKey(committedRecordsAfterRebalance, expectedCommittedRecordsAfterRebalance);
 
-            doGC = false;
+            LOG.info("Releasing Stall");
+            doStall = false;
+            // Once the stalling host rejoins the group, we expect both instances to see both instances.
+            // It doesn't really matter what the assignment is, but we might as well also assert that they
+            // both see both partitions assigned exactly once
             waitForCondition(
-                () -> streams1.allMetadata().size() == 1
-                    && streams2.allMetadata().size() == 1
-                    && streams1.allMetadata().iterator().next().topicPartitions().size() == 1
-                    && streams2.allMetadata().iterator().next().topicPartitions().size() == 1,
+                () -> streams1.allMetadata().size() == 2
+                    && streams2.allMetadata().size() == 2
+                    && streams1.allMetadata().stream().mapToLong(meta -> meta.topicPartitions().size()).sum() == 2
+                    && streams2.allMetadata().stream().mapToLong(meta -> meta.topicPartitions().size()).sum() == 2,

Review comment:
       The prior expectation was dependent on the rebalance algorithm's behavior. Now, we relax it and just ensure that all the partitions are assigned.

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java
##########
@@ -1,349 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.integration;
-
-import static org.apache.kafka.common.utils.Utils.mkSet;
-import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.startApplicationAndWaitUntilRunning;
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.IsEqual.equalTo;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
-import kafka.utils.MockTime;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KafkaStreamsWrapper;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.LagInfo;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.processor.StateRestoreListener;
-import org.apache.kafka.streams.processor.internals.StreamThread;
-import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Category({IntegrationTest.class})
-public class LagFetchIntegrationTest {

Review comment:
       I'm not sure if we should really delete this test. The restore test is fine, but the rebalance test fails 50% of the time. Here's the situation:
   1. The test relies on the _second_ instance being the one to get the standby replica, which just happens to be the behavior of the StickyTaskAssignor.
    2. Making this test agnostic to the assignor's choice is extremely difficult, because the "standby" instance needs different barriers and latches passed into it than the "active" one does.
   3. I looked into the `allLocalStorePartitionLags()` implementation, and it doesn't look like there's any way in which the rebalance lifecycle could affect users' ability to query the lags. I'm not sure if a prior version of the code made this more plausible.
   
   Really, the rationale to delete the test is (3). I figured that out because I was investigating the possibility of writing a unit test instead, to make sure you could query the lags in each phase of the rebalance, but I came up blank because there seems to be no relationship at all between lag computation and rebalancing.
   
   If we don't want to delete the test, then we could consider setting the TaskAssignor to the (new) PriorTaskAssignor, which would actually guarantee in a reliable way that the second instance gets the standby task.
   
   But before just "fixing" the test, I wanted to double-check that we really need this test at all.
   
   

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/HighAvailabilityStreamsPartitionAssignorTest.java
##########
@@ -0,0 +1,326 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Assignment;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.GroupSubscription;
+import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorError;
+import org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor;
+import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
+import org.apache.kafka.test.MockKeyValueStoreBuilder;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.emptyMap;
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_CHANGELOG_END_OFFSETS;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_TASKS;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_2;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_1;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_2;
+import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION;
+import static org.easymock.EasyMock.anyObject;
+import static org.easymock.EasyMock.expect;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public class HighAvailabilityStreamsPartitionAssignorTest {
+
+    private final List<PartitionInfo> infos = asList(
+        new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]),
+        new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]),
+        new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]),
+        new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]),
+        new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]),
+        new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]),
+        new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]),
+        new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]),
+        new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]),
+        new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0])
+    );
+
+    private final Cluster metadata = new Cluster(
+        "cluster",
+        singletonList(Node.noNode()),
+        infos,
+        emptySet(),
+        emptySet());
+
+    private final StreamsPartitionAssignor partitionAssignor = new StreamsPartitionAssignor();
+    private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
+    private static final String USER_END_POINT = "localhost:8080";
+    private static final String APPLICATION_ID = "stream-partition-assignor-test";
+
+    private TaskManager taskManager;
+    private Admin adminClient;
+    private StreamsConfig streamsConfig = new StreamsConfig(configProps());
+    private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
+    private final StreamsMetadataState streamsMetadataState = EasyMock.createNiceMock(StreamsMetadataState.class);
+    private final Map<String, Subscription> subscriptions = new HashMap<>();
+
+    private final AtomicInteger assignmentError = new AtomicInteger();
+    private final AtomicLong nextProbingRebalanceMs = new AtomicLong(Long.MAX_VALUE);
+    private final MockTime time = new MockTime();
+
+    private Map<String, Object> configProps() {
+        final Map<String, Object> configurationMap = new HashMap<>();
+        configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
+        configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, USER_END_POINT);
+        configurationMap.put(InternalConfig.TASK_MANAGER_FOR_PARTITION_ASSIGNOR, taskManager);
+        configurationMap.put(InternalConfig.STREAMS_METADATA_STATE_FOR_PARTITION_ASSIGNOR, streamsMetadataState);
+        configurationMap.put(InternalConfig.STREAMS_ADMIN_CLIENT, adminClient);
+        configurationMap.put(InternalConfig.ASSIGNMENT_ERROR_CODE, assignmentError);
+        configurationMap.put(InternalConfig.NEXT_PROBING_REBALANCE_MS, nextProbingRebalanceMs);
+        configurationMap.put(InternalConfig.TIME, time);
+        configurationMap.put(AssignorConfiguration.INTERNAL_TASK_ASSIGNOR_CLASS, HighAvailabilityTaskAssignor.class.getName());
+        return configurationMap;
+    }
+
+    // Make sure to complete setting up any mocks (such as TaskManager or AdminClient) before configuring the assignor
+    private void configureDefaultPartitionAssignor() {
+        configurePartitionAssignorWith(emptyMap());
+    }
+
+    // Make sure to complete setting up any mocks (such as TaskManager or AdminClient) before configuring the assignor
+    private void configurePartitionAssignorWith(final Map<String, Object> props) {
+        final Map<String, Object> configMap = configProps();
+        configMap.putAll(props);
+
+        streamsConfig = new StreamsConfig(configMap);
+        partitionAssignor.configure(configMap);
+        EasyMock.replay(taskManager, adminClient);
+
+        overwriteInternalTopicManagerWithMock();
+    }
+
+    // Useful for tests that don't care about the task offset sums
+    private void createMockTaskManager(final Set<TaskId> activeTasks) {
+        createMockTaskManager(getTaskOffsetSums(activeTasks));
+    }
+
+    private void createMockTaskManager(final Map<TaskId, Long> taskOffsetSums) {
+        taskManager = EasyMock.createNiceMock(TaskManager.class);
+        expect(taskManager.builder()).andReturn(builder).anyTimes();
+        expect(taskManager.getTaskOffsetSums()).andReturn(taskOffsetSums).anyTimes();
+        expect(taskManager.processId()).andReturn(UUID_1).anyTimes();
+        builder.setApplicationId(APPLICATION_ID);
+        builder.buildTopology();
+    }
+
+    // If you don't care about setting the end offsets for each specific topic partition, the helper method
+    // getTopicPartitionOffsetMap is useful for building this input map for all partitions
+    private void createMockAdminClient(final Map<TopicPartition, Long> changelogEndOffsets) {
+        adminClient = EasyMock.createMock(AdminClient.class);
+
+        final ListOffsetsResult result = EasyMock.createNiceMock(ListOffsetsResult.class);
+        final KafkaFutureImpl<Map<TopicPartition, ListOffsetsResultInfo>> allFuture = new KafkaFutureImpl<>();
+        allFuture.complete(changelogEndOffsets.entrySet().stream().collect(Collectors.toMap(
+            Entry::getKey,
+            t -> {
+                final ListOffsetsResultInfo info = EasyMock.createNiceMock(ListOffsetsResultInfo.class);
+                expect(info.offset()).andStubReturn(t.getValue());
+                EasyMock.replay(info);
+                return info;
+            }))
+        );
+
+        expect(adminClient.listOffsets(anyObject())).andStubReturn(result);
+        expect(result.all()).andReturn(allFuture);
+
+        EasyMock.replay(result);
+    }
+
+    private void overwriteInternalTopicManagerWithMock() {
+        final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(streamsConfig, mockClientSupplier.restoreConsumer);
+        partitionAssignor.setInternalTopicManager(mockInternalTopicManager);
+    }
+
+    @Before
+    public void setUp() {
+        createMockAdminClient(EMPTY_CHANGELOG_END_OFFSETS);
+    }
+
+
+    @Test
+    public void shouldReturnAllActiveTasksToPreviousOwnerRegardlessOfBalanceAndTriggerRebalanceIfEndOffsetFetchFailsAndHighAvailabilityEnabled() {

Review comment:
       I moved this and other tests from `StreamsPartitionAssignorTest` that had been guarded to run only when parameterized with "high availability".

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/HighAvailabilityTaskAssignorTest.java
##########
@@ -41,132 +54,107 @@
 import static org.easymock.EasyMock.replay;
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.MatcherAssert.assertThat;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import org.apache.kafka.streams.processor.TaskId;
-import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
-import org.easymock.EasyMock;
-import org.junit.Test;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.not;
 
 public class HighAvailabilityTaskAssignorTest {
-    private long acceptableRecoveryLag = 100L;
-    private int balanceFactor = 1;
-    private int maxWarmupReplicas = 2;
-    private int numStandbyReplicas = 0;
-    private long probingRebalanceInterval = 60 * 1000L;
-
-    private Map<UUID, ClientState> clientStates = new HashMap<>();
-    private Set<TaskId> allTasks = new HashSet<>();
-    private Set<TaskId> statefulTasks = new HashSet<>();
-
-    private ClientState client1;
-    private ClientState client2;
-    private ClientState client3;
-    
-    private HighAvailabilityTaskAssignor taskAssignor;
-
-    private void createTaskAssignor() {
-        final AssignmentConfigs configs = new AssignmentConfigs(
-            acceptableRecoveryLag,
-            balanceFactor,
-            maxWarmupReplicas,
-            numStandbyReplicas,
-            probingRebalanceInterval
-        );
-        taskAssignor = new HighAvailabilityTaskAssignor(
-            clientStates,
-            allTasks,
-            statefulTasks,
-            configs);
-    }
+    private final AssignmentConfigs configWithoutStandbys = new AssignmentConfigs(
+        /*acceptableRecoveryLag*/ 100L,
+        /*balanceFactor*/ 1,
+        /*maxWarmupReplicas*/ 2,
+        /*numStandbyReplicas*/ 0,
+        /*probingRebalanceIntervalMs*/ 60 * 1000L
+    );
+
+    private final AssignmentConfigs configWithStandbys = new AssignmentConfigs(
+        /*acceptableRecoveryLag*/ 100L,
+        /*balanceFactor*/ 1,
+        /*maxWarmupReplicas*/ 2,
+        /*numStandbyReplicas*/ 1,
+        /*probingRebalanceIntervalMs*/ 60 * 1000L
+    );
 
-    @Test
-    public void shouldDecidePreviousAssignmentIsInvalidIfThereAreUnassignedActiveTasks() {
-        client1 = EasyMock.createNiceMock(ClientState.class);
-        expect(client1.prevActiveTasks()).andReturn(singleton(TASK_0_0));
-        expect(client1.prevStandbyTasks()).andStubReturn(EMPTY_TASKS);
-        replay(client1);
-        allTasks =  mkSet(TASK_0_0, TASK_0_1);
-        clientStates = singletonMap(UUID_1, client1);
-        createTaskAssignor();
 
-        assertFalse(taskAssignor.previousAssignmentIsValid());
+    @Test
+    public void shouldComputeNewAssignmentIfThereAreUnassignedActiveTasks() {

Review comment:
       These first tests are really the only ones to change. They're still asserting the same basic facts, but `previousAssignmentIsValid` is now an internal method, so we make assertions about the black-box semantics of the assignor rather than about a specific "visible for testing" method.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/PriorTaskAssignorTest.java
##########
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.streams.processor.TaskId;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_0;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_1;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.TASK_0_2;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_1;
+import static org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.UUID_2;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
+
+public class PriorTaskAssignorTest {
+
+    private final Map<UUID, ClientState> clients = new TreeMap<>();
+
+    @Test
+    public void shouldViolateBalanceToPreserveActiveTaskStickiness() {

Review comment:
       Since I added the new assignor, I also added a regression test to verify its most important special property: preserving active-task stickiness even at the cost of balance. Most of its validity is otherwise verified by the parameterized StreamsPartitionAssignorTest.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org