Posted to jira@kafka.apache.org by "lihaosky (via GitHub)" <gi...@apache.org> on 2023/06/14 00:52:53 UTC

[GitHub] [kafka] lihaosky opened a new pull request, #13851: check enable rack

lihaosky opened a new pull request, #13851:
URL: https://github.com/apache/kafka/pull/13851

   ## Description
   Initial implementation to check whether rack-aware assignment can be enabled
   
   ## Test
   TODO
   




[GitHub] [kafka] mjsax commented on a diff in pull request #13851: KAFKA-15022: [1/N] initial implementation of rack aware assignor

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13851:
URL: https://github.com/apache/kafka/pull/13851#discussion_r1253718618


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -409,6 +411,43 @@ private String getBrokerSideConfigValue(final Config brokerSideTopicConfig,
         return brokerSideConfigEntry.value();
     }
 
+    public Map<String, List<TopicPartitionInfo>> getTopicPartitionInfo(final Set<String> topics) {
+        log.debug("Starting to describe topics {} in partition assignor.", topics);
+
+        long currentWallClockMs = time.milliseconds();
+        final long deadlineMs = currentWallClockMs + retryTimeoutMs;
+
+        Set<String> topicsToDescribe = new HashSet<>(topics);
+        final Map<String, List<TopicPartitionInfo>> topicPartitionInfo = new HashMap<>();
+
+        while (!topicsToDescribe.isEmpty()) {
+            final Map<String, List<TopicPartitionInfo>> existed = getTopicPartitionInfo(topicsToDescribe, null);
+            topicPartitionInfo.putAll(existed);
+            topicsToDescribe.removeAll(topicPartitionInfo.keySet());
+            if (!topicsToDescribe.isEmpty()) {
+                currentWallClockMs = time.milliseconds();
+
+                if (currentWallClockMs >= deadlineMs) {
+                    final String timeoutError = String.format(
+                        "Could not create topics within %d milliseconds. " +
+                            "This can happen if the Kafka cluster is temporarily not available.",
+                        retryTimeoutMs);
+                    log.error(timeoutError);

Review Comment:
   Seems `makeReady` is not clean for this case -- should we instead try to clean up `makeReady`?
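   For illustration, one shape the shared cleanup could take -- a minimal sketch only, assuming the deadline/retry loop common to `makeReady` and `getTopicPartitionInfo` were pulled into a helper; the name `retryUntilDeadline` and the `Function` parameter are hypothetical, not part of this PR:

   ```java
   // Hypothetical helper (needs java.util.function.Function): both makeReady and
   // getTopicPartitionInfo poll the admin client until a deadline, so the retry
   // loop could live in one place. Uses the class's existing time/retryTimeoutMs.
   private <T> Map<String, T> retryUntilDeadline(final Set<String> topics,
                                                 final Function<Set<String>, Map<String, T>> describeOnce,
                                                 final String timeoutMessage) {
       final long deadlineMs = time.milliseconds() + retryTimeoutMs;
       final Set<String> remaining = new HashSet<>(topics);
       final Map<String, T> results = new HashMap<>();
       while (!remaining.isEmpty()) {
           results.putAll(describeOnce.apply(remaining));
           remaining.removeAll(results.keySet());
           if (!remaining.isEmpty() && time.milliseconds() >= deadlineMs) {
               throw new TimeoutException(timeoutMessage);
           }
           // (a retry backoff sleep would go here, as in the existing loop)
       }
       return results;
   }
   ```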





[GitHub] [kafka] mjsax commented on a diff in pull request #13851: KAFKA-15022: [1/N] initial implementation of rack aware assignor

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13851:
URL: https://github.com/apache/kafka/pull/13851#discussion_r1242950088


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -539,24 +578,22 @@ public Set<String> makeReady(final Map<String, InternalTopicConfig> topics) {
     }
 
     /**
-     * Try to get the number of partitions for the given topics; return the number of partitions for topics that already exists.
+     * Try to get the partition information for the given topics; return the partition info for topics that already exists.
      *
      * Topics that were not able to get its description will simply not be returned
      */
     // visible for testing
-    protected Map<String, Integer> getNumPartitions(final Set<String> topics,
-                                                    final Set<String> tempUnknownTopics) {
-        log.debug("Trying to check if topics {} have been created with expected number of partitions.", topics);
-
+    protected Map<String, List<TopicPartitionInfo>> getTopicPartitionInfo(final Set<String> topics,
+                                                                final Set<String> tempUnknownTopics) {

Review Comment:
   fix indentation
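   For reference, the aligned continuation indent would look like:

   ```java
   // second parameter aligned under the first, matching the style used elsewhere
   protected Map<String, List<TopicPartitionInfo>> getTopicPartitionInfo(final Set<String> topics,
                                                                         final Set<String> tempUnknownTopics) {
       // ... body unchanged ...
   }
   ```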



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##########
@@ -0,0 +1,164 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackAwareTaskAssignor {
+    private static final Logger log = LoggerFactory.getLogger(RackAwareTaskAssignor.class);
+
+    private final Cluster fullMetadata;
+    private final Map<TaskId, Set<TopicPartition>> partitionsForTask;
+    private final Map<UUID, Map<String, Optional<String>>> processRacks;
+    private final AssignmentConfigs assignmentConfigs;
+    private final Map<TopicPartition, Set<String>> racksForPartition;
+    private final InternalTopicManager internalTopicManager;
+    private Boolean canEnableForActive;

Review Comment:
   Not sure what this means. Can we find a more descriptive name? (Or add a comment?)
   
   Edit: after reading the code, I think I understand. Can we rename it to `canEnableRackAwareAssignerForActiveTasks`?
   
   Also: why are we caching this result? I understand that it's complex to evaluate (and, as long as the internal topics don't exist, expensive), but couldn't the result change in between, so that we need to re-evaluate it?
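   For illustration, if the cached flag were dropped, the rename plus re-evaluation could look like this -- a sketch under the assumptions in the comment above, not the PR's final API:

   ```java
   // Suggested rename from the review; without caching, a metadata change
   // (e.g. internal topics created after the first call) is picked up on the
   // next evaluation instead of being masked by a stale cached result.
   public synchronized boolean canEnableRackAwareAssignorForActiveTasks() {
       return validateClientRack() && validateTopicPartitionRack();
   }
   ```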



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##########
@@ -0,0 +1,164 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackAwareTaskAssignor {
+    private static final Logger log = LoggerFactory.getLogger(RackAwareTaskAssignor.class);
+
+    private final Cluster fullMetadata;
+    private final Map<TaskId, Set<TopicPartition>> partitionsForTask;
+    private final Map<UUID, Map<String, Optional<String>>> processRacks;
+    private final AssignmentConfigs assignmentConfigs;
+    private final Map<TopicPartition, Set<String>> racksForPartition;
+    private final InternalTopicManager internalTopicManager;
+    private Boolean canEnableForActive;
+
+    public RackAwareTaskAssignor(final Cluster fullMetadata,
+                                 final Map<TaskId, Set<TopicPartition>> partitionsForTask,
+                                 final Map<Subtopology, Set<TaskId>> tasksForTopicGroup,
+                                 final Map<UUID, Map<String, Optional<String>>> processRacks,
+                                 final InternalTopicManager internalTopicManager,
+                                 final AssignmentConfigs assignmentConfigs) {
+        this.fullMetadata = fullMetadata;
+        this.partitionsForTask = partitionsForTask;
+        this.processRacks = processRacks;
+        this.internalTopicManager = internalTopicManager;
+        this.assignmentConfigs = assignmentConfigs;
+        this.racksForPartition = new HashMap<>();
+    }
+
+  public synchronized boolean canEnableForActive() {
+      if (canEnableForActive != null) {
+          return canEnableForActive;
+      }
+
+      /*
+      TODO: enable this after we add the config
+      if (StreamsConfig.RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy)) {
+          canEnableForActive = false;
+          return false;
+      }
+       */
+
+      if (!validateClientRack()) {
+          canEnableForActive = false;
+          return false;
+      }
+
+      canEnableForActive = validateTopicPartitionRack();
+      return canEnableForActive;
+  }
+
+  public boolean canEnableForStandby() {
+      // TODO
+      return false;
+  }
+
+  private boolean validateTopicPartitionRack() {
+      // Make sure rackId exist for all TopicPartitions needed
+      final Set<String> topicsToDescribe = new HashSet<>();
+      for (final Set<TopicPartition> topicPartitions : partitionsForTask.values()) {
+          for (TopicPartition topicPartition : topicPartitions) {
+              final PartitionInfo partitionInfo = fullMetadata.partition(topicPartition);
+              if (partitionInfo == null) {
+                  log.error("TopicPartition {} doesn't exist in cluster", topicPartition);
+                  return false;
+              }
+              final Node[] replica = partitionInfo.replicas();
+              if (replica == null || replica.length == 0) {
+                  topicsToDescribe.add(topicPartition.topic());
+                  continue;
+              }
+              for (final Node node : replica) {
+                  if (node.hasRack()) {
+                      racksForPartition.computeIfAbsent(topicPartition, k -> new HashSet<>()).add(node.rack());
+                  }
+              }
+          }
+      }
+
+      if (!topicsToDescribe.isEmpty()) {
+          log.info("Fetching PartitionInfo for topics {}", topicsToDescribe);

Review Comment:
   DEBUG?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##########
@@ -0,0 +1,164 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackAwareTaskAssignor {
+    private static final Logger log = LoggerFactory.getLogger(RackAwareTaskAssignor.class);
+
+    private final Cluster fullMetadata;
+    private final Map<TaskId, Set<TopicPartition>> partitionsForTask;
+    private final Map<UUID, Map<String, Optional<String>>> processRacks;
+    private final AssignmentConfigs assignmentConfigs;
+    private final Map<TopicPartition, Set<String>> racksForPartition;
+    private final InternalTopicManager internalTopicManager;
+    private Boolean canEnableForActive;
+
+    public RackAwareTaskAssignor(final Cluster fullMetadata,
+                                 final Map<TaskId, Set<TopicPartition>> partitionsForTask,
+                                 final Map<Subtopology, Set<TaskId>> tasksForTopicGroup,
+                                 final Map<UUID, Map<String, Optional<String>>> processRacks,
+                                 final InternalTopicManager internalTopicManager,
+                                 final AssignmentConfigs assignmentConfigs) {
+        this.fullMetadata = fullMetadata;
+        this.partitionsForTask = partitionsForTask;
+        this.processRacks = processRacks;
+        this.internalTopicManager = internalTopicManager;
+        this.assignmentConfigs = assignmentConfigs;
+        this.racksForPartition = new HashMap<>();
+    }
+
+  public synchronized boolean canEnableForActive() {
+      if (canEnableForActive != null) {
+          return canEnableForActive;
+      }
+
+      /*
+      TODO: enable this after we add the config
+      if (StreamsConfig.RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy)) {
+          canEnableForActive = false;
+          return false;
+      }
+       */
+
+      if (!validateClientRack()) {
+          canEnableForActive = false;
+          return false;
+      }
+
+      canEnableForActive = validateTopicPartitionRack();
+      return canEnableForActive;
+  }
+
+  public boolean canEnableForStandby() {
+      // TODO
+      return false;
+  }
+
+  private boolean validateTopicPartitionRack() {
+      // Make sure rackId exist for all TopicPartitions needed
+      final Set<String> topicsToDescribe = new HashSet<>();
+      for (final Set<TopicPartition> topicPartitions : partitionsForTask.values()) {
+          for (TopicPartition topicPartition : topicPartitions) {
+              final PartitionInfo partitionInfo = fullMetadata.partition(topicPartition);
+              if (partitionInfo == null) {
+                  log.error("TopicPartition {} doesn't exist in cluster", topicPartition);
+                  return false;
+              }
+              final Node[] replica = partitionInfo.replicas();
+              if (replica == null || replica.length == 0) {
+                  topicsToDescribe.add(topicPartition.topic());
+                  continue;
+              }
+              for (final Node node : replica) {
+                  if (node.hasRack()) {
+                      racksForPartition.computeIfAbsent(topicPartition, k -> new HashSet<>()).add(node.rack());
+                  }
+              }
+          }
+      }
+
+      if (!topicsToDescribe.isEmpty()) {
+          log.info("Fetching PartitionInfo for topics {}", topicsToDescribe);
+          try {
+              final Map<String, List<TopicPartitionInfo>> topicPartitionInfo = internalTopicManager.getTopicPartitionInfo(topicsToDescribe);
+              if (topicsToDescribe.size() > topicPartitionInfo.size()) {
+                  topicsToDescribe.removeAll(topicPartitionInfo.keySet());
+                  log.error("Failed to describe topic for {}", topicsToDescribe);
+                  return false;
+              }
+              for (final Map.Entry<String, List<TopicPartitionInfo>> entry : topicPartitionInfo.entrySet()) {
+                final List<TopicPartitionInfo> partitionInfos = entry.getValue();
+                for (final TopicPartitionInfo partitionInfo : partitionInfos) {
+                  int partition = partitionInfo.partition();
+                  final List<Node> replicas = partitionInfo.replicas();
+                  if (replicas == null || replicas.isEmpty()) {
+                      log.error("Replicas not exist for topic partition {}:{}", entry.getKey(), partition);
+                      return false;
+                  }
+                  final TopicPartition topicPartition = new TopicPartition(entry.getKey(), partition);
+                  for (final Node node : replicas) {
+                      racksForPartition.computeIfAbsent(topicPartition, k -> new HashSet<>()).add(node.rack());
+                  }
+                }
+              }
+          } catch (Exception e) {
+              log.error("Failed to describe topics {}", topicsToDescribe);
+              return false;
+          }
+      }
+
+      return true;
+  }
+
+  private boolean validateClientRack() {
+      /*
+       * Check rack information is populated correctly in clients
+       * 1. RackId exist for all clients
+       * 2. Different consumerId for same process should have same rackId
+       */
+      for (final Map.Entry<UUID, Map<String, Optional<String>>> entry : processRacks.entrySet()) {
+          final UUID processId = entry.getKey();
+          KeyValue<String, String> previousRackInfo = null;
+          for (final Map.Entry<String, Optional<String>> rackEntry : entry.getValue().entrySet()) {
+              if (!rackEntry.getValue().isPresent()) {
+                  log.warn("RackId doesn't exist for process {} and consumer {}. Disable {}",
+                      processId, rackEntry.getKey(), getClass().getName());
+                  return false;
+              }
+              if (previousRackInfo == null) {
+                  previousRackInfo = KeyValue.pair(rackEntry.getKey(), rackEntry.getValue().get());
+              } else if (!previousRackInfo.value.equals(rackEntry.getValue().get())) {
+                  log.warn(

Review Comment:
   This condition should actually indicate a bug, right? Might be better to log at ERROR level, and also add a hint to file a ticket?
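   A sketch of what the suggested ERROR log could look like (the wording and the ticket hint are illustrative only):

   ```java
   // Differing rack IDs within one process cannot come from user configuration,
   // so treat it as an internal bug rather than a configuration warning.
   log.error("Rack ID {} for consumer {} differs from rack ID {} for consumer {} of the same process {}. "
           + "This is likely a bug; please file a ticket at https://issues.apache.org/jira/projects/KAFKA",
       rackEntry.getValue().get(), rackEntry.getKey(),
       previousRackInfo.value, previousRackInfo.key, processId);
   return false;
   ```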



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.RackAwareTaskAssignor;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
+import org.junit.Test;
+
+public class RackAwareTaskAssignorTest {
+    private final static String TOPIC0 = "topic0";
+    private final static String TOPIC1 = "topic1";
+    private static final String USER_END_POINT = "localhost:8080";
+    private static final String APPLICATION_ID = "stream-partition-assignor-test";
+
+    private final Node node0 = new Node(0, "node0", 1, "rack1");
+    private final Node node1 = new Node(1, "node1", 1, "rack2");
+    private final Node node2 = new Node(2, "node2", 1, "rack3");
+    private final Node[] replicas = new Node[] {node0, node1, node2};
+
+    private final PartitionInfo partitionInfo00 = new PartitionInfo(TOPIC0, 0, node0, replicas, replicas);
+    private final PartitionInfo partitionInfo01 = new PartitionInfo(TOPIC0, 1, node0, replicas, replicas);
+
+    private final TopicPartition tp00 = new TopicPartition(TOPIC0, 0);
+    private final TopicPartition tp01 = new TopicPartition(TOPIC0, 1);
+    private final TopicPartition tp10 = new TopicPartition(TOPIC1, 0);
+
+    private final UUID process0UUID  = UUID.randomUUID();
+    private final UUID process1UUID = UUID.randomUUID();
+
+    private final Subtopology subtopology1 = new Subtopology(1, "topology1");
+
+    private final MockTime time = new MockTime();
+    private final StreamsConfig streamsConfig = new StreamsConfig(configProps());
+    private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
+
+    private final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
+            time,
+            streamsConfig,
+            mockClientSupplier.restoreConsumer,
+            false
+    );
+
+    private Map<String, Object> configProps() {
+        final Map<String, Object> configurationMap = new HashMap<>();
+        configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
+        configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, USER_END_POINT);
+
+        final ReferenceContainer referenceContainer = new ReferenceContainer();
+        /*
+        referenceContainer.mainConsumer = consumer;
+        referenceContainer.adminClient = adminClient;
+        referenceContainer.taskManager = taskManager;
+        referenceContainer.streamsMetadataState = streamsMetadataState;
+        referenceContainer.time = time;
+        */
+        configurationMap.put(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, referenceContainer);
+        return configurationMap;
+    }
+
+    @Test
+    public void disableActiveSinceMissingClusterInfo() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
+
+        processRacks.put(process0UUID , Collections.singletonMap("consumer1", Optional.of("rack1")));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), new HashSet<>(Arrays.asList(tp00, tp10))),
+            Collections.singletonMap(subtopology1, Collections.singleton(new TaskId(1, 1))),
+            processRacks,
+            mockInternalTopicManager,
+            new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
+        );
+
+        // False since tp10 is missing in cluster metadata
+        assertFalse(assignor.canEnableForActive());
+    }
+
+    @Test
+    public void disableActiveSinceRackMissingInClient() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
+
+        // Missing rackId config in client
+        processRacks.put(process0UUID , Collections.singletonMap("consumer1", Optional.empty()));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), Collections.singleton(tp00)),
+            Collections.singletonMap(subtopology1, Collections.singleton(new TaskId(1, 1))),
+            processRacks,
+            mockInternalTopicManager,
+            new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
+        );
+
+        // False since process1 doesn't have rackId
+        assertFalse(assignor.canEnableForActive());
+    }
+
+    @Test
+    public void disableActiveSinceRackDiffersInSameProcess() {

Review Comment:
   Do we need to test this, given that it should not be possible (i.e., a user cannot mis-configure this; it would rather indicate a very weird bug on our side)?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -409,6 +411,43 @@ private String getBrokerSideConfigValue(final Config brokerSideTopicConfig,
         return brokerSideConfigEntry.value();
     }
 
+    public Map<String, List<TopicPartitionInfo>> getTopicPartitionInfo(final Set<String> topics) {
+        log.debug("Starting to describe topics {} in partition assignor.", topics);
+
+        long currentWallClockMs = time.milliseconds();
+        final long deadlineMs = currentWallClockMs + retryTimeoutMs;
+
+        Set<String> topicsToDescribe = new HashSet<>(topics);
+        final Map<String, List<TopicPartitionInfo>> topicPartitionInfo = new HashMap<>();
+
+        while (!topicsToDescribe.isEmpty()) {
+            final Map<String, List<TopicPartitionInfo>> existed = getTopicPartitionInfo(topicsToDescribe, null);
+            topicPartitionInfo.putAll(existed);
+            topicsToDescribe.removeAll(topicPartitionInfo.keySet());
+            if (!topicsToDescribe.isEmpty()) {
+                currentWallClockMs = time.milliseconds();
+
+                if (currentWallClockMs >= deadlineMs) {
+                    final String timeoutError = String.format(
+                        "Could not create topics within %d milliseconds. " +
+                            "This can happen if the Kafka cluster is temporarily not available.",
+                        retryTimeoutMs);
+                    log.error(timeoutError);

Review Comment:
   Why do we log here? Given that we are throwing the exception, it should get logged when the exception is caught, so it seems we double-log here unnecessarily?
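   For illustration, the variant without the extra log line would simply throw (note the message says "create topics" although this path describes them -- presumably copied over from `makeReady`):

   ```java
   // Let the caller log when it catches the exception; throwing alone avoids
   // the double logging, and the message now matches what the method does.
   if (currentWallClockMs >= deadlineMs) {
       throw new TimeoutException(String.format(
           "Could not describe topics within %d milliseconds. " +
               "This can happen if the Kafka cluster is temporarily not available.",
           retryTimeoutMs));
   }
   ```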



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -409,6 +411,43 @@ private String getBrokerSideConfigValue(final Config brokerSideTopicConfig,
         return brokerSideConfigEntry.value();
     }
 
+    public Map<String, List<TopicPartitionInfo>> getTopicPartitionInfo(final Set<String> topics) {
+        log.debug("Starting to describe topics {} in partition assignor.", topics);
+
+        long currentWallClockMs = time.milliseconds();
+        final long deadlineMs = currentWallClockMs + retryTimeoutMs;
+
+        Set<String> topicsToDescribe = new HashSet<>(topics);

Review Comment:
   Should be `final`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##########
@@ -0,0 +1,164 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackAwareTaskAssignor {
+    private static final Logger log = LoggerFactory.getLogger(RackAwareTaskAssignor.class);
+
+    private final Cluster fullMetadata;
+    private final Map<TaskId, Set<TopicPartition>> partitionsForTask;
+    private final Map<UUID, Map<String, Optional<String>>> processRacks;
+    private final AssignmentConfigs assignmentConfigs;
+    private final Map<TopicPartition, Set<String>> racksForPartition;
+    private final InternalTopicManager internalTopicManager;
+    private Boolean canEnableForActive;
+
+    public RackAwareTaskAssignor(final Cluster fullMetadata,
+                                 final Map<TaskId, Set<TopicPartition>> partitionsForTask,
+                                 final Map<Subtopology, Set<TaskId>> tasksForTopicGroup,
+                                 final Map<UUID, Map<String, Optional<String>>> processRacks,
+                                 final InternalTopicManager internalTopicManager,
+                                 final AssignmentConfigs assignmentConfigs) {
+        this.fullMetadata = fullMetadata;
+        this.partitionsForTask = partitionsForTask;
+        this.processRacks = processRacks;
+        this.internalTopicManager = internalTopicManager;
+        this.assignmentConfigs = assignmentConfigs;
+        this.racksForPartition = new HashMap<>();
+    }
+
+  public synchronized boolean canEnableForActive() {

Review Comment:
   As above.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##########
@@ -0,0 +1,164 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackAwareTaskAssignor {
+    private static final Logger log = LoggerFactory.getLogger(RackAwareTaskAssignor.class);
+
+    private final Cluster fullMetadata;
+    private final Map<TaskId, Set<TopicPartition>> partitionsForTask;
+    private final Map<UUID, Map<String, Optional<String>>> processRacks;
+    private final AssignmentConfigs assignmentConfigs;
+    private final Map<TopicPartition, Set<String>> racksForPartition;
+    private final InternalTopicManager internalTopicManager;
+    private Boolean canEnableForActive;
+
+    public RackAwareTaskAssignor(final Cluster fullMetadata,
+                                 final Map<TaskId, Set<TopicPartition>> partitionsForTask,
+                                 final Map<Subtopology, Set<TaskId>> tasksForTopicGroup,
+                                 final Map<UUID, Map<String, Optional<String>>> processRacks,
+                                 final InternalTopicManager internalTopicManager,
+                                 final AssignmentConfigs assignmentConfigs) {
+        this.fullMetadata = fullMetadata;
+        this.partitionsForTask = partitionsForTask;
+        this.processRacks = processRacks;
+        this.internalTopicManager = internalTopicManager;
+        this.assignmentConfigs = assignmentConfigs;
+        this.racksForPartition = new HashMap<>();
+    }
+
+  public synchronized boolean canEnableForActive() {
+      if (canEnableForActive != null) {
+          return canEnableForActive;
+      }
+
+      /*
+      TODO: enable this after we add the config
+      if (StreamsConfig.RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy)) {
+          canEnableForActive = false;
+          return false;
+      }
+       */
+
+      if (!validateClientRack()) {
+          canEnableForActive = false;
+          return false;
+      }
+
+      canEnableForActive = validateTopicPartitionRack();
+      return canEnableForActive;
+  }
+
+  public boolean canEnableForStandby() {
+      // TODO
+      return false;
+  }
+
+  private boolean validateTopicPartitionRack() {
+      // Make sure rackId exist for all TopicPartitions needed
+      final Set<String> topicsToDescribe = new HashSet<>();
+      for (final Set<TopicPartition> topicPartitions : partitionsForTask.values()) {

Review Comment:
   This is for input and repartition topics we read from, right?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##########
@@ -0,0 +1,164 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackAwareTaskAssignor {
+    private static final Logger log = LoggerFactory.getLogger(RackAwareTaskAssignor.class);
+
+    private final Cluster fullMetadata;
+    private final Map<TaskId, Set<TopicPartition>> partitionsForTask;
+    private final Map<UUID, Map<String, Optional<String>>> processRacks;
+    private final AssignmentConfigs assignmentConfigs;
+    private final Map<TopicPartition, Set<String>> racksForPartition;
+    private final InternalTopicManager internalTopicManager;
+    private Boolean canEnableForActive;
+
+    public RackAwareTaskAssignor(final Cluster fullMetadata,
+                                 final Map<TaskId, Set<TopicPartition>> partitionsForTask,
+                                 final Map<Subtopology, Set<TaskId>> tasksForTopicGroup,
+                                 final Map<UUID, Map<String, Optional<String>>> processRacks,
+                                 final InternalTopicManager internalTopicManager,
+                                 final AssignmentConfigs assignmentConfigs) {
+        this.fullMetadata = fullMetadata;
+        this.partitionsForTask = partitionsForTask;
+        this.processRacks = processRacks;
+        this.internalTopicManager = internalTopicManager;
+        this.assignmentConfigs = assignmentConfigs;
+        this.racksForPartition = new HashMap<>();
+    }
+
+  public synchronized boolean canEnableForActive() {
+      if (canEnableForActive != null) {
+          return canEnableForActive;
+      }
+
+      /*
+      TODO: enable this after we add the config
+      if (StreamsConfig.RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy)) {
+          canEnableForActive = false;
+          return false;
+      }
+       */
+
+      if (!validateClientRack()) {
+          canEnableForActive = false;
+          return false;
+      }
+
+      canEnableForActive = validateTopicPartitionRack();
+      return canEnableForActive;
+  }
+
+  public boolean canEnableForStandby() {
+      // TODO
+      return false;
+  }
+
+  private boolean validateTopicPartitionRack() {
+      // Make sure rackId exist for all TopicPartitions needed

Review Comment:
   Are you saying we want to use this assignor only if we get a rack ID for all partitions? Why not assign the topic partitions that have rack IDs in a rack-aware manner, and the remaining ones without? Or would that make the overall code base too complex?
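   For what the partial approach could look like -- a rough sketch only, splitting partitions by whether rack info was resolved; this is not what the PR implements:

   ```java
   // Hypothetical split: partitions with known racks could be placed rack-aware,
   // while the rest fall back to the default (non-rack-aware) assignment.
   final Map<TopicPartition, Set<String>> rackKnown = new HashMap<>();
   final Set<TopicPartition> rackUnknown = new HashSet<>();
   for (final Set<TopicPartition> topicPartitions : partitionsForTask.values()) {
       for (final TopicPartition tp : topicPartitions) {
           final Set<String> racks = racksForPartition.get(tp);
           if (racks == null || racks.isEmpty()) {
               rackUnknown.add(tp);
           } else {
               rackKnown.put(tp, racks);
           }
       }
   }
   ```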



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##########
@@ -0,0 +1,164 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackAwareTaskAssignor {
+    private static final Logger log = LoggerFactory.getLogger(RackAwareTaskAssignor.class);
+
+    private final Cluster fullMetadata;
+    private final Map<TaskId, Set<TopicPartition>> partitionsForTask;
+    private final Map<UUID, Map<String, Optional<String>>> processRacks;
+    private final AssignmentConfigs assignmentConfigs;
+    private final Map<TopicPartition, Set<String>> racksForPartition;
+    private final InternalTopicManager internalTopicManager;
+    private Boolean canEnableForActive;
+
+    public RackAwareTaskAssignor(final Cluster fullMetadata,
+                                 final Map<TaskId, Set<TopicPartition>> partitionsForTask,
+                                 final Map<Subtopology, Set<TaskId>> tasksForTopicGroup,
+                                 final Map<UUID, Map<String, Optional<String>>> processRacks,
+                                 final InternalTopicManager internalTopicManager,
+                                 final AssignmentConfigs assignmentConfigs) {
+        this.fullMetadata = fullMetadata;
+        this.partitionsForTask = partitionsForTask;
+        this.processRacks = processRacks;
+        this.internalTopicManager = internalTopicManager;
+        this.assignmentConfigs = assignmentConfigs;
+        this.racksForPartition = new HashMap<>();
+    }
+
+  public synchronized boolean canEnableForActive() {
+      if (canEnableForActive != null) {
+          return canEnableForActive;
+      }
+
+      /*
+      TODO: enable this after we add the config
+      if (StreamsConfig.RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy)) {
+          canEnableForActive = false;
+          return false;
+      }
+       */
+
+      if (!validateClientRack()) {
+          canEnableForActive = false;
+          return false;
+      }
+
+      canEnableForActive = validateTopicPartitionRack();
+      return canEnableForActive;
+  }
+
+  public boolean canEnableForStandby() {
+      // TODO
+      return false;
+  }
+
+  private boolean validateTopicPartitionRack() {
+      // Make sure rackId exist for all TopicPartitions needed
+      final Set<String> topicsToDescribe = new HashSet<>();
+      for (final Set<TopicPartition> topicPartitions : partitionsForTask.values()) {
+          for (TopicPartition topicPartition : topicPartitions) {
+              final PartitionInfo partitionInfo = fullMetadata.partition(topicPartition);
+              if (partitionInfo == null) {
+                  log.error("TopicPartition {} doesn't exist in cluster", topicPartition);
+                  return false;
+              }
+              final Node[] replica = partitionInfo.replicas();
+              if (replica == null || replica.length == 0) {
+                  topicsToDescribe.add(topicPartition.topic());
+                  continue;
+              }
+              for (final Node node : replica) {
+                  if (node.hasRack()) {
+                      racksForPartition.computeIfAbsent(topicPartition, k -> new HashSet<>()).add(node.rack());
+                  }
+              }
+          }
+      }
+
+      if (!topicsToDescribe.isEmpty()) {
+          log.info("Fetching PartitionInfo for topics {}", topicsToDescribe);
+          try {
+              final Map<String, List<TopicPartitionInfo>> topicPartitionInfo = internalTopicManager.getTopicPartitionInfo(topicsToDescribe);
+              if (topicsToDescribe.size() > topicPartitionInfo.size()) {
+                  topicsToDescribe.removeAll(topicPartitionInfo.keySet());
+                  log.error("Failed to describe topic for {}", topicsToDescribe);
+                  return false;
+              }
+              for (final Map.Entry<String, List<TopicPartitionInfo>> entry : topicPartitionInfo.entrySet()) {
+                final List<TopicPartitionInfo> partitionInfos = entry.getValue();
+                for (final TopicPartitionInfo partitionInfo : partitionInfos) {
+                  int partition = partitionInfo.partition();
+                  final List<Node> replicas = partitionInfo.replicas();
+                  if (replicas == null || replicas.isEmpty()) {
+                      log.error("Replicas not exist for topic partition {}:{}", entry.getKey(), partition);

Review Comment:
   Missing space after `:`



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -409,6 +411,43 @@ private String getBrokerSideConfigValue(final Config brokerSideTopicConfig,
         return brokerSideConfigEntry.value();
     }
 
+    public Map<String, List<TopicPartitionInfo>> getTopicPartitionInfo(final Set<String> topics) {
+        log.debug("Starting to describe topics {} in partition assignor.", topics);
+
+        long currentWallClockMs = time.milliseconds();
+        final long deadlineMs = currentWallClockMs + retryTimeoutMs;
+
+        Set<String> topicsToDescribe = new HashSet<>(topics);
+        final Map<String, List<TopicPartitionInfo>> topicPartitionInfo = new HashMap<>();
+
+        while (!topicsToDescribe.isEmpty()) {
+            final Map<String, List<TopicPartitionInfo>> existed = getTopicPartitionInfo(topicsToDescribe, null);
+            topicPartitionInfo.putAll(existed);
+            topicsToDescribe.removeAll(topicPartitionInfo.keySet());
+            if (!topicsToDescribe.isEmpty()) {
+                currentWallClockMs = time.milliseconds();
+
+                if (currentWallClockMs >= deadlineMs) {
+                    final String timeoutError = String.format(
+                        "Could not create topics within %d milliseconds. " +
+                            "This can happen if the Kafka cluster is temporarily not available.",
+                        retryTimeoutMs);
+                    log.error(timeoutError);
+                    throw new TimeoutException(timeoutError);
+                }
+                log.info(

Review Comment:
   Should this be DEBUG level?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.RackAwareTaskAssignor;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
+import org.junit.Test;
+
+public class RackAwareTaskAssignorTest {
+    private final static String TOPIC0 = "topic0";
+    private final static String TOPIC1 = "topic1";
+    private static final String USER_END_POINT = "localhost:8080";
+    private static final String APPLICATION_ID = "stream-partition-assignor-test";
+
+    private final Node node0 = new Node(0, "node0", 1, "rack1");
+    private final Node node1 = new Node(1, "node1", 1, "rack2");
+    private final Node node2 = new Node(2, "node2", 1, "rack3");
+    private final Node[] replicas = new Node[] {node0, node1, node2};
+
+    private final PartitionInfo partitionInfo00 = new PartitionInfo(TOPIC0, 0, node0, replicas, replicas);
+    private final PartitionInfo partitionInfo01 = new PartitionInfo(TOPIC0, 1, node0, replicas, replicas);
+
+    private final TopicPartition tp00 = new TopicPartition(TOPIC0, 0);
+    private final TopicPartition tp01 = new TopicPartition(TOPIC0, 1);
+    private final TopicPartition tp10 = new TopicPartition(TOPIC1, 0);
+
+    private final UUID process0UUID  = UUID.randomUUID();
+    private final UUID process1UUID = UUID.randomUUID();
+
+    private final Subtopology subtopology1 = new Subtopology(1, "topology1");
+
+    private final MockTime time = new MockTime();
+    private final StreamsConfig streamsConfig = new StreamsConfig(configProps());
+    private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
+
+    private final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
+            time,
+            streamsConfig,
+            mockClientSupplier.restoreConsumer,
+            false
+    );
+
+    private Map<String, Object> configProps() {
+        final Map<String, Object> configurationMap = new HashMap<>();
+        configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
+        configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, USER_END_POINT);
+
+        final ReferenceContainer referenceContainer = new ReferenceContainer();
+        /*
+        referenceContainer.mainConsumer = consumer;
+        referenceContainer.adminClient = adminClient;
+        referenceContainer.taskManager = taskManager;
+        referenceContainer.streamsMetadataState = streamsMetadataState;
+        referenceContainer.time = time;
+        */
+        configurationMap.put(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, referenceContainer);
+        return configurationMap;
+    }
+
+    @Test
+    public void disableActiveSinceMissingClusterInfo() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
+
+        processRacks.put(process0UUID , Collections.singletonMap("consumer1", Optional.of("rack1")));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), new HashSet<>(Arrays.asList(tp00, tp10))),
+            Collections.singletonMap(subtopology1, Collections.singleton(new TaskId(1, 1))),
+            processRacks,
+            mockInternalTopicManager,
+            new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
+        );
+
+        // False since tp10 is missing in cluster metadata
+        assertFalse(assignor.canEnableForActive());
+    }
+
+    @Test
+    public void disableActiveSinceRackMissingInClient() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
+
+        // Missing rackId config in client
+        processRacks.put(process0UUID , Collections.singletonMap("consumer1", Optional.empty()));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), Collections.singleton(tp00)),
+            Collections.singletonMap(subtopology1, Collections.singleton(new TaskId(1, 1))),
+            processRacks,
+            mockInternalTopicManager,
+            new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
+        );
+
+        // False since process1 doesn't have rackId
+        assertFalse(assignor.canEnableForActive());
+    }
+
+    @Test
+    public void disableActiveSinceRackDiffersInSameProcess() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
+
+        // Different rackId for same process
+        processRacks.computeIfAbsent(process0UUID , k -> new HashMap<>()).put("consumer1", Optional.of("rack1"));
+        processRacks.computeIfAbsent(process0UUID , k -> new HashMap<>()).put("consumer2", Optional.of("rack2"));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), Collections.singleton(tp00)),
+            Collections.singletonMap(subtopology1, Collections.singleton(new TaskId(1, 1))),
+            processRacks,
+            mockInternalTopicManager,
+            new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
+        );
+
+        assertFalse(assignor.canEnableForActive());
+    }
+
+    @Test
+    public void enableRackAwareAssignorForActiveWithoutDescribingTopics() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
+
+        processRacks.computeIfAbsent(process0UUID , k -> new HashMap<>()).put("consumer1", Optional.of("rack1"));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), Collections.singleton(tp00)),
+            Collections.singletonMap(subtopology1, Collections.singleton(new TaskId(1, 1))),
+            processRacks,
+            mockInternalTopicManager,
+            new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
+        );
+
+        // tp00 has rackInfo in cluster metadata
+        assertTrue(assignor.canEnableForActive());
+    }
+
+    @Test
+    public void enableRackAwareAssignorForActiveWithDescribingTopics() {
+        final PartitionInfo noNodeInfo = new PartitionInfo(TOPIC0, 0, null, new Node[0], new Node[0]);
+
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2, Node.noNode())), // mockClientSupplier.setCluster requires noNode
+            Collections.singleton(noNodeInfo),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        MockInternalTopicManager spyTopicManager = spy(mockInternalTopicManager);
+        doReturn(
+            Collections.singletonMap(
+                TOPIC0,
+                Collections.singletonList(
+                    new TopicPartitionInfo(0, node0, Arrays.asList(replicas), Collections.emptyList())
+                )
+            )
+        ).when(spyTopicManager).getTopicPartitionInfo(Collections.singleton(TOPIC0));
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
+
+        processRacks.computeIfAbsent(process0UUID , k -> new HashMap<>()).put("consumer1", Optional.of("rack1"));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), Collections.singleton(tp00)),

Review Comment:
   Should we read from both "good" partitions?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##########
@@ -0,0 +1,164 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackAwareTaskAssignor {
+    private static final Logger log = LoggerFactory.getLogger(RackAwareTaskAssignor.class);
+
+    private final Cluster fullMetadata;
+    private final Map<TaskId, Set<TopicPartition>> partitionsForTask;
+    private final Map<UUID, Map<String, Optional<String>>> processRacks;
+    private final AssignmentConfigs assignmentConfigs;
+    private final Map<TopicPartition, Set<String>> racksForPartition;
+    private final InternalTopicManager internalTopicManager;
+    private Boolean canEnableForActive;
+
+    public RackAwareTaskAssignor(final Cluster fullMetadata,
+                                 final Map<TaskId, Set<TopicPartition>> partitionsForTask,
+                                 final Map<Subtopology, Set<TaskId>> tasksForTopicGroup,
+                                 final Map<UUID, Map<String, Optional<String>>> processRacks,
+                                 final InternalTopicManager internalTopicManager,
+                                 final AssignmentConfigs assignmentConfigs) {
+        this.fullMetadata = fullMetadata;
+        this.partitionsForTask = partitionsForTask;
+        this.processRacks = processRacks;
+        this.internalTopicManager = internalTopicManager;
+        this.assignmentConfigs = assignmentConfigs;
+        this.racksForPartition = new HashMap<>();
+    }
+
+  public synchronized boolean canEnableForActive() {
+      if (canEnableForActive != null) {
+          return canEnableForActive;
+      }
+
+      /*
+      TODO: enable this after we add the config
+      if (StreamsConfig.RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy)) {
+          canEnableForActive = false;
+          return false;
+      }
+       */
+
+      if (!validateClientRack()) {
+          canEnableForActive = false;
+          return false;
+      }
+
+      canEnableForActive = validateTopicPartitionRack();
+      return canEnableForActive;
+  }
+
+  public boolean canEnableForStandby() {
+      // TODO
+      return false;
+  }
+
+  private boolean validateTopicPartitionRack() {
+      // Make sure rackId exist for all TopicPartitions needed
+      final Set<String> topicsToDescribe = new HashSet<>();
+      for (final Set<TopicPartition> topicPartitions : partitionsForTask.values()) {
+          for (TopicPartition topicPartition : topicPartitions) {
+              final PartitionInfo partitionInfo = fullMetadata.partition(topicPartition);
+              if (partitionInfo == null) {
+                  log.error("TopicPartition {} doesn't exist in cluster", topicPartition);
+                  return false;
+              }
+              final Node[] replica = partitionInfo.replicas();
+              if (replica == null || replica.length == 0) {
+                  topicsToDescribe.add(topicPartition.topic());
+                  continue;
+              }
+              for (final Node node : replica) {
+                  if (node.hasRack()) {
+                      racksForPartition.computeIfAbsent(topicPartition, k -> new HashSet<>()).add(node.rack());
+                  }
+              }
+          }
+      }
+
+      if (!topicsToDescribe.isEmpty()) {
+          log.info("Fetching PartitionInfo for topics {}", topicsToDescribe);
+          try {
+              final Map<String, List<TopicPartitionInfo>> topicPartitionInfo = internalTopicManager.getTopicPartitionInfo(topicsToDescribe);
+              if (topicsToDescribe.size() > topicPartitionInfo.size()) {
+                  topicsToDescribe.removeAll(topicPartitionInfo.keySet());

Review Comment:
   Why are we removing these topics here?
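
   One possible reading, sketched here with explanatory comments (not necessarily the author's intent):
   ```
   // topicsToDescribe holds every topic we asked the admin client about;
   // removing the successfully described ones leaves only the failures,
   // so the error message below names exactly the topics that failed.
   topicsToDescribe.removeAll(topicPartitionInfo.keySet());
   log.error("Failed to describe topic for {}", topicsToDescribe);
   ```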



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##########
@@ -0,0 +1,164 @@
+      if (!topicsToDescribe.isEmpty()) {
+          log.info("Fetching PartitionInfo for topics {}", topicsToDescribe);
+          try {
+              final Map<String, List<TopicPartitionInfo>> topicPartitionInfo = internalTopicManager.getTopicPartitionInfo(topicsToDescribe);
+              if (topicsToDescribe.size() > topicPartitionInfo.size()) {
+                  topicsToDescribe.removeAll(topicPartitionInfo.keySet());
+                  log.error("Failed to describe topic for {}", topicsToDescribe);
+                  return false;
+              }
+              for (final Map.Entry<String, List<TopicPartitionInfo>> entry : topicPartitionInfo.entrySet()) {
+                final List<TopicPartitionInfo> partitionInfos = entry.getValue();
+                for (final TopicPartitionInfo partitionInfo : partitionInfos) {
+                  int partition = partitionInfo.partition();

Review Comment:
   Should this be `final`?
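
   If so, the one-line change would presumably be:
   ```
   final int partition = partitionInfo.partition();
   ```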



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##########
@@ -0,0 +1,164 @@
+  private boolean validateTopicPartitionRack() {
+      // Make sure rackId exist for all TopicPartitions needed
+      final Set<String> topicsToDescribe = new HashSet<>();
+      for (final Set<TopicPartition> topicPartitions : partitionsForTask.values()) {
+          for (TopicPartition topicPartition : topicPartitions) {
+              final PartitionInfo partitionInfo = fullMetadata.partition(topicPartition);
+              if (partitionInfo == null) {
+                  log.error("TopicPartition {} doesn't exist in cluster", topicPartition);
+                  return false;
+              }
+              final Node[] replica = partitionInfo.replicas();
+              if (replica == null || replica.length == 0) {

Review Comment:
   Is it possible that it's `null` or of length `0`?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##########
@@ -0,0 +1,164 @@
+              for (final Map.Entry<String, List<TopicPartitionInfo>> entry : topicPartitionInfo.entrySet()) {
+                final List<TopicPartitionInfo> partitionInfos = entry.getValue();
+                for (final TopicPartitionInfo partitionInfo : partitionInfos) {
+                  int partition = partitionInfo.partition();
+                  final List<Node> replicas = partitionInfo.replicas();
+                  if (replicas == null || replicas.isEmpty()) {
+                      log.error("Replicas not exist for topic partition {}:{}", entry.getKey(), partition);

Review Comment:
   ```
   Replicas [don't] exist
   ```
   
   Or better:
   ```
   No replicas found for topic partition
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##########
@@ -0,0 +1,164 @@
+  private boolean validateTopicPartitionRack() {
+      // Make sure rackId exist for all TopicPartitions needed
+      final Set<String> topicsToDescribe = new HashSet<>();
+      for (final Set<TopicPartition> topicPartitions : partitionsForTask.values()) {
+          for (TopicPartition topicPartition : topicPartitions) {
+              final PartitionInfo partitionInfo = fullMetadata.partition(topicPartition);
+              if (partitionInfo == null) {
+                  log.error("TopicPartition {} doesn't exist in cluster", topicPartition);
+                  return false;
+              }
+              final Node[] replica = partitionInfo.replicas();
+              if (replica == null || replica.length == 0) {
+                  topicsToDescribe.add(topicPartition.topic());

Review Comment:
   Does this mean `topicsToDescribe` will contain all internal topics that do not exist yet?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##########
@@ -0,0 +1,164 @@
+  private boolean validateTopicPartitionRack() {
+      // Make sure rackId exist for all TopicPartitions needed
+      final Set<String> topicsToDescribe = new HashSet<>();
+      for (final Set<TopicPartition> topicPartitions : partitionsForTask.values()) {
+          for (TopicPartition topicPartition : topicPartitions) {
+              final PartitionInfo partitionInfo = fullMetadata.partition(topicPartition);
+              if (partitionInfo == null) {
+                  log.error("TopicPartition {} doesn't exist in cluster", topicPartition);
+                  return false;
+              }
+              final Node[] replica = partitionInfo.replicas();
+              if (replica == null || replica.length == 0) {
+                  topicsToDescribe.add(topicPartition.topic());
+                  continue;
+              }
+              for (final Node node : replica) {
+                  if (node.hasRack()) {
+                      racksForPartition.computeIfAbsent(topicPartition, k -> new HashSet<>()).add(node.rack());

Review Comment:
   The comment on top of this method says: `Make sure rackId exist for all TopicPartitions needed`. However, it seems possible that we have a rackId for only some replicas of a partition.
   
   Should we add an `else { return false; }`?
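
   A sketch of the suggested guard (the log message is illustrative only):
   ```
   for (final Node node : replica) {
       if (node.hasRack()) {
           racksForPartition.computeIfAbsent(topicPartition, k -> new HashSet<>()).add(node.rack());
       } else {
           log.error("Node {} for topic partition {} has no rack information", node, topicPartition);
           return false;
       }
   }
   ```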



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##########
@@ -0,0 +1,164 @@
+      if (!topicsToDescribe.isEmpty()) {
+          log.info("Fetching PartitionInfo for topics {}", topicsToDescribe);
+          try {
+              final Map<String, List<TopicPartitionInfo>> topicPartitionInfo = internalTopicManager.getTopicPartitionInfo(topicsToDescribe);
+              if (topicsToDescribe.size() > topicPartitionInfo.size()) {
+                  topicsToDescribe.removeAll(topicPartitionInfo.keySet());
+                  log.error("Failed to describe topic for {}", topicsToDescribe);
+                  return false;
+              }
+              for (final Map.Entry<String, List<TopicPartitionInfo>> entry : topicPartitionInfo.entrySet()) {
+                final List<TopicPartitionInfo> partitionInfos = entry.getValue();
+                for (final TopicPartitionInfo partitionInfo : partitionInfos) {
+                  int partition = partitionInfo.partition();
+                  final List<Node> replicas = partitionInfo.replicas();
+                  if (replicas == null || replicas.isEmpty()) {
+                      log.error("Replicas not exist for topic partition {}:{}", entry.getKey(), partition);
+                      return false;
+                  }
+                  final TopicPartition topicPartition = new TopicPartition(entry.getKey(), partition);
+                  for (final Node node : replicas) {
+                      racksForPartition.computeIfAbsent(topicPartition, k -> new HashSet<>()).add(node.rack());
+                  }
+                }
+              }
+          } catch (Exception e) {
+              log.error("Failed to describe topics {}", topicsToDescribe);

Review Comment:
   We should log the exception.
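
   For example, SLF4J prints the stack trace when the throwable is passed as the last argument:
   ```
   } catch (final Exception e) {
       log.error("Failed to describe topics {}", topicsToDescribe, e);
   }
   ```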



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java:
##########
@@ -305,6 +327,31 @@ public void shouldThrowTimeoutExceptionIfTopicExistsDuringSetup() {
         );
     }
 
+    @Test
+    public void shouldThrowTimeoutExceptionIfGetPartitionInfoHasTopicDescriptionTimeout() {
+        mockAdminClient.timeoutNextRequest(1);
+
+        final InternalTopicManager internalTopicManager =
+                new InternalTopicManager(time, mockAdminClient, new StreamsConfig(config));
+        try {
+            final Set<String> topic1set = new HashSet<>(Collections.singletonList(topic1));
+            internalTopicManager.getTopicPartitionInfo(topic1set, null);
+
+        } catch (final TimeoutException expected) {
+            assertEquals(TimeoutException.class, expected.getCause().getClass());
+        }
+
+        mockAdminClient.timeoutNextRequest(1);

Review Comment:
   Why are we testing this twice? I cannot spot a difference between the first and second parts of this test.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java:
##########
@@ -279,6 +279,28 @@ public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics,
                 + " You can change the replication.factor config or upgrade your brokers to version 2.4 or newer to avoid this error."));
     }
 
+    @Test
+    public void shouldThrowTimeoutExceptionInGetPartitionInfo() {
+        setupTopicInMockAdminClient(topic1, Collections.emptyMap());
+        final MockTime time = new MockTime(
+            (Integer) config.get(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) / 15

Review Comment:
   Why `/ 15`? Given that the timeout is set to 50 ms, it might be OK to just set the time to zero?
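
   For reference, `MockTime(autoTickMs)` advances the clock by `autoTickMs` on every time lookup, so the simplification would look like the sketch below (whether the retry loop still reaches its deadline without auto-ticking would need checking):
   ```
   final MockTime time = new MockTime(0L);
   ```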



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##########
@@ -0,0 +1,164 @@
+              for (final Map.Entry<String, List<TopicPartitionInfo>> entry : topicPartitionInfo.entrySet()) {
+                final List<TopicPartitionInfo> partitionInfos = entry.getValue();
+                for (final TopicPartitionInfo partitionInfo : partitionInfos) {
+                  int partition = partitionInfo.partition();
+                  final List<Node> replicas = partitionInfo.replicas();
+                  if (replicas == null || replicas.isEmpty()) {
+                      log.error("Replicas not exist for topic partition {}:{}", entry.getKey(), partition);
+                      return false;
+                  }
+                  final TopicPartition topicPartition = new TopicPartition(entry.getKey(), partition);
+                  for (final Node node : replicas) {
+                      racksForPartition.computeIfAbsent(topicPartition, k -> new HashSet<>()).add(node.rack());
+                  }

Review Comment:
   As above: should we add an `else { return false; }`?
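
   As in the earlier sketch, the describe path could reject nodes without rack information instead of adding `node.rack()` unconditionally (which may be null here):
   ```
   for (final Node node : replicas) {
       if (!node.hasRack()) {
           return false;
       }
       racksForPartition.computeIfAbsent(topicPartition, k -> new HashSet<>()).add(node.rack());
   }
   ```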



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.RackAwareTaskAssignor;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
+import org.junit.Test;
+
+public class RackAwareTaskAssignorTest {
+    private final static String TOPIC0 = "topic0";
+    private final static String TOPIC1 = "topic1";
+    private static final String USER_END_POINT = "localhost:8080";
+    private static final String APPLICATION_ID = "stream-partition-assignor-test";
+
+    private final Node node0 = new Node(0, "node0", 1, "rack1");
+    private final Node node1 = new Node(1, "node1", 1, "rack2");
+    private final Node node2 = new Node(2, "node2", 1, "rack3");
+    private final Node[] replicas = new Node[] {node0, node1, node2};
+
+    private final PartitionInfo partitionInfo00 = new PartitionInfo(TOPIC0, 0, node0, replicas, replicas);
+    private final PartitionInfo partitionInfo01 = new PartitionInfo(TOPIC0, 1, node0, replicas, replicas);
+
+    private final TopicPartition tp00 = new TopicPartition(TOPIC0, 0);
+    private final TopicPartition tp01 = new TopicPartition(TOPIC0, 1);
+    private final TopicPartition tp10 = new TopicPartition(TOPIC1, 0);
+
+    private final UUID process0UUID  = UUID.randomUUID();
+    private final UUID process1UUID = UUID.randomUUID();
+
+    private final Subtopology subtopology1 = new Subtopology(1, "topology1");
+
+    private final MockTime time = new MockTime();
+    private final StreamsConfig streamsConfig = new StreamsConfig(configProps());
+    private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
+
+    private final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
+            time,
+            streamsConfig,
+            mockClientSupplier.restoreConsumer,
+            false
+    );
+
+    private Map<String, Object> configProps() {
+        final Map<String, Object> configurationMap = new HashMap<>();
+        configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
+        configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, USER_END_POINT);
+
+        final ReferenceContainer referenceContainer = new ReferenceContainer();
+        /*
+        referenceContainer.mainConsumer = consumer;
+        referenceContainer.adminClient = adminClient;
+        referenceContainer.taskManager = taskManager;
+        referenceContainer.streamsMetadataState = streamsMetadataState;
+        referenceContainer.time = time;
+        */
+        configurationMap.put(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, referenceContainer);
+        return configurationMap;
+    }
+
+    @Test
+    public void disableActiveSinceMissingClusterInfo() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
+
+        processRacks.put(process0UUID , Collections.singletonMap("consumer1", Optional.of("rack1")));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), new HashSet<>(Arrays.asList(tp00, tp10))),
+            Collections.singletonMap(subtopology1, Collections.singleton(new TaskId(1, 1))),
+            processRacks,
+            mockInternalTopicManager,
+            new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
+        );
+
+        // False since tp10 is missing in cluster metadata
+        assertFalse(assignor.canEnableForActive());
+    }
+
+    @Test
+    public void disableActiveSinceRackMissingInClient() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
+
+        // Missing rackId config in client
+        processRacks.put(process0UUID , Collections.singletonMap("consumer1", Optional.empty()));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), Collections.singleton(tp00)),
+            Collections.singletonMap(subtopology1, Collections.singleton(new TaskId(1, 1))),
+            processRacks,
+            mockInternalTopicManager,
+            new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
+        );
+
+        // False since process1 doesn't have rackId
+        assertFalse(assignor.canEnableForActive());
+    }
+
+    @Test
+    public void disableActiveSinceRackDiffersInSameProcess() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
+
+        // Different rackId for same process
+        processRacks.computeIfAbsent(process0UUID , k -> new HashMap<>()).put("consumer1", Optional.of("rack1"));
+        processRacks.computeIfAbsent(process0UUID , k -> new HashMap<>()).put("consumer2", Optional.of("rack2"));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), Collections.singleton(tp00)),
+            Collections.singletonMap(subtopology1, Collections.singleton(new TaskId(1, 1))),
+            processRacks,
+            mockInternalTopicManager,
+            new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
+        );
+
+        assertFalse(assignor.canEnableForActive());
+    }
+
+    @Test
+    public void enableRackAwareAssignorForActiveWithoutDescribingTopics() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
+
+        processRacks.computeIfAbsent(process0UUID , k -> new HashMap<>()).put("consumer1", Optional.of("rack1"));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), Collections.singleton(tp00)),
+            Collections.singletonMap(subtopology1, Collections.singleton(new TaskId(1, 1))),
+            processRacks,
+            mockInternalTopicManager,
+            new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
+        );
+
+        // tp00 has rackInfo in cluster metadata
+        assertTrue(assignor.canEnableForActive());
+    }
+
+    @Test
+    public void enableRackAwareAssignorForActiveWithDescribingTopics() {
+        final PartitionInfo noNodeInfo = new PartitionInfo(TOPIC0, 0, null, new Node[0], new Node[0]);
+
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2, Node.noNode())), // mockClientSupplier.setCluster requires noNode

Review Comment:
   I'm not sure what the comment means. Can you elaborate for my own education?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+    @Test
+    public void disableActiveSinceRackMissingInClient() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
+
+        // Missing rackId config in client
+        processRacks.put(process0UUID , Collections.singletonMap("consumer1", Optional.empty()));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), Collections.singleton(tp00)),

Review Comment:
   Should we read from both "good" partitions?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+    private Map<String, Object> configProps() {
+        final Map<String, Object> configurationMap = new HashMap<>();
+        configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
+        configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, USER_END_POINT);
+
+        final ReferenceContainer referenceContainer = new ReferenceContainer();
+        /*

Review Comment:
   Why keep this commented-out block?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+    @Test
+    public void enableRackAwareAssignorForActiveWithoutDescribingTopics() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
+
+        processRacks.computeIfAbsent(process0UUID , k -> new HashMap<>()).put("consumer1", Optional.of("rack1"));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), Collections.singleton(tp00)),

Review Comment:
   Should we read from both "good" partitions?
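
   For illustration, a variant that reads both partitions with metadata might look like the sketch below (reusing the test's existing fields; the `assertTrue` on the result is an assumption, not code from the PR):

   ```java
   final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
       metadata,
       // both tp00 and tp01 have PartitionInfo in the cluster metadata above
       Collections.singletonMap(new TaskId(1, 1), new HashSet<>(Arrays.asList(tp00, tp01))),
       Collections.singletonMap(subtopology1, Collections.singleton(new TaskId(1, 1))),
       processRacks,
       mockInternalTopicManager,
       new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
   );
   assertTrue(assignor.canEnableForActive());
   ```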



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+    @Test
+    public void disableActiveSinceRackMissingInClient() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),

Review Comment:
   Do we need to add `partitionInfo10` here, to make it easier to reason about the test?
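
   If so, a hypothetical `partitionInfo10` could be declared next to the existing fields and added to the cluster's partition set, e.g. (a sketch, not the PR's code):

   ```java
   private final PartitionInfo partitionInfo10 = new PartitionInfo(TOPIC1, 0, node0, replicas, replicas);

   // ...and in the test, include it in the Cluster metadata:
   new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01, partitionInfo10)),
   ```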



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+    @Test
+    public void disableActiveSinceRackDiffersInSameProcess() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
+
+        // Different rackId for same process
+        processRacks.computeIfAbsent(process0UUID , k -> new HashMap<>()).put("consumer1", Optional.of("rack1"));
+        processRacks.computeIfAbsent(process0UUID , k -> new HashMap<>()).put("consumer2", Optional.of("rack2"));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), Collections.singleton(tp00)),

Review Comment:
   Should we read from both "good" partitions?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+    private final Node node0 = new Node(0, "node0", 1, "rack1");
+    private final Node node1 = new Node(1, "node1", 1, "rack2");
+    private final Node node2 = new Node(2, "node2", 1, "rack3");
+    private final Node[] replicas = new Node[] {node0, node1, node2};
+
+    private final PartitionInfo partitionInfo00 = new PartitionInfo(TOPIC0, 0, node0, replicas, replicas);
+    private final PartitionInfo partitionInfo01 = new PartitionInfo(TOPIC0, 1, node0, replicas, replicas);

Review Comment:
   Given that we put both partitions on all three nodes, what is the purpose of using 3 nodes? Might it be better to place the first partition on replicas (0 and 1), and the second partition on replicas (1 and 2)?
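
   A sketch of that layout (hypothetical replica arrays, not what the PR declares):

   ```java
   private final Node[] replicas01 = new Node[] {node0, node1};
   private final Node[] replicas12 = new Node[] {node1, node2};

   // partition 0 lives on racks 1 and 2; partition 1 lives on racks 2 and 3
   private final PartitionInfo partitionInfo00 = new PartitionInfo(TOPIC0, 0, node0, replicas01, replicas01);
   private final PartitionInfo partitionInfo01 = new PartitionInfo(TOPIC0, 1, node1, replicas12, replicas12);
   ```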



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+    @Test
+    public void disableActiveSinceMissingClusterInfo() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
+
+        processRacks.put(process0UUID , Collections.singletonMap("consumer1", Optional.of("rack1")));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), new HashSet<>(Arrays.asList(tp00, tp10))),

Review Comment:
   Should we read from all three partitions? Or only from `tp10`?
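
   For example, a `tp10`-only variant would isolate the partition that is missing from the cluster metadata (a sketch against the test's existing fields; presumably it should still return `false`):

   ```java
   final RackAwareTaskAssignor tp10OnlyAssignor = new RackAwareTaskAssignor(
       metadata,
       // only the partition without cluster metadata
       Collections.singletonMap(new TaskId(1, 1), Collections.singleton(tp10)),
       Collections.singletonMap(subtopology1, Collections.singleton(new TaskId(1, 1))),
       processRacks,
       mockInternalTopicManager,
       new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
   );
   assertFalse(tp10OnlyAssignor.canEnableForActive());
   ```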



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+    @Test
+    public void enableRackAwareAssignorForActiveWithDescribingTopicsFailure() {
+        final PartitionInfo noNodeInfo = new PartitionInfo(TOPIC0, 0, null, new Node[0], new Node[0]);
+
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2, Node.noNode())), // mockClientSupplier.setCluster requires noNode
+            Collections.singleton(noNodeInfo),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        MockInternalTopicManager spyTopicManager = spy(mockInternalTopicManager);
+        doThrow(new TimeoutException("Timeout describing topic")).when(spyTopicManager).getTopicPartitionInfo(Collections.singleton(TOPIC0));
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
+
+        processRacks.computeIfAbsent(process0UUID , k -> new HashMap<>()).put("consumer1", Optional.of("rack1"));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), Collections.singleton(tp00)),

Review Comment:
   Should we read from both "good" partitions?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+    @Test
+    public void disableActiveSinceMissingClusterInfo() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
+
+        processRacks.put(process0UUID , Collections.singletonMap("consumer1", Optional.of("rack1")));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), new HashSet<>(Arrays.asList(tp00, tp10))),
+            Collections.singletonMap(subtopology1, Collections.singleton(new TaskId(1, 1))),
+            processRacks,
+            mockInternalTopicManager,
+            new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
+        );
+
+        // False since tp10 is missing in cluster metadata
+        assertFalse(assignor.canEnableForActive());

Review Comment:
   Can we verify that we return `false` because of `tp10` but not because of a bug regarding `tp00`?
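
   One way to pin that down (a sketch reusing the test's fields, not code from the PR) would be to first assert that the `tp00`-only case passes, so a subsequent failure can only come from `tp10`:

   ```java
   // guard: the "good" partition alone should allow enabling
   assertTrue(new RackAwareTaskAssignor(
       metadata,
       Collections.singletonMap(new TaskId(1, 1), Collections.singleton(tp00)),
       Collections.singletonMap(subtopology1, Collections.singleton(new TaskId(1, 1))),
       processRacks,
       mockInternalTopicManager,
       new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
   ).canEnableForActive());

   // now a false result can only be caused by the missing tp10 metadata
   assertFalse(assignor.canEnableForActive());
   ```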



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+    private final TopicPartition tp00 = new TopicPartition(TOPIC0, 0);
+    private final TopicPartition tp01 = new TopicPartition(TOPIC0, 1);
+    private final TopicPartition tp10 = new TopicPartition(TOPIC1, 0);

Review Comment:
   Maybe call this "partitionWithoutInfo" to make its purpose clear?
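
   E.g. (hypothetical rename, keeping the same value):

   ```java
   private final TopicPartition partitionWithoutInfo10 = new TopicPartition(TOPIC1, 0);
   ```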



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] lihaosky commented on a diff in pull request #13851: KAFKA-15022: [1/N] initial implementation of rack aware assignor

Posted by "lihaosky (via GitHub)" <gi...@apache.org>.
lihaosky commented on code in PR #13851:
URL: https://github.com/apache/kafka/pull/13851#discussion_r1255322606


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java:
##########
@@ -279,6 +279,28 @@ public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics,
                 + " You can change the replication.factor config or upgrade your brokers to version 2.4 or newer to avoid this error."));
     }
 
+    @Test
+    public void shouldThrowTimeoutExceptionInGetPartitionInfo() {
+        setupTopicInMockAdminClient(topic1, Collections.emptyMap());
+        final MockTime time = new MockTime(
+            (Integer) config.get(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) / 15

Review Comment:
   Sure, I'll change it to 5.
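
   I.e., the auto-tick interval becomes a fifth of the max poll interval (a sketch of the updated line):

   ```java
   final MockTime time = new MockTime(
       (Integer) config.get(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) / 5
   );
   ```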



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] lihaosky commented on a diff in pull request #13851: KAFKA-15022: [1/N] initial implementation of rack aware assignor

Posted by "lihaosky (via GitHub)" <gi...@apache.org>.
lihaosky commented on code in PR #13851:
URL: https://github.com/apache/kafka/pull/13851#discussion_r1255324708


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -409,6 +411,43 @@ private String getBrokerSideConfigValue(final Config brokerSideTopicConfig,
         return brokerSideConfigEntry.value();
     }
 
+    public Map<String, List<TopicPartitionInfo>> getTopicPartitionInfo(final Set<String> topics) {
+        log.debug("Starting to describe topics {} in partition assignor.", topics);
+
+        long currentWallClockMs = time.milliseconds();
+        final long deadlineMs = currentWallClockMs + retryTimeoutMs;
+
+        Set<String> topicsToDescribe = new HashSet<>(topics);
+        final Map<String, List<TopicPartitionInfo>> topicPartitionInfo = new HashMap<>();
+
+        while (!topicsToDescribe.isEmpty()) {
+            final Map<String, List<TopicPartitionInfo>> existed = getTopicPartitionInfo(topicsToDescribe, null);
+            topicPartitionInfo.putAll(existed);
+            topicsToDescribe.removeAll(topicPartitionInfo.keySet());
+            if (!topicsToDescribe.isEmpty()) {
+                currentWallClockMs = time.milliseconds();
+
+                if (currentWallClockMs >= deadlineMs) {
+                    final String timeoutError = String.format(
+                        "Could not create topics within %d milliseconds. " +
+                            "This can happen if the Kafka cluster is temporarily not available.",
+                        retryTimeoutMs);
+                    log.error(timeoutError);

Review Comment:
   I feel this log is OK since it logs the internal `retryTimeoutMs`, which makes the cause clear. Otherwise the caller would have to dig that value out in order to log it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax merged pull request #13851: KAFKA-15022: [1/N] initial implementation of rack aware assignor

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax merged PR #13851:
URL: https://github.com/apache/kafka/pull/13851


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on a diff in pull request #13851: KAFKA-15022: [1/N] initial implementation of rack aware assignor

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13851:
URL: https://github.com/apache/kafka/pull/13851#discussion_r1253724512


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+    @Test
+    public void disableActiveSinceMissingClusterInfo() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
+
+        processRacks.put(process0UUID , Collections.singletonMap("consumer1", Optional.of("rack1")));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), new HashSet<>(Arrays.asList(tp00, tp10))),
+            Collections.singletonMap(subtopology1, Collections.singleton(new TaskId(1, 1))),
+            processRacks,
+            mockInternalTopicManager,
+            new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
+        );
+
+        // False since tp10 is missing in cluster metadata
+        assertFalse(assignor.canEnableForActive());

Review Comment:
   Not sure either -- I am slightly concerned about missing a bug if we cannot verify it properly.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.RackAwareTaskAssignor;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
+import org.junit.Test;
+
+public class RackAwareTaskAssignorTest {
+    private final static String TOPIC0 = "topic0";
+    private final static String TOPIC1 = "topic1";
+    private static final String USER_END_POINT = "localhost:8080";
+    private static final String APPLICATION_ID = "stream-partition-assignor-test";
+
+    private final Node node0 = new Node(0, "node0", 1, "rack1");
+    private final Node node1 = new Node(1, "node1", 1, "rack2");
+    private final Node node2 = new Node(2, "node2", 1, "rack3");
+    private final Node[] replicas = new Node[] {node0, node1, node2};
+
+    private final PartitionInfo partitionInfo00 = new PartitionInfo(TOPIC0, 0, node0, replicas, replicas);
+    private final PartitionInfo partitionInfo01 = new PartitionInfo(TOPIC0, 1, node0, replicas, replicas);
+
+    private final TopicPartition tp00 = new TopicPartition(TOPIC0, 0);
+    private final TopicPartition tp01 = new TopicPartition(TOPIC0, 1);
+    private final TopicPartition tp10 = new TopicPartition(TOPIC1, 0);
+
+    private final UUID process0UUID  = UUID.randomUUID();
+    private final UUID process1UUID = UUID.randomUUID();
+
+    private final Subtopology subtopology1 = new Subtopology(1, "topology1");
+
+    private final MockTime time = new MockTime();
+    private final StreamsConfig streamsConfig = new StreamsConfig(configProps());
+    private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
+
+    private final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
+            time,
+            streamsConfig,
+            mockClientSupplier.restoreConsumer,
+            false
+    );
+
+    private Map<String, Object> configProps() {
+        final Map<String, Object> configurationMap = new HashMap<>();
+        configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
+        configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, USER_END_POINT);
+
+        final ReferenceContainer referenceContainer = new ReferenceContainer();
+        /*
+        referenceContainer.mainConsumer = consumer;
+        referenceContainer.adminClient = adminClient;
+        referenceContainer.taskManager = taskManager;
+        referenceContainer.streamsMetadataState = streamsMetadataState;
+        referenceContainer.time = time;
+        */
+        configurationMap.put(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, referenceContainer);
+        return configurationMap;
+    }
+
+    @Test
+    public void disableActiveSinceMissingClusterInfo() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
+
+        processRacks.put(process0UUID , Collections.singletonMap("consumer1", Optional.of("rack1")));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), new HashSet<>(Arrays.asList(tp00, tp10))),
+            Collections.singletonMap(subtopology1, Collections.singleton(new TaskId(1, 1))),
+            processRacks,
+            mockInternalTopicManager,
+            new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
+        );
+
+        // False since tp10 is missing in cluster metadata
+        assertFalse(assignor.canEnableForActive());

Review Comment:
   Not sure either -- I am slightly concerned about missing/masking a bug if we cannot verify it properly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on a diff in pull request #13851: KAFKA-15022: [1/N] initial implementation of rack aware assignor

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13851:
URL: https://github.com/apache/kafka/pull/13851#discussion_r1253720655


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java:
##########
@@ -279,6 +279,28 @@ public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics,
                 + " You can change the replication.factor config or upgrade your brokers to version 2.4 or newer to avoid this error."));
     }
 
+    @Test
+    public void shouldThrowTimeoutExceptionInGetPartitionInfo() {
+        setupTopicInMockAdminClient(topic1, Collections.emptyMap());
+        final MockTime time = new MockTime(
+            (Integer) config.get(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) / 15

Review Comment:
   Well yes, but why not just set auto-tick to 1 or 5? It seems unnecessarily "complex" to compute it instead of hard-coding it.
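   For illustration, a minimal sketch of the hard-coded alternative (the tick of 5 ms is an arbitrary value for this example, not taken from the PR):

       // Auto-advance the mock clock by a fixed 5 ms on every read,
       // instead of deriving the tick from max.poll.interval.ms / 15.
       final MockTime time = new MockTime(5L);

       final long t0 = time.milliseconds(); // each read advances the clock
       final long t1 = time.milliseconds(); // t1 == t0 + 5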



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] mjsax commented on a diff in pull request #13851: KAFKA-15022: [1/N] initial implementation of rack aware assignor

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13851:
URL: https://github.com/apache/kafka/pull/13851#discussion_r1253721200


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.RackAwareTaskAssignor;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
+import org.junit.Test;
+
+public class RackAwareTaskAssignorTest {
+    private final static String TOPIC0 = "topic0";
+    private final static String TOPIC1 = "topic1";
+    private static final String USER_END_POINT = "localhost:8080";
+    private static final String APPLICATION_ID = "stream-partition-assignor-test";
+
+    private final Node node0 = new Node(0, "node0", 1, "rack1");
+    private final Node node1 = new Node(1, "node1", 1, "rack2");
+    private final Node node2 = new Node(2, "node2", 1, "rack3");
+    private final Node[] replicas = new Node[] {node0, node1, node2};
+
+    private final PartitionInfo partitionInfo00 = new PartitionInfo(TOPIC0, 0, node0, replicas, replicas);
+    private final PartitionInfo partitionInfo01 = new PartitionInfo(TOPIC0, 1, node0, replicas, replicas);
+
+    private final TopicPartition tp00 = new TopicPartition(TOPIC0, 0);
+    private final TopicPartition tp01 = new TopicPartition(TOPIC0, 1);
+    private final TopicPartition tp10 = new TopicPartition(TOPIC1, 0);
+
+    private final UUID process0UUID  = UUID.randomUUID();
+    private final UUID process1UUID = UUID.randomUUID();
+
+    private final Subtopology subtopology1 = new Subtopology(1, "topology1");
+
+    private final MockTime time = new MockTime();
+    private final StreamsConfig streamsConfig = new StreamsConfig(configProps());
+    private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
+
+    private final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
+            time,
+            streamsConfig,
+            mockClientSupplier.restoreConsumer,
+            false
+    );
+
+    private Map<String, Object> configProps() {
+        final Map<String, Object> configurationMap = new HashMap<>();
+        configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
+        configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, USER_END_POINT);
+
+        final ReferenceContainer referenceContainer = new ReferenceContainer();
+        /*
+        referenceContainer.mainConsumer = consumer;
+        referenceContainer.adminClient = adminClient;
+        referenceContainer.taskManager = taskManager;
+        referenceContainer.streamsMetadataState = streamsMetadataState;
+        referenceContainer.time = time;
+        */
+        configurationMap.put(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, referenceContainer);
+        return configurationMap;
+    }
+
+    @Test
+    public void disableActiveSinceMissingClusterInfo() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
+
+        processRacks.put(process0UUID , Collections.singletonMap("consumer1", Optional.of("rack1")));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), new HashSet<>(Arrays.asList(tp00, tp10))),
+            Collections.singletonMap(subtopology1, Collections.singleton(new TaskId(1, 1))),
+            processRacks,
+            mockInternalTopicManager,
+            new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
+        );
+
+        // False since tp10 is missing in cluster metadata
+        assertFalse(assignor.canEnableForActive());
+    }
+
+    @Test
+    public void disableActiveSinceRackMissingInClient() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
+
+        // Missing rackId config in client
+        processRacks.put(process0UUID , Collections.singletonMap("consumer1", Optional.empty()));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), Collections.singleton(tp00)),
+            Collections.singletonMap(subtopology1, Collections.singleton(new TaskId(1, 1))),
+            processRacks,
+            mockInternalTopicManager,
+            new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
+        );
+
+        // False since process1 doesn't have rackId
+        assertFalse(assignor.canEnableForActive());
+    }
+
+    @Test
+    public void disableActiveSinceRackDiffersInSameProcess() {

Review Comment:
   Leave it up to you -- if we keep it, maybe add a comment about it (I often read tests to understand what the code is supposed to do, and it could throw someone off doing the same).
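   A hypothetical example of such a comment (wording invented here for illustration, not from the PR):

       // Consumers within one process are normally configured with the same
       // rack id; this test only exercises the defensive check for an
       // inconsistent (misconfigured) client.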



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] lihaosky commented on a diff in pull request #13851: KAFKA-15022: [1/N] initial implementation of rack aware assignor

Posted by "lihaosky (via GitHub)" <gi...@apache.org>.
lihaosky commented on code in PR #13851:
URL: https://github.com/apache/kafka/pull/13851#discussion_r1255320753


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.RackAwareTaskAssignor;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
+import org.junit.Test;
+
+public class RackAwareTaskAssignorTest {
+    private final static String TOPIC0 = "topic0";
+    private final static String TOPIC1 = "topic1";
+    private static final String USER_END_POINT = "localhost:8080";
+    private static final String APPLICATION_ID = "stream-partition-assignor-test";
+
+    private final Node node0 = new Node(0, "node0", 1, "rack1");
+    private final Node node1 = new Node(1, "node1", 1, "rack2");
+    private final Node node2 = new Node(2, "node2", 1, "rack3");
+    private final Node[] replicas = new Node[] {node0, node1, node2};
+
+    private final PartitionInfo partitionInfo00 = new PartitionInfo(TOPIC0, 0, node0, replicas, replicas);
+    private final PartitionInfo partitionInfo01 = new PartitionInfo(TOPIC0, 1, node0, replicas, replicas);
+
+    private final TopicPartition tp00 = new TopicPartition(TOPIC0, 0);
+    private final TopicPartition tp01 = new TopicPartition(TOPIC0, 1);
+    private final TopicPartition tp10 = new TopicPartition(TOPIC1, 0);
+
+    private final UUID process0UUID  = UUID.randomUUID();
+    private final UUID process1UUID = UUID.randomUUID();
+
+    private final Subtopology subtopology1 = new Subtopology(1, "topology1");
+
+    private final MockTime time = new MockTime();
+    private final StreamsConfig streamsConfig = new StreamsConfig(configProps());
+    private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
+
+    private final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
+            time,
+            streamsConfig,
+            mockClientSupplier.restoreConsumer,
+            false
+    );
+
+    private Map<String, Object> configProps() {
+        final Map<String, Object> configurationMap = new HashMap<>();
+        configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
+        configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, USER_END_POINT);
+
+        final ReferenceContainer referenceContainer = new ReferenceContainer();
+        /*
+        referenceContainer.mainConsumer = consumer;
+        referenceContainer.adminClient = adminClient;
+        referenceContainer.taskManager = taskManager;
+        referenceContainer.streamsMetadataState = streamsMetadataState;
+        referenceContainer.time = time;
+        */
+        configurationMap.put(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, referenceContainer);
+        return configurationMap;
+    }
+
+    @Test
+    public void disableActiveSinceMissingClusterInfo() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
+
+        processRacks.put(process0UUID , Collections.singletonMap("consumer1", Optional.of("rack1")));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), new HashSet<>(Arrays.asList(tp00, tp10))),
+            Collections.singletonMap(subtopology1, Collections.singleton(new TaskId(1, 1))),
+            processRacks,
+            mockInternalTopicManager,
+            new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
+        );
+
+        // False since tp10 is missing in cluster metadata
+        assertFalse(assignor.canEnableForActive());

Review Comment:
   Make a few methods public to test this.
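   A sketch of that change, assuming the two validation helpers from the quoted diff are the ones widened (public rather than package-private, since the test lives in a different package):

       -  private boolean validateClientRack() {
       +  public boolean validateClientRack() {

       -  private boolean validateTopicPartitionRack() {
       +  public boolean validateTopicPartitionRack() {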



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] lihaosky commented on a diff in pull request #13851: KAFKA-15022: [1/N] initial implementation of rack aware assignor

Posted by "lihaosky (via GitHub)" <gi...@apache.org>.
lihaosky commented on code in PR #13851:
URL: https://github.com/apache/kafka/pull/13851#discussion_r1244450143


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -409,6 +411,43 @@ private String getBrokerSideConfigValue(final Config brokerSideTopicConfig,
         return brokerSideConfigEntry.value();
     }
 
+    public Map<String, List<TopicPartitionInfo>> getTopicPartitionInfo(final Set<String> topics) {
+        log.debug("Starting to describe topics {} in partition assignor.", topics);
+
+        long currentWallClockMs = time.milliseconds();
+        final long deadlineMs = currentWallClockMs + retryTimeoutMs;
+
+        Set<String> topicsToDescribe = new HashSet<>(topics);
+        final Map<String, List<TopicPartitionInfo>> topicPartitionInfo = new HashMap<>();
+
+        while (!topicsToDescribe.isEmpty()) {
+            final Map<String, List<TopicPartitionInfo>> existed = getTopicPartitionInfo(topicsToDescribe, null);
+            topicPartitionInfo.putAll(existed);
+            topicsToDescribe.removeAll(topicPartitionInfo.keySet());
+            if (!topicsToDescribe.isEmpty()) {
+                currentWallClockMs = time.milliseconds();
+
+                if (currentWallClockMs >= deadlineMs) {
+                    final String timeoutError = String.format(
+                        "Could not create topics within %d milliseconds. " +
+                            "This can happen if the Kafka cluster is temporarily not available.",
+                        retryTimeoutMs);
+                    log.error(timeoutError);

Review Comment:
   This is mimicking the `makeReady` function's logs.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java:
##########
@@ -409,6 +411,43 @@ private String getBrokerSideConfigValue(final Config brokerSideTopicConfig,
         return brokerSideConfigEntry.value();
     }
 
+    public Map<String, List<TopicPartitionInfo>> getTopicPartitionInfo(final Set<String> topics) {
+        log.debug("Starting to describe topics {} in partition assignor.", topics);
+
+        long currentWallClockMs = time.milliseconds();
+        final long deadlineMs = currentWallClockMs + retryTimeoutMs;
+
+        Set<String> topicsToDescribe = new HashSet<>(topics);
+        final Map<String, List<TopicPartitionInfo>> topicPartitionInfo = new HashMap<>();
+
+        while (!topicsToDescribe.isEmpty()) {
+            final Map<String, List<TopicPartitionInfo>> existed = getTopicPartitionInfo(topicsToDescribe, null);
+            topicPartitionInfo.putAll(existed);
+            topicsToDescribe.removeAll(topicPartitionInfo.keySet());
+            if (!topicsToDescribe.isEmpty()) {
+                currentWallClockMs = time.milliseconds();
+
+                if (currentWallClockMs >= deadlineMs) {
+                    final String timeoutError = String.format(
+                        "Could not create topics within %d milliseconds. " +
+                            "This can happen if the Kafka cluster is temporarily not available.",
+                        retryTimeoutMs);
+                    log.error(timeoutError);
+                    throw new TimeoutException(timeoutError);
+                }
+                log.info(

Review Comment:
   This is mimicking the `makeReady` function's logs.
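   If the `makeReady` wording is reused verbatim here, the copied text arguably no longer fits, since this method describes topics rather than creating them. A hedged sketch of adjusted wording (illustrative only, not from the PR):

       final String timeoutError = String.format(
           "Could not describe topics within %d milliseconds. " +
               "This can happen if the Kafka cluster is temporarily not available.",
           retryTimeoutMs);
       log.error(timeoutError);
       throw new TimeoutException(timeoutError);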



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##########
@@ -0,0 +1,164 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackAwareTaskAssignor {
+    private static final Logger log = LoggerFactory.getLogger(RackAwareTaskAssignor.class);
+
+    private final Cluster fullMetadata;
+    private final Map<TaskId, Set<TopicPartition>> partitionsForTask;
+    private final Map<UUID, Map<String, Optional<String>>> processRacks;
+    private final AssignmentConfigs assignmentConfigs;
+    private final Map<TopicPartition, Set<String>> racksForPartition;
+    private final InternalTopicManager internalTopicManager;
+    private Boolean canEnableForActive;
+
+    public RackAwareTaskAssignor(final Cluster fullMetadata,
+                                 final Map<TaskId, Set<TopicPartition>> partitionsForTask,
+                                 final Map<Subtopology, Set<TaskId>> tasksForTopicGroup,
+                                 final Map<UUID, Map<String, Optional<String>>> processRacks,
+                                 final InternalTopicManager internalTopicManager,
+                                 final AssignmentConfigs assignmentConfigs) {
+        this.fullMetadata = fullMetadata;
+        this.partitionsForTask = partitionsForTask;
+        this.processRacks = processRacks;
+        this.internalTopicManager = internalTopicManager;
+        this.assignmentConfigs = assignmentConfigs;
+        this.racksForPartition = new HashMap<>();
+    }
+
+  public synchronized boolean canEnableForActive() {
+      if (canEnableForActive != null) {
+          return canEnableForActive;
+      }
+
+      /*
+      TODO: enable this after we add the config
+      if (StreamsConfig.RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy)) {
+          canEnableForActive = false;
+          return false;
+      }
+       */
+
+      if (!validateClientRack()) {
+          canEnableForActive = false;
+          return false;
+      }
+
+      canEnableForActive = validateTopicPartitionRack();
+      return canEnableForActive;
+  }
+
+  public boolean canEnableForStandby() {
+      // TODO
+      return false;
+  }
+
+  private boolean validateTopicPartitionRack() {
+      // Make sure rackId exist for all TopicPartitions needed

Review Comment:
   The reason I want to enforce that rack information exists for all needed topic partitions before enabling the assignor is to avoid too many assignment changes if some racks appear or disappear later.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##########
@@ -0,0 +1,164 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackAwareTaskAssignor {
+    private static final Logger log = LoggerFactory.getLogger(RackAwareTaskAssignor.class);
+
+    private final Cluster fullMetadata;
+    private final Map<TaskId, Set<TopicPartition>> partitionsForTask;
+    private final Map<UUID, Map<String, Optional<String>>> processRacks;
+    private final AssignmentConfigs assignmentConfigs;
+    private final Map<TopicPartition, Set<String>> racksForPartition;
+    private final InternalTopicManager internalTopicManager;
+    private Boolean canEnableForActive;
+
+    public RackAwareTaskAssignor(final Cluster fullMetadata,
+                                 final Map<TaskId, Set<TopicPartition>> partitionsForTask,
+                                 final Map<Subtopology, Set<TaskId>> tasksForTopicGroup,
+                                 final Map<UUID, Map<String, Optional<String>>> processRacks,
+                                 final InternalTopicManager internalTopicManager,
+                                 final AssignmentConfigs assignmentConfigs) {
+        this.fullMetadata = fullMetadata;
+        this.partitionsForTask = partitionsForTask;
+        this.processRacks = processRacks;
+        this.internalTopicManager = internalTopicManager;
+        this.assignmentConfigs = assignmentConfigs;
+        this.racksForPartition = new HashMap<>();
+    }
+
+  public synchronized boolean canEnableForActive() {
+      if (canEnableForActive != null) {
+          return canEnableForActive;
+      }
+
+      /*
+      TODO: enable this after we add the config
+      if (StreamsConfig.RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy)) {
+          canEnableForActive = false;
+          return false;
+      }
+       */
+
+      if (!validateClientRack()) {
+          canEnableForActive = false;
+          return false;
+      }
+
+      canEnableForActive = validateTopicPartitionRack();
+      return canEnableForActive;
+  }
+
+  public boolean canEnableForStandby() {
+      // TODO
+      return false;
+  }
+
+  private boolean validateTopicPartitionRack() {
+      // Make sure rackId exist for all TopicPartitions needed
+      final Set<String> topicsToDescribe = new HashSet<>();
+      for (final Set<TopicPartition> topicPartitions : partitionsForTask.values()) {
+          for (TopicPartition topicPartition : topicPartitions) {
+              final PartitionInfo partitionInfo = fullMetadata.partition(topicPartition);
+              if (partitionInfo == null) {
+                  log.error("TopicPartition {} doesn't exist in cluster", topicPartition);
+                  return false;
+              }
+              final Node[] replica = partitionInfo.replicas();
+              if (replica == null || replica.length == 0) {
+                  topicsToDescribe.add(topicPartition.topic());
+                  continue;
+              }
+              for (final Node node : replica) {
+                  if (node.hasRack()) {
+                      racksForPartition.computeIfAbsent(topicPartition, k -> new HashSet<>()).add(node.rack());
+                  }
+              }
+          }
+      }
+
+      if (!topicsToDescribe.isEmpty()) {
+          log.info("Fetching PartitionInfo for topics {}", topicsToDescribe);

Review Comment:
   Sure



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.RackAwareTaskAssignor;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
+import org.junit.Test;
+
+public class RackAwareTaskAssignorTest {
+    private final static String TOPIC0 = "topic0";
+    private final static String TOPIC1 = "topic1";
+    private static final String USER_END_POINT = "localhost:8080";
+    private static final String APPLICATION_ID = "stream-partition-assignor-test";
+
+    private final Node node0 = new Node(0, "node0", 1, "rack1");
+    private final Node node1 = new Node(1, "node1", 1, "rack2");
+    private final Node node2 = new Node(2, "node2", 1, "rack3");
+    private final Node[] replicas = new Node[] {node0, node1, node2};
+
+    private final PartitionInfo partitionInfo00 = new PartitionInfo(TOPIC0, 0, node0, replicas, replicas);
+    private final PartitionInfo partitionInfo01 = new PartitionInfo(TOPIC0, 1, node0, replicas, replicas);
+
+    private final TopicPartition tp00 = new TopicPartition(TOPIC0, 0);
+    private final TopicPartition tp01 = new TopicPartition(TOPIC0, 1);
+    private final TopicPartition tp10 = new TopicPartition(TOPIC1, 0);
+
+    private final UUID process0UUID  = UUID.randomUUID();
+    private final UUID process1UUID = UUID.randomUUID();
+
+    private final Subtopology subtopology1 = new Subtopology(1, "topology1");
+
+    private final MockTime time = new MockTime();
+    private final StreamsConfig streamsConfig = new StreamsConfig(configProps());
+    private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
+
+    private final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
+            time,
+            streamsConfig,
+            mockClientSupplier.restoreConsumer,
+            false
+    );
+
+    private Map<String, Object> configProps() {
+        final Map<String, Object> configurationMap = new HashMap<>();
+        configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
+        configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, USER_END_POINT);
+
+        final ReferenceContainer referenceContainer = new ReferenceContainer();
+        /*
+        referenceContainer.mainConsumer = consumer;
+        referenceContainer.adminClient = adminClient;
+        referenceContainer.taskManager = taskManager;
+        referenceContainer.streamsMetadataState = streamsMetadataState;
+        referenceContainer.time = time;
+        */
+        configurationMap.put(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, referenceContainer);
+        return configurationMap;
+    }
+
+    @Test
+    public void disableActiveSinceMissingClusterInfo() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
+
+        processRacks.put(process0UUID , Collections.singletonMap("consumer1", Optional.of("rack1")));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), new HashSet<>(Arrays.asList(tp00, tp10))),
+            Collections.singletonMap(subtopology1, Collections.singleton(new TaskId(1, 1))),
+            processRacks,
+            mockInternalTopicManager,
+            new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
+        );
+
+        // False since tp10 is missing in cluster metadata
+        assertFalse(assignor.canEnableForActive());
+    }
+
+    @Test
+    public void disableActiveSinceRackMissingInClient() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
+
+        // Missing rackId config in client
+        processRacks.put(process0UUID , Collections.singletonMap("consumer1", Optional.empty()));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), Collections.singleton(tp00)),
+            Collections.singletonMap(subtopology1, Collections.singleton(new TaskId(1, 1))),
+            processRacks,
+            mockInternalTopicManager,
+            new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
+        );
+
+        // False since process1 doesn't have rackId
+        assertFalse(assignor.canEnableForActive());
+    }
+
+    @Test
+    public void disableActiveSinceRackDiffersInSameProcess() {

Review Comment:
   I'm ok with deleting this. I was trying to get test coverage.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.RackAwareTaskAssignor;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
+import org.junit.Test;
+
+public class RackAwareTaskAssignorTest {
+    private final static String TOPIC0 = "topic0";
+    private final static String TOPIC1 = "topic1";
+    private static final String USER_END_POINT = "localhost:8080";
+    private static final String APPLICATION_ID = "stream-partition-assignor-test";
+
+    private final Node node0 = new Node(0, "node0", 1, "rack1");
+    private final Node node1 = new Node(1, "node1", 1, "rack2");
+    private final Node node2 = new Node(2, "node2", 1, "rack3");
+    private final Node[] replicas = new Node[] {node0, node1, node2};
+
+    private final PartitionInfo partitionInfo00 = new PartitionInfo(TOPIC0, 0, node0, replicas, replicas);
+    private final PartitionInfo partitionInfo01 = new PartitionInfo(TOPIC0, 1, node0, replicas, replicas);
+
+    private final TopicPartition tp00 = new TopicPartition(TOPIC0, 0);
+    private final TopicPartition tp01 = new TopicPartition(TOPIC0, 1);
+    private final TopicPartition tp10 = new TopicPartition(TOPIC1, 0);
+
+    private final UUID process0UUID  = UUID.randomUUID();
+    private final UUID process1UUID = UUID.randomUUID();
+
+    private final Subtopology subtopology1 = new Subtopology(1, "topology1");
+
+    private final MockTime time = new MockTime();
+    private final StreamsConfig streamsConfig = new StreamsConfig(configProps());
+    private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
+
+    private final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
+            time,
+            streamsConfig,
+            mockClientSupplier.restoreConsumer,
+            false
+    );
+
+    private Map<String, Object> configProps() {
+        final Map<String, Object> configurationMap = new HashMap<>();
+        configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
+        configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, USER_END_POINT);
+
+        final ReferenceContainer referenceContainer = new ReferenceContainer();
+        /*
+        referenceContainer.mainConsumer = consumer;
+        referenceContainer.adminClient = adminClient;
+        referenceContainer.taskManager = taskManager;
+        referenceContainer.streamsMetadataState = streamsMetadataState;
+        referenceContainer.time = time;
+        */
+        configurationMap.put(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR, referenceContainer);
+        return configurationMap;
+    }
+
+    @Test
+    public void disableActiveSinceMissingClusterInfo() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
+
+        processRacks.put(process0UUID , Collections.singletonMap("consumer1", Optional.of("rack1")));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), new HashSet<>(Arrays.asList(tp00, tp10))),
+            Collections.singletonMap(subtopology1, Collections.singleton(new TaskId(1, 1))),
+            processRacks,
+            mockInternalTopicManager,
+            new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
+        );
+
+        // False since tp10 is missing in cluster metadata
+        assertFalse(assignor.canEnableForActive());
+    }
+
+    @Test
+    public void disableActiveSinceRackMissingInClient() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
+
+        // Missing rackId config in client
+        processRacks.put(process0UUID , Collections.singletonMap("consumer1", Optional.empty()));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), Collections.singleton(tp00)),

Review Comment:
   This is just to simplify the test to one task.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##########
@@ -0,0 +1,164 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackAwareTaskAssignor {
+    private static final Logger log = LoggerFactory.getLogger(RackAwareTaskAssignor.class);
+
+    private final Cluster fullMetadata;
+    private final Map<TaskId, Set<TopicPartition>> partitionsForTask;
+    private final Map<UUID, Map<String, Optional<String>>> processRacks;
+    private final AssignmentConfigs assignmentConfigs;
+    private final Map<TopicPartition, Set<String>> racksForPartition;
+    private final InternalTopicManager internalTopicManager;
+    private Boolean canEnableForActive;

Review Comment:
   What you said makes sense. I was also on the fence about this.
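   For context, a sketch of the eager alternative (computing the flag once in the constructor so the field can be a plain final boolean); this is one possible reading of the trade-off, not the PR's final code:

       private final boolean canEnableForActive;

       // In the constructor, after the field assignments:
       // evaluate once up front -- no synchronization or nullable Boolean needed.
       this.canEnableForActive = validateClientRack() && validateTopicPartitionRack();

   The cost is that the (potentially remote) topic description then always runs, even when the result is never consulted.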



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##########
@@ -0,0 +1,164 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackAwareTaskAssignor {
+    private static final Logger log = LoggerFactory.getLogger(RackAwareTaskAssignor.class);
+
+    private final Cluster fullMetadata;
+    private final Map<TaskId, Set<TopicPartition>> partitionsForTask;
+    private final Map<UUID, Map<String, Optional<String>>> processRacks;
+    private final AssignmentConfigs assignmentConfigs;
+    private final Map<TopicPartition, Set<String>> racksForPartition;
+    private final InternalTopicManager internalTopicManager;
+    private Boolean canEnableForActive;
+
+    public RackAwareTaskAssignor(final Cluster fullMetadata,
+                                 final Map<TaskId, Set<TopicPartition>> partitionsForTask,
+                                 final Map<Subtopology, Set<TaskId>> tasksForTopicGroup,
+                                 final Map<UUID, Map<String, Optional<String>>> processRacks,
+                                 final InternalTopicManager internalTopicManager,
+                                 final AssignmentConfigs assignmentConfigs) {
+        this.fullMetadata = fullMetadata;
+        this.partitionsForTask = partitionsForTask;
+        this.processRacks = processRacks;
+        this.internalTopicManager = internalTopicManager;
+        this.assignmentConfigs = assignmentConfigs;
+        this.racksForPartition = new HashMap<>();
+    }
+
+  public synchronized boolean canEnableForActive() {
+      if (canEnableForActive != null) {
+          return canEnableForActive;
+      }
+
+      /*
+      TODO: enable this after we add the config
+      if (StreamsConfig.RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy)) {
+          canEnableForActive = false;
+          return false;
+      }
+       */
+
+      if (!validateClientRack()) {
+          canEnableForActive = false;
+          return false;
+      }
+
+      canEnableForActive = validateTopicPartitionRack();
+      return canEnableForActive;
+  }
+
+  public boolean canEnableForStandby() {
+      // TODO
+      return false;
+  }
+
+  private boolean validateTopicPartitionRack() {
+      // Make sure rackId exist for all TopicPartitions needed
+      final Set<String> topicsToDescribe = new HashSet<>();
+      for (final Set<TopicPartition> topicPartitions : partitionsForTask.values()) {

Review Comment:
   Yes



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##########
@@ -0,0 +1,164 @@
+package org.apache.kafka.streams.processor.internals;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RackAwareTaskAssignor {
+    private static final Logger log = LoggerFactory.getLogger(RackAwareTaskAssignor.class);
+
+    private final Cluster fullMetadata;
+    private final Map<TaskId, Set<TopicPartition>> partitionsForTask;
+    private final Map<UUID, Map<String, Optional<String>>> processRacks;
+    private final AssignmentConfigs assignmentConfigs;
+    private final Map<TopicPartition, Set<String>> racksForPartition;
+    private final InternalTopicManager internalTopicManager;
+    private Boolean canEnableForActive;
+
+    public RackAwareTaskAssignor(final Cluster fullMetadata,
+                                 final Map<TaskId, Set<TopicPartition>> partitionsForTask,
+                                 final Map<Subtopology, Set<TaskId>> tasksForTopicGroup,
+                                 final Map<UUID, Map<String, Optional<String>>> processRacks,
+                                 final InternalTopicManager internalTopicManager,
+                                 final AssignmentConfigs assignmentConfigs) {
+        this.fullMetadata = fullMetadata;
+        this.partitionsForTask = partitionsForTask;
+        this.processRacks = processRacks;
+        this.internalTopicManager = internalTopicManager;
+        this.assignmentConfigs = assignmentConfigs;
+        this.racksForPartition = new HashMap<>();
+    }
+
+  public synchronized boolean canEnableForActive() {
+      if (canEnableForActive != null) {
+          return canEnableForActive;
+      }
+
+      /*
+      TODO: enable this after we add the config
+      if (StreamsConfig.RACK_AWARE_ASSSIGNMENT_STRATEGY_NONE.equals(assignmentConfigs.rackAwareAssignmentStrategy)) {
+          canEnableForActive = false;
+          return false;
+      }
+       */
+
+      if (!validateClientRack()) {
+          canEnableForActive = false;
+          return false;
+      }
+
+      canEnableForActive = validateTopicPartitionRack();
+      return canEnableForActive;
+  }
+
+  public boolean canEnableForStandby() {
+      // TODO
+      return false;
+  }
+
+  private boolean validateTopicPartitionRack() {
+      // Make sure rackId exist for all TopicPartitions needed
+      final Set<String> topicsToDescribe = new HashSet<>();
+      for (final Set<TopicPartition> topicPartitions : partitionsForTask.values()) {
+          for (TopicPartition topicPartition : topicPartitions) {
+              final PartitionInfo partitionInfo = fullMetadata.partition(topicPartition);
+              if (partitionInfo == null) {
+                  log.error("TopicPartition {} doesn't exist in cluster", topicPartition);
+                  return false;
+              }
+              final Node[] replica = partitionInfo.replicas();
+              if (replica == null || replica.length == 0) {
+                  topicsToDescribe.add(topicPartition.topic());

Review Comment:
   If an external topic doesn't have node information, we will also try to describe it.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##########
@@ -0,0 +1,164 @@
+  private boolean validateTopicPartitionRack() {
+      // Make sure rackId exist for all TopicPartitions needed
+      final Set<String> topicsToDescribe = new HashSet<>();
+      for (final Set<TopicPartition> topicPartitions : partitionsForTask.values()) {
+          for (TopicPartition topicPartition : topicPartitions) {
+              final PartitionInfo partitionInfo = fullMetadata.partition(topicPartition);
+              if (partitionInfo == null) {
+                  log.error("TopicPartition {} doesn't exist in cluster", topicPartition);
+                  return false;
+              }
+              final Node[] replica = partitionInfo.replicas();
+              if (replica == null || replica.length == 0) {

Review Comment:
   Yes. It's 0 for repartition topics we create: https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RepartitionTopics.java#L99
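
In other words, the placeholder metadata registered for a not-yet-created repartition topic carries no replica nodes at all. A tiny sketch of that shape (the topic name is made up):

```
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;

public class RepartitionPlaceholderDemo {
    public static void main(final String[] args) {
        // Mirrors the linked RepartitionTopics code: null leader and empty
        // replica arrays, so no rack information can be read from this entry.
        final PartitionInfo placeholder =
            new PartitionInfo("appId-join-repartition", 0, null, new Node[0], new Node[0]);
        System.out.println(placeholder.replicas().length); // 0 -> falls into topicsToDescribe
    }
}
```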



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##########
@@ -0,0 +1,164 @@
+  private boolean validateTopicPartitionRack() {
+      // Make sure rackId exist for all TopicPartitions needed
+      final Set<String> topicsToDescribe = new HashSet<>();
+      for (final Set<TopicPartition> topicPartitions : partitionsForTask.values()) {
+          for (TopicPartition topicPartition : topicPartitions) {
+              final PartitionInfo partitionInfo = fullMetadata.partition(topicPartition);
+              if (partitionInfo == null) {
+                  log.error("TopicPartition {} doesn't exist in cluster", topicPartition);
+                  return false;
+              }
+              final Node[] replica = partitionInfo.replicas();
+              if (replica == null || replica.length == 0) {
+                  topicsToDescribe.add(topicPartition.topic());
+                  continue;
+              }
+              for (final Node node : replica) {
+                  if (node.hasRack()) {
+                      racksForPartition.computeIfAbsent(topicPartition, k -> new HashSet<>()).add(node.rack());

Review Comment:
   Good catch



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java:
##########
@@ -305,6 +327,31 @@ public void shouldThrowTimeoutExceptionIfTopicExistsDuringSetup() {
         );
     }
 
+    @Test
+    public void shouldThrowTimeoutExceptionIfGetPartitionInfoHasTopicDescriptionTimeout() {
+        mockAdminClient.timeoutNextRequest(1);
+
+        final InternalTopicManager internalTopicManager =
+                new InternalTopicManager(time, mockAdminClient, new StreamsConfig(config));
+        try {
+            final Set<String> topic1set = new HashSet<>(Collections.singletonList(topic1));
+            internalTopicManager.getTopicPartitionInfo(topic1set, null);
+
+        } catch (final TimeoutException expected) {
+            assertEquals(TimeoutException.class, expected.getCause().getClass());
+        }
+
+        mockAdminClient.timeoutNextRequest(1);

Review Comment:
   This is actually testing `getTopicPartitionInfo(final Set<String> topics, final Set<String> tempUnknownTopics)`



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RackAwareTaskAssignor.java:
##########
@@ -0,0 +1,164 @@
+      if (!topicsToDescribe.isEmpty()) {
+          log.info("Fetching PartitionInfo for topics {}", topicsToDescribe);
+          try {
+              final Map<String, List<TopicPartitionInfo>> topicPartitionInfo = internalTopicManager.getTopicPartitionInfo(topicsToDescribe);
+              if (topicsToDescribe.size() > topicPartitionInfo.size()) {
+                  topicsToDescribe.removeAll(topicPartitionInfo.keySet());

Review Comment:
   To get the ones that failed to be described. `topicsToDescribe` is used again in the log message on the next line.
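
The set difference is computed with plain `removeAll`; a trivial sketch of the pattern:

```
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class UndescribedTopicsDemo {
    public static void main(final String[] args) {
        final Set<String> topicsToDescribe = new HashSet<>(Arrays.asList("topicA", "topicB"));
        final Set<String> described = new HashSet<>(Arrays.asList("topicA"));
        // After removeAll, only the topics that could not be described remain,
        // which is what the subsequent error log reports.
        topicsToDescribe.removeAll(described);
        System.out.println(topicsToDescribe); // [topicB]
    }
}
```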



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopicManagerTest.java:
##########
@@ -279,6 +279,28 @@ public CreateTopicsResult createTopics(final Collection<NewTopic> newTopics,
                 + " You can change the replication.factor config or upgrade your brokers to version 2.4 or newer to avoid this error."));
     }
 
+    @Test
+    public void shouldThrowTimeoutExceptionInGetPartitionInfo() {
+        setupTopicInMockAdminClient(topic1, Collections.emptyMap());
+        final MockTime time = new MockTime(
+            (Integer) config.get(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG)) / 15

Review Comment:
   I think this is `autoTickMs`; if it's set to 0, the mock time won't advance?
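
For reference, `org.apache.kafka.common.utils.MockTime` takes an `autoTickMs` constructor argument that advances the mock clock on every read, which is what lets a deadline-based retry loop time out in tests. A small sketch of that behavior:

```
import org.apache.kafka.common.utils.MockTime;

public class AutoTickDemo {
    public static void main(final String[] args) {
        // Every read of the clock advances it by autoTickMs, so a loop polling
        // time.milliseconds() eventually passes any deadline.
        final MockTime ticking = new MockTime(10L);
        final long first = ticking.milliseconds();
        final long second = ticking.milliseconds();
        System.out.println(second - first); // 10

        // With autoTickMs = 0 the clock only moves via explicit sleep() calls,
        // so a deadline-based retry loop would spin forever.
        final MockTime frozen = new MockTime(0L);
        System.out.println(frozen.milliseconds() == frozen.milliseconds()); // true
    }
}
```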



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.spy;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.UUID;
+import org.apache.kafka.clients.admin.MockAdminClient;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.StreamsConfig.InternalConfig;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.internals.RackAwareTaskAssignor;
+import org.apache.kafka.streams.processor.internals.TopologyMetadata.Subtopology;
+import org.apache.kafka.streams.processor.internals.assignment.AssignorConfiguration.AssignmentConfigs;
+import org.apache.kafka.test.MockClientSupplier;
+import org.apache.kafka.test.MockInternalTopicManager;
+import org.junit.Test;
+
+public class RackAwareTaskAssignorTest {
+    private final static String TOPIC0 = "topic0";
+    private final static String TOPIC1 = "topic1";
+    private static final String USER_END_POINT = "localhost:8080";
+    private static final String APPLICATION_ID = "stream-partition-assignor-test";
+
+    private final Node node0 = new Node(0, "node0", 1, "rack1");
+    private final Node node1 = new Node(1, "node1", 1, "rack2");
+    private final Node node2 = new Node(2, "node2", 1, "rack3");
+    private final Node[] replicas = new Node[] {node0, node1, node2};
+
+    private final PartitionInfo partitionInfo00 = new PartitionInfo(TOPIC0, 0, node0, replicas, replicas);
+    private final PartitionInfo partitionInfo01 = new PartitionInfo(TOPIC0, 1, node0, replicas, replicas);
+
+    private final TopicPartition tp00 = new TopicPartition(TOPIC0, 0);
+    private final TopicPartition tp01 = new TopicPartition(TOPIC0, 1);
+    private final TopicPartition tp10 = new TopicPartition(TOPIC1, 0);
+
+    private final UUID process0UUID  = UUID.randomUUID();
+    private final UUID process1UUID = UUID.randomUUID();
+
+    private final Subtopology subtopology1 = new Subtopology(1, "topology1");
+
+    private final MockTime time = new MockTime();
+    private final StreamsConfig streamsConfig = new StreamsConfig(configProps());
+    private final MockClientSupplier mockClientSupplier = new MockClientSupplier();
+
+    private final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(
+            time,
+            streamsConfig,
+            mockClientSupplier.restoreConsumer,
+            false
+    );
+
+    private Map<String, Object> configProps() {
+        final Map<String, Object> configurationMap = new HashMap<>();
+        configurationMap.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
+        configurationMap.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, USER_END_POINT);
+
+        final ReferenceContainer referenceContainer = new ReferenceContainer();
+        /*

Review Comment:
   Might need them later after more of the implementation lands...



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+    @Test
+    public void disableActiveSinceMissingClusterInfo() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
+
+        processRacks.put(process0UUID , Collections.singletonMap("consumer1", Optional.of("rack1")));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), new HashSet<>(Arrays.asList(tp00, tp10))),

Review Comment:
   For simplicity, this is just 1 task.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+    @Test
+    public void enableRackAwareAssignorForActiveWithDescribingTopics() {
+        final PartitionInfo noNodeInfo = new PartitionInfo(TOPIC0, 0, null, new Node[0], new Node[0]);
+
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2, Node.noNode())), // mockClientSupplier.setCluster requires noNode

Review Comment:
   https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/test/MockClientSupplier.java#L56
   
   has `cluster.nodeById(-1)`
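
So the `Cluster` handed to `MockClientSupplier.setCluster` needs `Node.noNode()` (node id -1) in its node set for that lookup to return something non-null. A minimal sketch:

```
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;

public class NoNodeLookupDemo {
    public static void main(final String[] args) {
        // Node.noNode() is the placeholder node with id -1; without it in the
        // node set, cluster.nodeById(-1) would return null.
        final Cluster metadata = new Cluster(
            "cluster",
            new HashSet<>(Arrays.asList(new Node(0, "node0", 1), Node.noNode())),
            Collections.emptySet(),
            Collections.emptySet(),
            Collections.emptySet());
        System.out.println(metadata.nodeById(-1)); // the noNode placeholder
    }
}
```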



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+    @Test
+    public void disableActiveSinceRackMissingInClient() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),

Review Comment:
   We don't need to. The test is mainly testing 
   `processRacks.put(process0UUID , Collections.singletonMap("consumer1", Optional.empty()));`



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+    @Test
+    public void disableActiveSinceMissingClusterInfo() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
+
+        processRacks.put(process0UUID , Collections.singletonMap("consumer1", Optional.of("rack1")));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), new HashSet<>(Arrays.asList(tp00, tp10))),
+            Collections.singletonMap(subtopology1, Collections.singleton(new TaskId(1, 1))),
+            processRacks,
+            mockInternalTopicManager,
+            new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
+        );
+
+        // False since tp10 is missing in cluster metadata
+        assertFalse(assignor.canEnableForActive());

Review Comment:
   Ideally yes. But this might be hard to verify... Maybe make `validateTopicPartitionRack` public and verify it returns false? Or we could extract
   ```
                   final PartitionInfo partitionInfo = fullMetadata.partition(topicPartition);
                   if (partitionInfo == null) {
                       log.error("TopicPartition {} doesn't exist in cluster", topicPartition);
                       return false;
                   }
   ```
   to a public function and verify that it returns false?
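
A sketch of what that extraction could look like inside `RackAwareTaskAssignor` (the method name is illustrative, not from the PR):

```
// Inside RackAwareTaskAssignor; uses the existing fullMetadata and log fields.
// validateTopicPartitionRack() would call this per partition, and the test
// could then assert on it directly.
public boolean topicPartitionExistsInMetadata(final TopicPartition topicPartition) {
    final PartitionInfo partitionInfo = fullMetadata.partition(topicPartition);
    if (partitionInfo == null) {
        log.error("TopicPartition {} doesn't exist in cluster", topicPartition);
        return false;
    }
    return true;
}
```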





[GitHub] [kafka] mjsax commented on a diff in pull request #13851: KAFKA-15022: [1/N] initial implementation of rack aware assignor

Posted by "mjsax (via GitHub)" <gi...@apache.org>.
mjsax commented on code in PR #13851:
URL: https://github.com/apache/kafka/pull/13851#discussion_r1253723136


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+    @Test
+    public void disableActiveSinceRackMissingInClient() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
+
+        // Missing rackId config in client
+        processRacks.put(process0UUID , Collections.singletonMap("consumer1", Optional.empty()));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), Collections.singleton(tp00)),

Review Comment:
   Not sure if I understand? We do have one task, but it could also read from more than one partition?
   
   My comments are mainly about the readability of the test code -- we test different but very similar scenarios, yet we set up each test very differently. To me, it seems easier to have some invariant in the code and say: the basic setup is X for all tests, and each case changes a single (or minimal) thing (not multiple/unnecessary things) to describe a single test scenario.
   
   When I was reading the test code, I had to re-read it multiple times before I understood what each test was actually doing, because the setup changes too many (and unnecessary) things at the same time, and it was unclear which changes were actually the required ones.
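
A sketch of the kind of shared fixture being suggested here (the helper name and shape are hypothetical; lihaosky's follow-up below goes this way):

```
// One helper carries the invariant setup; each test then changes exactly the
// one thing it cares about (the process racks, or the partitions per task).
private RackAwareTaskAssignor createAssignor(final Cluster metadata,
                                             final Map<TaskId, Set<TopicPartition>> partitionsForTask,
                                             final Map<UUID, Map<String, Optional<String>>> processRacks) {
    return new RackAwareTaskAssignor(
        metadata,
        partitionsForTask,
        Collections.singletonMap(subtopology1, Collections.singleton(new TaskId(1, 1))),
        processRacks,
        mockInternalTopicManager,
        new AssignorConfiguration(streamsConfig.originals()).assignmentConfigs()
    );
}
```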





[GitHub] [kafka] lihaosky commented on a diff in pull request #13851: KAFKA-15022: [1/N] initial implementation of rack aware assignor

Posted by "lihaosky (via GitHub)" <gi...@apache.org>.
lihaosky commented on code in PR #13851:
URL: https://github.com/apache/kafka/pull/13851#discussion_r1255321572


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/RackAwareTaskAssignorTest.java:
##########
@@ -0,0 +1,264 @@
+    @Test
+    public void disableActiveSinceRackMissingInClient() {
+        final Cluster metadata = new Cluster(
+            "cluster",
+            new HashSet<>(Arrays.asList(node0, node1, node2)),
+            new HashSet<>(Arrays.asList(partitionInfo00, partitionInfo01)),
+            Collections.emptySet(),
+            Collections.emptySet()
+        );
+
+        final Map<UUID, Map<String, Optional<String>>> processRacks = new HashMap<>();
+
+        // Missing rackId config in client
+        processRacks.put(process0UUID , Collections.singletonMap("consumer1", Optional.empty()));
+
+        final RackAwareTaskAssignor assignor = new RackAwareTaskAssignor(
+            metadata,
+            Collections.singletonMap(new TaskId(1, 1), Collections.singleton(tp00)),

Review Comment:
   Will extract the common code into a method to make it clear what the differences are.


