You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/11/01 08:54:18 UTC

[incubator-pinot] branch master updated: Add StrictReplicaGroupInstanceSelector (#6208)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 413b7cb  Add StrictReplicaGroupInstanceSelector (#6208)
413b7cb is described below

commit 413b7cb294a59628a2233c6c593295f817c33455
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Sun Nov 1 01:54:00 2020 -0700

    Add StrictReplicaGroupInstanceSelector (#6208)
    
    Recently we have introduced some features which require all the segments from the same partition to be processed by the same server. For example, upsert (#4261) feature requires all the segments for a partition loaded on a single server before starting serving queries to ensure result correctness. The Strict Replica-Group Routing is designed to meet this requirement.
    
    This PR is the routing side change of the Strict Replica-Group Routing, which is handled in the StrictReplicaGroupInstanceSelector class.
    The new algorithm relies on the ideal state of the table, so this PR added the ideal state to the routing interfaces.
---
 .../pinot/broker/routing/RoutingManager.java       |  90 ++--
 .../instanceselector/BaseInstanceSelector.java     |  87 ++--
 .../routing/instanceselector/InstanceSelector.java |  18 +-
 .../instanceselector/InstanceSelectorFactory.java  |  22 +-
 .../StrictReplicaGroupInstanceSelector.java        | 181 ++++++++
 .../SegmentLineageBasedSegmentPreSelector.java     |   3 +-
 .../SegmentPreSelector.java                        |   6 +-
 .../SegmentPreSelectorFactory.java                 |   2 +-
 .../segmentpruner/PartitionSegmentPruner.java      |   6 +-
 .../routing/segmentpruner/SegmentPruner.java       |  18 +-
 .../segmentselector/OfflineSegmentSelector.java    |   7 +-
 .../segmentselector/RealtimeSegmentSelector.java   |   7 +-
 .../routing/segmentselector/SegmentSelector.java   |  14 +-
 .../routing/timeboundary/TimeBoundaryManager.java  |  27 +-
 .../instanceselector/InstanceSelectorTest.java     | 473 ++++++++++++++++-----
 .../SegmentPreSelectorTest.java                    |   4 +-
 .../routing/segmentpruner/SegmentPrunerTest.java   |  11 +-
 .../segmentselector/SegmentSelectorTest.java       |  11 +-
 .../timeboundary/TimeBoundaryManagerTest.java      |  43 +-
 .../apache/pinot/core/util/TableConfigUtils.java   |   6 +-
 .../pinot/core/util/TableConfigUtilsTest.java      |  26 +-
 .../pinot/spi/config/table/RoutingConfig.java      |   4 +-
 .../upsert_meetupRsvp_realtime_table_config.json   |   2 +-
 23 files changed, 798 insertions(+), 270 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java
index 940e40c..e80ca3d 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/RoutingManager.java
@@ -35,15 +35,16 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.broker.broker.helix.ClusterChangeHandler;
 import org.apache.pinot.broker.routing.instanceselector.InstanceSelector;
 import org.apache.pinot.broker.routing.instanceselector.InstanceSelectorFactory;
+import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
+import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelectorFactory;
 import org.apache.pinot.broker.routing.segmentpruner.SegmentPruner;
 import org.apache.pinot.broker.routing.segmentpruner.SegmentPrunerFactory;
-import org.apache.pinot.broker.routing.segmentselector.SegmentPreSelector;
-import org.apache.pinot.broker.routing.segmentselector.SegmentPreSelectorFactory;
 import org.apache.pinot.broker.routing.segmentselector.SegmentSelector;
 import org.apache.pinot.broker.routing.segmentselector.SegmentSelectorFactory;
 import org.apache.pinot.broker.routing.timeboundary.TimeBoundaryInfo;
@@ -154,13 +155,13 @@ public class RoutingManager implements ClusterChangeHandler {
                   tableNameWithType);
               continue;
             }
-            Set<String> onlineSegments = getOnlineSegments(tableNameWithType);
-            if (onlineSegments == null) {
+            IdealState idealState = getIdealState(tableNameWithType);
+            if (idealState == null) {
               LOGGER
                   .warn("Failed to find ideal state for table: {}, skipping updating routing entry", tableNameWithType);
               continue;
             }
-            routingEntry.onExternalViewChange(externalView, onlineSegments);
+            routingEntry.onExternalViewChange(externalView, idealState);
           } catch (Exception e) {
             LOGGER
                 .error("Caught unexpected exception while updating routing entry on external view change for table: {}",
@@ -190,19 +191,12 @@ public class RoutingManager implements ClusterChangeHandler {
   }
 
   @Nullable
-  private Set<String> getOnlineSegments(String tableNameWithType) {
-    ZNRecord znRecord = _zkDataAccessor.get(_idealStatePathPrefix + tableNameWithType, null, AccessOption.PERSISTENT);
+  private IdealState getIdealState(String tableNameWithType) {
+    Stat stat = new Stat();
+    ZNRecord znRecord = _zkDataAccessor.get(_idealStatePathPrefix + tableNameWithType, stat, AccessOption.PERSISTENT);
     if (znRecord != null) {
-      Map<String, Map<String, String>> segmentAssignment = znRecord.getMapFields();
-      Set<String> onlineSegments = new HashSet<>(HashUtil.getHashMapCapacity(segmentAssignment.size()));
-      for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) {
-        Map<String, String> instanceStateMap = entry.getValue();
-        if (instanceStateMap.containsValue(SegmentStateModel.ONLINE) || instanceStateMap
-            .containsValue(SegmentStateModel.CONSUMING)) {
-          onlineSegments.add(entry.getKey());
-        }
-      }
-      return onlineSegments;
+      znRecord.setVersion(stat.getVersion());
+      return new IdealState(znRecord);
     } else {
       return null;
     }
@@ -212,9 +206,9 @@ public class RoutingManager implements ClusterChangeHandler {
     LOGGER.info("Processing instance config change");
     long startTimeMs = System.currentTimeMillis();
 
-    List<ZNRecord> instanceConfigZNRecords =
-        _zkDataAccessor.getChildren(_instanceConfigsPath, null, AccessOption.PERSISTENT,
-            CommonConstants.Helix.ZkClient.RETRY_COUNT, CommonConstants.Helix.ZkClient.RETRY_INTERVAL_MS);
+    List<ZNRecord> instanceConfigZNRecords = _zkDataAccessor
+        .getChildren(_instanceConfigsPath, null, AccessOption.PERSISTENT, CommonConstants.Helix.ZkClient.RETRY_COUNT,
+            CommonConstants.Helix.ZkClient.RETRY_INTERVAL_MS);
     long fetchInstanceConfigsEndTimeMs = System.currentTimeMillis();
 
     // Calculate new enabled and disabled instances
@@ -284,6 +278,7 @@ public class RoutingManager implements ClusterChangeHandler {
     if ("true".equals(instanceConfigZNRecord.getSimpleField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS))) {
       return false;
     }
+    //noinspection RedundantIfStatement
     if ("true".equals(instanceConfigZNRecord.getSimpleField(CommonConstants.Helix.QUERIES_DISABLED))) {
       return false;
     }
@@ -310,22 +305,22 @@ public class RoutingManager implements ClusterChangeHandler {
       externalViewVersion = externalView.getRecord().getVersion();
     }
 
-    Set<String> onlineSegments = getOnlineSegments(tableNameWithType);
-    Preconditions.checkState(onlineSegments != null, "Failed to find ideal state for table: %s", tableNameWithType);
+    IdealState idealState = getIdealState(tableNameWithType);
+    Preconditions.checkState(idealState != null, "Failed to find ideal state for table: %s", tableNameWithType);
 
-    Set<String> enabledInstances = _enabledServerInstanceMap.keySet();
+    Set<String> onlineSegments = getOnlineSegments(idealState);
 
     SegmentPreSelector segmentPreSelector =
         SegmentPreSelectorFactory.getSegmentPreSelector(tableConfig, _propertyStore);
     Set<String> preSelectedOnlineSegments = segmentPreSelector.preSelect(onlineSegments);
     SegmentSelector segmentSelector = SegmentSelectorFactory.getSegmentSelector(tableConfig);
-    segmentSelector.init(externalView, preSelectedOnlineSegments);
+    segmentSelector.init(externalView, idealState, preSelectedOnlineSegments);
     List<SegmentPruner> segmentPruners = SegmentPrunerFactory.getSegmentPruners(tableConfig, _propertyStore);
     for (SegmentPruner segmentPruner : segmentPruners) {
-      segmentPruner.init(externalView, preSelectedOnlineSegments);
+      segmentPruner.init(externalView, idealState, preSelectedOnlineSegments);
     }
     InstanceSelector instanceSelector = InstanceSelectorFactory.getInstanceSelector(tableConfig, _brokerMetrics);
-    instanceSelector.init(enabledInstances, externalView, preSelectedOnlineSegments);
+    instanceSelector.init(_enabledServerInstanceMap.keySet(), externalView, idealState, preSelectedOnlineSegments);
 
     // Add time boundary manager if both offline and real-time part exist for a hybrid table
     TimeBoundaryManager timeBoundaryManager = null;
@@ -336,7 +331,7 @@ public class RoutingManager implements ClusterChangeHandler {
       if (_routingEntryMap.containsKey(realtimeTableName)) {
         LOGGER.info("Adding time boundary manager for table: {}", tableNameWithType);
         timeBoundaryManager = new TimeBoundaryManager(tableConfig, _propertyStore);
-        timeBoundaryManager.init(externalView, onlineSegments);
+        timeBoundaryManager.init(externalView, idealState, preSelectedOnlineSegments);
       }
     } else {
       // Current table is real-time
@@ -355,12 +350,18 @@ public class RoutingManager implements ClusterChangeHandler {
         if (offlineTableExternalView == null) {
           offlineTableExternalView = new ExternalView(offlineTableName);
         }
-        Set<String> offlineTableOnlineSegments = getOnlineSegments(offlineTableName);
-        Preconditions.checkState(offlineTableOnlineSegments != null, "Failed to find ideal state for table: %s",
-            offlineTableName);
+        IdealState offlineTableIdealState = getIdealState(offlineTableName);
+        Preconditions
+            .checkState(offlineTableIdealState != null, "Failed to find ideal state for table: %s", offlineTableName);
+        Set<String> offlineTableOnlineSegments = getOnlineSegments(offlineTableIdealState);
+        SegmentPreSelector offlineTableSegmentPreSelector =
+            SegmentPreSelectorFactory.getSegmentPreSelector(offlineTableConfig, _propertyStore);
+        Set<String> offlineTablePreSelectedOnlineSegments =
+            offlineTableSegmentPreSelector.preSelect(offlineTableOnlineSegments);
         TimeBoundaryManager offlineTableTimeBoundaryManager =
             new TimeBoundaryManager(offlineTableConfig, _propertyStore);
-        offlineTableTimeBoundaryManager.init(offlineTableExternalView, offlineTableOnlineSegments);
+        offlineTableTimeBoundaryManager
+            .init(offlineTableExternalView, offlineTableIdealState, offlineTablePreSelectedOnlineSegments);
         offlineTableRoutingEntry.setTimeBoundaryManager(offlineTableTimeBoundaryManager);
       }
     }
@@ -379,6 +380,22 @@ public class RoutingManager implements ClusterChangeHandler {
   }
 
   /**
+   * Returns the online segments (with ONLINE/CONSUMING instances) in the given ideal state.
+   */
+  private static Set<String> getOnlineSegments(IdealState idealState) {
+    Map<String, Map<String, String>> segmentAssignment = idealState.getRecord().getMapFields();
+    Set<String> onlineSegments = new HashSet<>(HashUtil.getHashMapCapacity(segmentAssignment.size()));
+    for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) {
+      Map<String, String> instanceStateMap = entry.getValue();
+      if (instanceStateMap.containsValue(SegmentStateModel.ONLINE) || instanceStateMap
+          .containsValue(SegmentStateModel.CONSUMING)) {
+        onlineSegments.add(entry.getKey());
+      }
+    }
+    return onlineSegments;
+  }
+
+  /**
    * Removes the routing for the given table.
    */
   public synchronized void removeRouting(String tableNameWithType) {
@@ -525,15 +542,16 @@ public class RoutingManager implements ClusterChangeHandler {
     // NOTE: The change gets applied in sequence, and before change applied to all components, there could be some
     // inconsistency between components, which is fine because the inconsistency only exists for the newly changed
     // segments and only lasts for a very short time.
-    void onExternalViewChange(ExternalView externalView, Set<String> onlineSegments) {
+    void onExternalViewChange(ExternalView externalView, IdealState idealState) {
+      Set<String> onlineSegments = getOnlineSegments(idealState);
       Set<String> preSelectedOnlineSegments = _segmentPreSelector.preSelect(onlineSegments);
-      _segmentSelector.onExternalViewChange(externalView, preSelectedOnlineSegments);
+      _segmentSelector.onExternalViewChange(externalView, idealState, preSelectedOnlineSegments);
       for (SegmentPruner segmentPruner : _segmentPruners) {
-        segmentPruner.onExternalViewChange(externalView, preSelectedOnlineSegments);
+        segmentPruner.onExternalViewChange(externalView, idealState, preSelectedOnlineSegments);
       }
-      _instanceSelector.onExternalViewChange(externalView, preSelectedOnlineSegments);
+      _instanceSelector.onExternalViewChange(externalView, idealState, preSelectedOnlineSegments);
       if (_timeBoundaryManager != null) {
-        _timeBoundaryManager.onExternalViewChange(externalView, preSelectedOnlineSegments);
+        _timeBoundaryManager.onExternalViewChange(externalView, idealState, preSelectedOnlineSegments);
       }
       _lastUpdateExternalViewVersion = externalView.getStat().getVersion();
     }
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
index 512f721..7fb8076 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
@@ -25,9 +25,12 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
@@ -68,9 +71,10 @@ abstract class BaseInstanceSelector implements InstanceSelector {
   }
 
   @Override
-  public void init(Set<String> enabledInstances, ExternalView externalView, Set<String> onlineSegments) {
+  public void init(Set<String> enabledInstances, ExternalView externalView, IdealState idealState,
+      Set<String> onlineSegments) {
     _enabledInstances = enabledInstances;
-    onExternalViewChange(externalView, onlineSegments);
+    onExternalViewChange(externalView, idealState, onlineSegments);
   }
 
   /**
@@ -126,47 +130,28 @@ abstract class BaseInstanceSelector implements InstanceSelector {
    * {@inheritDoc}
    *
    * <p>Updates the cached maps ({@code segmentToOnlineInstancesMap}, {@code segmentToOfflineInstancesMap} and
-   * {@code instanceToSegmentsMap}) based on the given ExternalView and re-calculates
-   * {@code segmentToEnabledInstancesMap} and {@code unavailableSegments} based on the cached states.
+   * {@code instanceToSegmentsMap}) and re-calculates {@code segmentToEnabledInstancesMap} and
+   * {@code unavailableSegments} based on the cached states.
    */
   @Override
-  public void onExternalViewChange(ExternalView externalView, Set<String> onlineSegments) {
-    Map<String, Map<String, String>> segmentAssignment = externalView.getRecord().getMapFields();
-    int numSegments = segmentAssignment.size();
-    _segmentToOnlineInstancesMap = new HashMap<>(HashUtil.getHashMapCapacity(numSegments));
-    _segmentToOfflineInstancesMap = new HashMap<>(HashUtil.getHashMapCapacity(numSegments));
+  public void onExternalViewChange(ExternalView externalView, IdealState idealState, Set<String> onlineSegments) {
+    int numSegments = onlineSegments.size();
+    int segmentMapCapacity = HashUtil.getHashMapCapacity(numSegments);
+    _segmentToOnlineInstancesMap = new HashMap<>(segmentMapCapacity);
+    _segmentToOfflineInstancesMap = new HashMap<>(segmentMapCapacity);
     if (_instanceToSegmentsMap != null) {
       _instanceToSegmentsMap = new HashMap<>(HashUtil.getHashMapCapacity(_instanceToSegmentsMap.size()));
     } else {
       _instanceToSegmentsMap = new HashMap<>();
     }
 
-    for (Map.Entry<String, Map<String, String>> entry : segmentAssignment.entrySet()) {
-      String segment = entry.getKey();
-      Map<String, String> instanceStateMap = entry.getValue();
-      // NOTE: Instances will be sorted here because 'instanceStateMap' is a TreeMap
-      List<String> onlineInstances = new ArrayList<>(instanceStateMap.size());
-      List<String> offlineInstances = new ArrayList<>(instanceStateMap.size());
-      _segmentToOnlineInstancesMap.put(segment, onlineInstances);
-      _segmentToOfflineInstancesMap.put(segment, offlineInstances);
-      for (Map.Entry<String, String> instanceStateEntry : instanceStateMap.entrySet()) {
-        String instance = instanceStateEntry.getKey();
-        String state = instanceStateEntry.getValue();
-        // Do not track instances in ERROR state
-        if (!state.equals(SegmentStateModel.ERROR)) {
-          _instanceToSegmentsMap.computeIfAbsent(instance, k -> new ArrayList<>()).add(segment);
-          if (state.equals(SegmentStateModel.OFFLINE)) {
-            offlineInstances.add(instance);
-          } else {
-            onlineInstances.add(instance);
-          }
-        }
-      }
-    }
+    // Update the cached maps
+    updateSegmentMaps(externalView, idealState, onlineSegments, _segmentToOnlineInstancesMap,
+        _segmentToOfflineInstancesMap, _instanceToSegmentsMap);
 
     // Generate a new map from segment to enabled ONLINE/CONSUMING instances and a new set of unavailable segments (no
     // enabled instance or all enabled instances are in ERROR state)
-    Map<String, List<String>> segmentToEnabledInstancesMap = new HashMap<>(HashUtil.getHashMapCapacity(numSegments));
+    Map<String, List<String>> segmentToEnabledInstancesMap = new HashMap<>(segmentMapCapacity);
     Set<String> unavailableSegments = new HashSet<>();
     // NOTE: Put null as the value when there is no enabled instances for a segment so that segmentToEnabledInstancesMap
     // always contains all segments. With this, in onInstancesChange() we can directly iterate over
@@ -183,6 +168,44 @@ abstract class BaseInstanceSelector implements InstanceSelector {
   }
 
   /**
+   * Updates the segment maps based on the given external view, ideal state and online segments (segments with
+   * ONLINE/CONSUMING instances in the ideal state and selected by the pre-selector).
+   */
+  void updateSegmentMaps(ExternalView externalView, IdealState idealState, Set<String> onlineSegments,
+      Map<String, List<String>> segmentToOnlineInstancesMap, Map<String, List<String>> segmentToOfflineInstancesMap,
+      Map<String, List<String>> instanceToSegmentsMap) {
+    // Iterate over the external view instead of the online segments so that the map lookups are performed on the
+    // HashSet instead of the TreeSet for performance
+    for (Map.Entry<String, Map<String, String>> entry : externalView.getRecord().getMapFields().entrySet()) {
+      String segment = entry.getKey();
+      if (!onlineSegments.contains(segment)) {
+        continue;
+      }
+      Map<String, String> instanceStateMap = entry.getValue();
+      List<String> onlineInstances = new ArrayList<>(instanceStateMap.size());
+      List<String> offlineInstances = new ArrayList<>();
+      segmentToOnlineInstancesMap.put(segment, onlineInstances);
+      segmentToOfflineInstancesMap.put(segment, offlineInstances);
+      for (Map.Entry<String, String> instanceStateEntry : instanceStateMap.entrySet()) {
+        String instance = instanceStateEntry.getKey();
+        String state = instanceStateEntry.getValue();
+        // Do not track instances in ERROR state
+        if (!state.equals(SegmentStateModel.ERROR)) {
+          instanceToSegmentsMap.computeIfAbsent(instance, k -> new ArrayList<>()).add(segment);
+          if (state.equals(SegmentStateModel.OFFLINE)) {
+            offlineInstances.add(instance);
+          } else {
+            onlineInstances.add(instance);
+          }
+        }
+      }
+      // Sort the online instances for replica-group routing to work. For multiple segments with the same online
+      // instances, if the list is sorted, the same index in the list will always point to the same instance.
+      onlineInstances.sort(null);
+    }
+  }
+
+  /**
    * Calculates the enabled ONLINE/CONSUMING instances for the given segment, and updates the unavailable segments (no
    * enabled instance or all enabled instances are in ERROR state).
    */
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
index 4619ada..47b777a 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.request.BrokerRequest;
 
 
@@ -31,12 +32,11 @@ import org.apache.pinot.common.request.BrokerRequest;
 public interface InstanceSelector {
 
   /**
-   * Initializes the instance selector with the enabled instances, external view and online segments (segments with
-   * ONLINE/CONSUMING instances in ideal state). Should be called only once before calling other methods.
-   * <p>NOTE: {@code onlineSegments} is unused, but intentionally passed in as argument in case it is needed in the
-   * future.
+   * Initializes the instance selector with the enabled instances, external view, ideal state and online segments
+   * (segments with ONLINE/CONSUMING instances in the ideal state and selected by the pre-selector). Should be called
+   * only once before calling other methods.
    */
-  void init(Set<String> enabledInstances, ExternalView externalView, Set<String> onlineSegments);
+  void init(Set<String> enabledInstances, ExternalView externalView, IdealState idealState, Set<String> onlineSegments);
 
   /**
    * Processes the instances change. Changed instances are pre-computed based on the current and previous enabled
@@ -45,12 +45,10 @@ public interface InstanceSelector {
   void onInstancesChange(Set<String> enabledInstances, List<String> changedInstances);
 
   /**
-   * Processes the external view change based on the given online segments (segments with ONLINE/CONSUMING instances in
-   * ideal state).
-   * <p>NOTE: {@code onlineSegments} is unused, but intentionally passed in as argument in case it is needed in the
-   * future.
+   * Processes the external view change based on the given ideal state and online segments (segments with
+   * ONLINE/CONSUMING instances in the ideal state and selected by the pre-selector).
    */
-  void onExternalViewChange(ExternalView externalView, Set<String> onlineSegments);
+  void onExternalViewChange(ExternalView externalView, IdealState idealState, Set<String> onlineSegments);
 
   /**
    * Selects the server instances for the given segments queried by the given broker request, returns a map from segment
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
index a951dae..8410c46 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorFactory.java
@@ -38,14 +38,20 @@ public class InstanceSelectorFactory {
   public static InstanceSelector getInstanceSelector(TableConfig tableConfig, BrokerMetrics brokerMetrics) {
     String tableNameWithType = tableConfig.getTableName();
     RoutingConfig routingConfig = tableConfig.getRoutingConfig();
-    if (routingConfig != null && (
-        RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(routingConfig.getInstanceSelectorType())
-            || (tableConfig.getTableType() == TableType.OFFLINE && LEGACY_REPLICA_GROUP_OFFLINE_ROUTING
-            .equalsIgnoreCase(routingConfig.getRoutingTableBuilderName())) || (
-            tableConfig.getTableType() == TableType.REALTIME && LEGACY_REPLICA_GROUP_REALTIME_ROUTING
-                .equalsIgnoreCase(routingConfig.getRoutingTableBuilderName())))) {
-      LOGGER.info("Using ReplicaGroupInstanceSelector for table: {}", tableNameWithType);
-      return new ReplicaGroupInstanceSelector(tableNameWithType, brokerMetrics);
+    if (routingConfig != null) {
+      if (RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(routingConfig.getInstanceSelectorType())
+          || (tableConfig.getTableType() == TableType.OFFLINE && LEGACY_REPLICA_GROUP_OFFLINE_ROUTING
+          .equalsIgnoreCase(routingConfig.getRoutingTableBuilderName())) || (
+          tableConfig.getTableType() == TableType.REALTIME && LEGACY_REPLICA_GROUP_REALTIME_ROUTING
+              .equalsIgnoreCase(routingConfig.getRoutingTableBuilderName()))) {
+        LOGGER.info("Using ReplicaGroupInstanceSelector for table: {}", tableNameWithType);
+        return new ReplicaGroupInstanceSelector(tableNameWithType, brokerMetrics);
+      }
+      if (RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE
+          .equalsIgnoreCase(routingConfig.getInstanceSelectorType())) {
+        LOGGER.info("Using StrictReplicaGroupInstanceSelector for table: {}", tableNameWithType);
+        return new StrictReplicaGroupInstanceSelector(tableNameWithType, brokerMetrics);
+      }
     }
     return new BalancedInstanceSelector(tableNameWithType, brokerMetrics);
   }
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
new file mode 100644
index 0000000..a368c79
--- /dev/null
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.broker.routing.instanceselector;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+import org.apache.pinot.common.utils.HashUtil;
+
+
+/**
+ * Instance selector for strict replica-group routing strategy.
+ *
+ * <pre>
+ * The strict replica-group routing strategy always routes the query to the instances within the same replica-group.
+ * (Note that the replica-group information is derived from the ideal state of the table, where the instances are sorted
+ * alphabetically in the instance state map, so the replica-groups in the instance selector might not match the
+ * replica-groups in the instance partitions.) The instances in a replica-group should have all the online segments
+ * (segments with ONLINE/CONSUMING instances in the ideal state and selected by the pre-selector) available
+ * (ONLINE/CONSUMING in the external view) in order to serve queries. If any segment is unavailable in the
+ * replica-group, we mark the whole replica-group down and not serve queries with this replica-group.
+ *
+ * The selection algorithm is the same as {@link ReplicaGroupInstanceSelector}, and will always evenly distribute the
+ * traffic to all replica-groups that have all online segments available.
+ *
+ * The algorithm relies on the mirror segment assignment from replica-group segment assignment strategy. With mirror
+ * segment assignment, any server in one replica-group will always have a corresponding server in other replica-groups
+ * that have the same segments assigned. For example, if S1 is a server in replica-group 1, and it has mirror server
+ * S2 in replica-group 2 and S3 in replica-group 3. All segments assigned to S1 will also be assigned to S2 and S3. In
+ * stable scenario (external view matches ideal state), all segments assigned to S1 will have the same enabled instances
+ * of [S1, S2, S3] sorted (in alphabetical order). If we always pick the same index of enabled instances for all
+ * segments, only one of S1, S2, S3 will be picked, and all the segments are processed by the same server. In
+ * transitioning/error scenario (external view does not match ideal state), if a segment is down on S1, we mark all
+ * segments with the same assignment ([S1, S2, S3]) down on S1 to ensure that we always route the segments to the same
+ * replica-group.
+ * </pre>
+ */
+public class StrictReplicaGroupInstanceSelector extends ReplicaGroupInstanceSelector {
+
+  public StrictReplicaGroupInstanceSelector(String tableNameWithType, BrokerMetrics brokerMetrics) {
+    super(tableNameWithType, brokerMetrics);
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <pre>
+   * The maps are calculated in the following steps to meet the strict replica-group guarantee:
+   *   1. Create a map from online segment to set of instances hosting the segment based on the ideal state
+   *   2. Gather the online and offline instances for each online segment from the external view
+   *   3. Compare the instances from the ideal state and the external view and gather the unavailable instances for each
+   *      set of instances
+   *   4. Exclude the unavailable instances from the online instances map
+   * </pre>
+   */
+  @Override
+  void updateSegmentMaps(ExternalView externalView, IdealState idealState, Set<String> onlineSegments,
+      Map<String, List<String>> segmentToOnlineInstancesMap, Map<String, List<String>> segmentToOfflineInstancesMap,
+      Map<String, List<String>> instanceToSegmentsMap) {
+    // Iterate over the ideal state to fill up 'idealStateSegmentToInstancesMap' which is a map from segment to set of
+    // instances hosting the segment in the ideal state
+    int segmentMapCapacity = HashUtil.getHashMapCapacity(onlineSegments.size());
+    Map<String, Set<String>> idealStateSegmentToInstancesMap = new HashMap<>(segmentMapCapacity);
+    for (Map.Entry<String, Map<String, String>> entry : idealState.getRecord().getMapFields().entrySet()) {
+      String segment = entry.getKey();
+      // Only track online segments
+      if (!onlineSegments.contains(segment)) {
+        continue;
+      }
+      idealStateSegmentToInstancesMap.put(segment, entry.getValue().keySet());
+    }
+
+    // Iterate over the external view to fill up 'tempSegmentToOnlineInstancesMap' and 'segmentToOfflineInstancesMap'.
+    // 'tempSegmentToOnlineInstancesMap' is a temporary map from segment to set of instances that are in the ideal state
+    // and also ONLINE/CONSUMING in the external view. This map does not have the strict replica-group guarantee, and
+    // will be used to calculate the final 'segmentToOnlineInstancesMap'.
+    Map<String, Set<String>> tempSegmentToOnlineInstancesMap = new HashMap<>(segmentMapCapacity);
+    for (Map.Entry<String, Map<String, String>> entry : externalView.getRecord().getMapFields().entrySet()) {
+      String segment = entry.getKey();
+      Set<String> instancesInIdealState = idealStateSegmentToInstancesMap.get(segment);
+      // Only track online segments
+      if (instancesInIdealState == null) {
+        continue;
+      }
+      Map<String, String> instanceStateMap = entry.getValue();
+      Set<String> tempOnlineInstances = new TreeSet<>();
+      List<String> offlineInstances = new ArrayList<>();
+      tempSegmentToOnlineInstancesMap.put(segment, tempOnlineInstances);
+      segmentToOfflineInstancesMap.put(segment, offlineInstances);
+      for (Map.Entry<String, String> instanceStateEntry : instanceStateMap.entrySet()) {
+        String instance = instanceStateEntry.getKey();
+        // Only track instances within the ideal state
+        if (!instancesInIdealState.contains(instance)) {
+          continue;
+        }
+        String state = instanceStateEntry.getValue();
+        if (state.equals(SegmentStateModel.ONLINE) || state.equals(SegmentStateModel.CONSUMING)) {
+          tempOnlineInstances.add(instance);
+        } else if (state.equals(SegmentStateModel.OFFLINE)) {
+          offlineInstances.add(instance);
+          instanceToSegmentsMap.computeIfAbsent(instance, k -> new ArrayList<>()).add(segment);
+        }
+      }
+    }
+
+    // Iterate over the 'tempSegmentToOnlineInstancesMap' to gather the unavailable instances for each set of instances
+    Map<Set<String>, Set<String>> unavailableInstancesMap = new HashMap<>();
+    for (Map.Entry<String, Set<String>> entry : tempSegmentToOnlineInstancesMap.entrySet()) {
+      String segment = entry.getKey();
+      Set<String> tempOnlineInstances = entry.getValue();
+      Set<String> instancesInIdealState = idealStateSegmentToInstancesMap.get(segment);
+      // NOTE: When a segment is unavailable on all the instances, do not count all the instances as unavailable because
+      //       this segment is unavailable and won't be included in the routing table, thus not breaking the requirement
+      //       of routing to the same replica-group. This is normal for new added segments, and we don't want to mark
+      //       all instances down on all segments with the same assignment.
+      if (tempOnlineInstances.size() == instancesInIdealState.size() || tempOnlineInstances.isEmpty()) {
+        continue;
+      }
+      Set<String> unavailableInstances =
+          unavailableInstancesMap.computeIfAbsent(instancesInIdealState, k -> new TreeSet<>());
+      for (String instance : instancesInIdealState) {
+        if (!tempOnlineInstances.contains(instance)) {
+          unavailableInstances.add(instance);
+        }
+      }
+    }
+
+    // Iterate over the 'tempSegmentToOnlineInstancesMap' again to fill up the 'segmentToOnlineInstancesMap' which has
+    // the strict replica-group guarantee
+    for (Map.Entry<String, Set<String>> entry : tempSegmentToOnlineInstancesMap.entrySet()) {
+      String segment = entry.getKey();
+      Set<String> tempOnlineInstances = entry.getValue();
+      // NOTE: Instances will be sorted here because 'tempOnlineInstances' is a TreeSet. We need the online instances to
+      //       be sorted for replica-group routing to work. For multiple segments with the same online instances, if the
+      //       list is sorted, the same index in the list will always point to the same instance.
+      List<String> onlineInstances = new ArrayList<>(tempOnlineInstances.size());
+      segmentToOnlineInstancesMap.put(segment, onlineInstances);
+
+      Set<String> instancesInIdealState = idealStateSegmentToInstancesMap.get(segment);
+      Set<String> unavailableInstances = unavailableInstancesMap.get(instancesInIdealState);
+      if (unavailableInstances == null) {
+        // No unavailable instance, add all instances as online instance
+        for (String instance : tempOnlineInstances) {
+          onlineInstances.add(instance);
+          instanceToSegmentsMap.computeIfAbsent(instance, k -> new ArrayList<>()).add(segment);
+        }
+      } else {
+        // Some instances are unavailable, add the remaining instances as online instance
+        for (String instance : tempOnlineInstances) {
+          if (!unavailableInstances.contains(instance)) {
+            onlineInstances.add(instance);
+            instanceToSegmentsMap.computeIfAbsent(instance, k -> new ArrayList<>()).add(segment);
+          }
+        }
+      }
+    }
+  }
+}
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentLineageBasedSegmentPreSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentLineageBasedSegmentPreSelector.java
similarity index 96%
rename from pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentLineageBasedSegmentPreSelector.java
rename to pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentLineageBasedSegmentPreSelector.java
index 48d1c8c..f8c4c55 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentLineageBasedSegmentPreSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentLineageBasedSegmentPreSelector.java
@@ -16,9 +16,8 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.broker.routing.segmentselector;
+package org.apache.pinot.broker.routing.segmentpreselector;
 
-import java.util.HashSet;
 import java.util.Set;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentPreSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentPreSelector.java
similarity index 86%
rename from pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentPreSelector.java
rename to pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentPreSelector.java
index 4d94c8d..bb07bdd 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentPreSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentPreSelector.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.broker.routing.segmentselector;
+package org.apache.pinot.broker.routing.segmentpreselector;
 
 import java.util.Set;
 
@@ -34,8 +34,8 @@ import java.util.Set;
 public interface SegmentPreSelector {
 
   /**
-   * Process pre-selection for online segments to filter out unnecessary online segments. It is safe to modify the input
-   * online segments.
+   * Pre-selects the online segments to filter out the unnecessary segments. This method might modify the online segment
+   * set passed in.
    */
   Set<String> preSelect(Set<String> onlineSegments);
 }
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentPreSelectorFactory.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentPreSelectorFactory.java
similarity index 95%
rename from pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentPreSelectorFactory.java
rename to pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentPreSelectorFactory.java
index 55f0e73..5385870 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentPreSelectorFactory.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentPreSelectorFactory.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.broker.routing.segmentselector;
+package org.apache.pinot.broker.routing.segmentpreselector;
 
 import org.apache.helix.ZNRecord;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/PartitionSegmentPruner.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/PartitionSegmentPruner.java
index abc7d3c..bc05bcf 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/PartitionSegmentPruner.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/PartitionSegmentPruner.java
@@ -27,6 +27,7 @@ import javax.annotation.Nullable;
 import org.apache.helix.AccessOption;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
@@ -64,7 +65,7 @@ public class PartitionSegmentPruner implements SegmentPruner {
   }
 
   @Override
-  public void init(ExternalView externalView, Set<String> onlineSegments) {
+  public void init(ExternalView externalView, IdealState idealState, Set<String> onlineSegments) {
     // Bulk load partition info for all online segments
     int numSegments = onlineSegments.size();
     List<String> segments = new ArrayList<>(onlineSegments);
@@ -124,7 +125,8 @@ public class PartitionSegmentPruner implements SegmentPruner {
   }
 
   @Override
-  public synchronized void onExternalViewChange(ExternalView externalView, Set<String> onlineSegments) {
+  public synchronized void onExternalViewChange(ExternalView externalView, IdealState idealState,
+      Set<String> onlineSegments) {
     // NOTE: We don't update all the segment ZK metadata for every external view change, but only the new added/removed
     //       ones. The refreshed segment ZK metadata change won't be picked up.
     for (String segment : onlineSegments) {
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPruner.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPruner.java
index 6197fe8..fb7afa6 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPruner.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPruner.java
@@ -21,6 +21,7 @@ package org.apache.pinot.broker.routing.segmentpruner;
 import java.util.List;
 import java.util.Set;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.request.BrokerRequest;
 
 
@@ -30,20 +31,17 @@ import org.apache.pinot.common.request.BrokerRequest;
 public interface SegmentPruner {
 
   /**
-   * Initializes the segment pruner with the external view and online segments (segments with ONLINE/CONSUMING instances
-   * in ideal state). Should be called only once before calling other methods.
-   * <p>NOTE: {@code externalView} is unused, but intentionally passed in as argument in case it is needed in the
-   * future.
+   * Initializes the segment pruner with the external view, ideal state and online segments (segments with
+   * ONLINE/CONSUMING instances in the ideal state and selected by the pre-selector). Should be called only once before
+   * calling other methods.
    */
-  void init(ExternalView externalView, Set<String> onlineSegments);
+  void init(ExternalView externalView, IdealState idealState, Set<String> onlineSegments);
 
   /**
-   * Processes the external view change based on the given online segments (segments with ONLINE/CONSUMING instances in
-   * ideal state).
-   * <p>NOTE: {@code externalView} is unused, but intentionally passed in as argument in case it is needed in the
-   * future.
+   * Processes the external view change based on the given ideal state and online segments (segments with
+   * ONLINE/CONSUMING instances in the ideal state and selected by the pre-selector).
    */
-  void onExternalViewChange(ExternalView externalView, Set<String> onlineSegments);
+  void onExternalViewChange(ExternalView externalView, IdealState idealState, Set<String> onlineSegments);
 
   /**
    * Refreshes the metadata for the given segment (called when segment is getting refreshed).
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/OfflineSegmentSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/OfflineSegmentSelector.java
index e71f45a..0166e4c 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/OfflineSegmentSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/OfflineSegmentSelector.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.request.BrokerRequest;
 
 
@@ -33,12 +34,12 @@ public class OfflineSegmentSelector implements SegmentSelector {
   private volatile List<String> _segments;
 
   @Override
-  public void init(ExternalView externalView, Set<String> onlineSegments) {
-    onExternalViewChange(externalView, onlineSegments);
+  public void init(ExternalView externalView, IdealState idealState, Set<String> onlineSegments) {
+    onExternalViewChange(externalView, idealState, onlineSegments);
   }
 
   @Override
-  public void onExternalViewChange(ExternalView externalView, Set<String> onlineSegments) {
+  public void onExternalViewChange(ExternalView externalView, IdealState idealState, Set<String> onlineSegments) {
     // TODO: for new added segments, before all replicas are up, consider not selecting them to avoid causing
     //       hotspot servers
 
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java
index a052da2..ab79c78 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/RealtimeSegmentSelector.java
@@ -27,6 +27,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicLong;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
 import org.apache.pinot.common.utils.HLCSegmentName;
@@ -55,12 +56,12 @@ public class RealtimeSegmentSelector implements SegmentSelector {
   private volatile List<String> _llcSegments;
 
   @Override
-  public void init(ExternalView externalView, Set<String> onlineSegments) {
-    onExternalViewChange(externalView, onlineSegments);
+  public void init(ExternalView externalView, IdealState idealState, Set<String> onlineSegments) {
+    onExternalViewChange(externalView, idealState, onlineSegments);
   }
 
   @Override
-  public void onExternalViewChange(ExternalView externalView, Set<String> onlineSegments) {
+  public void onExternalViewChange(ExternalView externalView, IdealState idealState, Set<String> onlineSegments) {
     // Group HLC segments by their group id
     // NOTE: Use TreeMap so that group ids are sorted and the result is deterministic
     Map<String, List<String>> groupIdToHLCSegmentsMap = new TreeMap<>();
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelector.java
index a140ddf..3909a26 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelector.java
@@ -21,6 +21,7 @@ package org.apache.pinot.broker.routing.segmentselector;
 import java.util.List;
 import java.util.Set;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.request.BrokerRequest;
 
 
@@ -43,16 +44,17 @@ import org.apache.pinot.common.request.BrokerRequest;
 public interface SegmentSelector {
 
   /**
-   * Initializes the segment selector with the external view and online segments (segments with ONLINE/CONSUMING
-   * instances in ideal state). Should be called only once before calling other methods.
+   * Initializes the segment selector with the external view, ideal state and online segments (segments with
+   * ONLINE/CONSUMING instances in the ideal state and selected by the pre-selector). Should be called only once before
+   * calling other methods.
    */
-  void init(ExternalView externalView, Set<String> onlineSegments);
+  void init(ExternalView externalView, IdealState idealState, Set<String> onlineSegments);
 
   /**
-   * Processes the external view change based on the given online segments (segments with ONLINE/CONSUMING instances in
-   * ideal state).
+   * Processes the external view change based on the given ideal state and online segments (segments with
+   * ONLINE/CONSUMING instances in ideal state and selected by the pre-selector).
    */
-  void onExternalViewChange(ExternalView externalView, Set<String> onlineSegments);
+  void onExternalViewChange(ExternalView externalView, IdealState idealState, Set<String> onlineSegments);
 
   /**
    * Selects the segments queried by the given broker request. The segments selected should cover the whole dataset
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
index 7462913..c7a64bb 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
@@ -29,6 +29,7 @@ import javax.annotation.Nullable;
 import org.apache.helix.AccessOption;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.utils.CommonConstants;
@@ -82,21 +83,22 @@ public class TimeBoundaryManager {
 
     // For HOURLY table with time unit other than DAYS, use (maxEndTime - 1 HOUR) as the time boundary; otherwise, use
     // (maxEndTime - 1 DAY)
-    _isHourlyTable = CommonConstants.Table.PUSH_FREQUENCY_HOURLY.equalsIgnoreCase(tableConfig.getValidationConfig().getSegmentPushFrequency())
-        && _timeUnit != TimeUnit.DAYS;
+    _isHourlyTable = CommonConstants.Table.PUSH_FREQUENCY_HOURLY
+        .equalsIgnoreCase(tableConfig.getValidationConfig().getSegmentPushFrequency()) && _timeUnit != TimeUnit.DAYS;
 
     LOGGER.info("Constructed TimeBoundaryManager with timeColumn: {}, timeUnit: {}, isHourlyTable: {} for table: {}",
         _timeColumn, _timeUnit, _isHourlyTable, _offlineTableName);
   }
 
   /**
-   * Initializes the time boundary manager with the external view and online segments (segments with ONLINE/CONSUMING
-   * instances in ideal state). Should be called only once before calling other methods.
-   * <p>NOTE: {@code externalView} is unused, but intentionally passed in as argument in case it is needed in the
-   * future.
+   * Initializes the time boundary manager with the external view, ideal state and online segments (segments with
+   * ONLINE/CONSUMING instances in the ideal state and selected by the pre-selector). Should be called only once before
+   * calling other methods.
+   * <p>NOTE: {@code externalView} and {@code idealState} are unused, but intentionally passed in in case they are
+   * needed in the future.
    */
   @SuppressWarnings("unused")
-  public void init(ExternalView externalView, Set<String> onlineSegments) {
+  public void init(ExternalView externalView, IdealState idealState, Set<String> onlineSegments) {
     // Bulk load time info for all online segments
     int numSegments = onlineSegments.size();
     List<String> segments = new ArrayList<>(numSegments);
@@ -161,15 +163,16 @@ public class TimeBoundaryManager {
   }
 
   /**
-   * Processes the external view change based on the given online segments (segments with ONLINE/CONSUMING instances in
-   * ideal state).
+   * Processes the external view change based on the given ideal state and online segments (segments with
+   * ONLINE/CONSUMING instances in the ideal state and selected by the pre-selector).
    * <p>NOTE: We don't update all the segment ZK metadata for every external view change, but only the new added/removed
    * ones. The refreshed segment ZK metadata change won't be picked up.
-   * <p>NOTE: {@code externalView} is unused, but intentionally passed in as argument in case it is needed in the
-   * future.
+   * <p>NOTE: {@code externalView} and {@code idealState} are unused, but intentionally passed in in case they are
+   * needed in the future.
    */
   @SuppressWarnings("unused")
-  public synchronized void onExternalViewChange(ExternalView externalView, Set<String> onlineSegments) {
+  public synchronized void onExternalViewChange(ExternalView externalView, IdealState idealState,
+      Set<String> onlineSegments) {
     for (String segment : onlineSegments) {
       _endTimeMap.computeIfAbsent(segment, k -> extractEndTimeFromSegmentZKMetadataZNRecord(segment,
           _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, AccessOption.PERSISTENT)));
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
index 3806a27..79fd518 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.spi.config.table.RoutingConfig;
@@ -66,6 +67,11 @@ public class InstanceSelectorTest {
     assertTrue(InstanceSelectorFactory
         .getInstanceSelector(tableConfig, brokerMetrics) instanceof ReplicaGroupInstanceSelector);
 
+    // Strict replica-group instance selector should be returned
+    when(routingConfig.getInstanceSelectorType()).thenReturn(RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE);
+    assertTrue(InstanceSelectorFactory
+        .getInstanceSelector(tableConfig, brokerMetrics) instanceof StrictReplicaGroupInstanceSelector);
+
     // Should be backward-compatible with legacy config
     when(routingConfig.getInstanceSelectorType()).thenReturn(null);
     when(tableConfig.getTableType()).thenReturn(TableType.OFFLINE);
@@ -87,12 +93,15 @@ public class InstanceSelectorTest {
     BalancedInstanceSelector balancedInstanceSelector = new BalancedInstanceSelector(offlineTableName, brokerMetrics);
     ReplicaGroupInstanceSelector replicaGroupInstanceSelector =
         new ReplicaGroupInstanceSelector(offlineTableName, brokerMetrics);
+    StrictReplicaGroupInstanceSelector strictReplicaGroupInstanceSelector =
+        new StrictReplicaGroupInstanceSelector(offlineTableName, brokerMetrics);
 
     Set<String> enabledInstances = new HashSet<>();
     ExternalView externalView = new ExternalView(offlineTableName);
-    Map<String, Map<String, String>> segmentAssignment = externalView.getRecord().getMapFields();
-    // NOTE: Online segments is not used in the current implementation
-    Set<String> onlineSegments = Collections.emptySet();
+    Map<String, Map<String, String>> externalViewSegmentAssignment = externalView.getRecord().getMapFields();
+    IdealState idealState = new IdealState(offlineTableName);
+    Map<String, Map<String, String>> idealStateSegmentAssignment = idealState.getRecord().getMapFields();
+    Set<String> onlineSegments = new HashSet<>();
 
     // 'instance0' and 'instance1' are in the same replica-group, 'instance2' and 'instance3' are in the same
     // replica-group; 'instance0' and 'instance2' serve the same segments, 'instance1' and 'instance3' serve the same
@@ -115,34 +124,51 @@ public class InstanceSelectorTest {
     // Add 2 segments to each instance
     //   [segment0, segment1] -> [instance0, instance2, errorInstance0]
     //   [segment2, segment3] -> [instance1, instance3, errorInstance1]
-    Map<String, String> instanceStateMap0 = new TreeMap<>();
-    instanceStateMap0.put(instance0, ONLINE);
-    instanceStateMap0.put(instance2, ONLINE);
-    instanceStateMap0.put(errorInstance0, ERROR);
     String segment0 = "segment0";
     String segment1 = "segment1";
-    segmentAssignment.put(segment0, instanceStateMap0);
-    segmentAssignment.put(segment1, instanceStateMap0);
-    Map<String, String> instanceStateMap1 = new TreeMap<>();
-    instanceStateMap1.put(instance1, ONLINE);
-    instanceStateMap1.put(instance3, ONLINE);
-    instanceStateMap1.put(errorInstance1, ERROR);
+    Map<String, String> externalViewInstanceStateMap0 = new TreeMap<>();
+    externalViewInstanceStateMap0.put(instance0, ONLINE);
+    externalViewInstanceStateMap0.put(instance2, ONLINE);
+    externalViewInstanceStateMap0.put(errorInstance0, ERROR);
+    Map<String, String> idealStateInstanceStateMap0 = new TreeMap<>();
+    idealStateInstanceStateMap0.put(instance0, ONLINE);
+    idealStateInstanceStateMap0.put(instance2, ONLINE);
+    idealStateInstanceStateMap0.put(errorInstance0, ONLINE);
+    externalViewSegmentAssignment.put(segment0, externalViewInstanceStateMap0);
+    externalViewSegmentAssignment.put(segment1, externalViewInstanceStateMap0);
+    idealStateSegmentAssignment.put(segment0, idealStateInstanceStateMap0);
+    idealStateSegmentAssignment.put(segment1, idealStateInstanceStateMap0);
+    onlineSegments.add(segment0);
+    onlineSegments.add(segment1);
     String segment2 = "segment2";
     String segment3 = "segment3";
-    segmentAssignment.put(segment2, instanceStateMap1);
-    segmentAssignment.put(segment3, instanceStateMap1);
+    Map<String, String> externalViewInstanceStateMap1 = new TreeMap<>();
+    externalViewInstanceStateMap1.put(instance1, ONLINE);
+    externalViewInstanceStateMap1.put(instance3, ONLINE);
+    externalViewInstanceStateMap1.put(errorInstance1, ERROR);
+    Map<String, String> idealStateInstanceStateMap1 = new TreeMap<>();
+    idealStateInstanceStateMap1.put(instance1, ONLINE);
+    idealStateInstanceStateMap1.put(instance3, ONLINE);
+    idealStateInstanceStateMap1.put(errorInstance1, ONLINE);
+    externalViewSegmentAssignment.put(segment2, externalViewInstanceStateMap1);
+    externalViewSegmentAssignment.put(segment3, externalViewInstanceStateMap1);
+    idealStateSegmentAssignment.put(segment2, idealStateInstanceStateMap1);
+    idealStateSegmentAssignment.put(segment3, idealStateInstanceStateMap1);
+    onlineSegments.add(segment2);
+    onlineSegments.add(segment3);
     List<String> segments = Arrays.asList(segment0, segment1, segment2, segment3);
 
-    balancedInstanceSelector.init(enabledInstances, externalView, onlineSegments);
-    replicaGroupInstanceSelector.init(enabledInstances, externalView, onlineSegments);
+    balancedInstanceSelector.init(enabledInstances, externalView, idealState, onlineSegments);
+    replicaGroupInstanceSelector.init(enabledInstances, externalView, idealState, onlineSegments);
+    strictReplicaGroupInstanceSelector.init(enabledInstances, externalView, idealState, onlineSegments);
 
-    // For the first request:
+    // For the 1st request:
     //   BalancedInstanceSelector:
     //     segment0 -> instance0
     //     segment1 -> instance2
     //     segment2 -> instance1
     //     segment3 -> instance3
-    //   ReplicaGroupInstanceSelector:
+    //   ReplicaGroupInstanceSelector/StrictReplicaGroupInstanceSelector:
     //     segment0 -> instance0
     //     segment1 -> instance0
     //     segment2 -> instance1
@@ -164,14 +190,17 @@ public class InstanceSelectorTest {
     selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments);
     assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
+    assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
-    // For the second request:
+    // For the 2nd request:
     //   BalancedInstanceSelector:
     //     segment0 -> instance2
     //     segment1 -> instance0
     //     segment2 -> instance3
     //     segment3 -> instance1
-    //   ReplicaGroupInstanceSelector:
+    //   ReplicaGroupInstanceSelector/StrictReplicaGroupInstanceSelector:
     //     segment0 -> instance2
     //     segment1 -> instance2
     //     segment2 -> instance3
@@ -192,19 +221,23 @@ public class InstanceSelectorTest {
     selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments);
     assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
+    assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
     // Disable instance0
     enabledInstances.remove(instance0);
     balancedInstanceSelector.onInstancesChange(enabledInstances, Collections.singletonList(instance0));
     replicaGroupInstanceSelector.onInstancesChange(enabledInstances, Collections.singletonList(instance0));
+    strictReplicaGroupInstanceSelector.onInstancesChange(enabledInstances, Collections.singletonList(instance0));
 
-    // For the third request:
+    // For the 3rd request:
     //   BalancedInstanceSelector:
     //     segment0 -> instance2
     //     segment1 -> instance2
     //     segment2 -> instance1
     //     segment3 -> instance3
-    //   ReplicaGroupInstanceSelector:
+    //   ReplicaGroupInstanceSelector/StrictReplicaGroupInstanceSelector:
     //     segment0 -> instance2
     //     segment1 -> instance2
     //     segment2 -> instance1
@@ -225,14 +258,17 @@ public class InstanceSelectorTest {
     selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments);
     assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
+    assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
-    // For the fourth request:
+    // For the 4th request:
     //   BalancedInstanceSelector:
     //     segment0 -> instance2
     //     segment1 -> instance2
     //     segment2 -> instance3
     //     segment3 -> instance1
-    //   ReplicaGroupInstanceSelector:
+    //   ReplicaGroupInstanceSelector/StrictReplicaGroupInstanceSelector:
     //     segment0 -> instance2
     //     segment1 -> instance2
     //     segment2 -> instance3
@@ -253,22 +289,29 @@ public class InstanceSelectorTest {
     selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments);
     assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
+    assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
     // Remove segment0 and add segment4
-    segmentAssignment.remove(segment0);
+    externalViewSegmentAssignment.remove(segment0);
+    idealStateSegmentAssignment.remove(segment0);
+    onlineSegments.remove(segment0);
     String segment4 = "segment4";
-    segmentAssignment.put(segment4, instanceStateMap0);
+    externalViewSegmentAssignment.put(segment4, externalViewInstanceStateMap0);
+    idealStateSegmentAssignment.put(segment4, idealStateInstanceStateMap0);
+    onlineSegments.add(segment4);
     segments = Arrays.asList(segment1, segment2, segment3, segment4);
 
     // Requests arrived before changes got picked up
 
-    // For the fifth request:
+    // For the 5th request:
     //   BalancedInstanceSelector:
     //     segment1 -> instance2
     //     segment2 -> instance3
     //     segment3 -> instance1
     //     segment4 -> null
-    //   ReplicaGroupInstanceSelector:
+    //   ReplicaGroupInstanceSelector/StrictReplicaGroupInstanceSelector:
     //     segment1 -> instance2
     //     segment2 -> instance1
     //     segment3 -> instance1
@@ -287,14 +330,17 @@ public class InstanceSelectorTest {
     selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments);
     assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
+    assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
-    // For the sixth request:
+    // For the 6th request:
     //   BalancedInstanceSelector:
     //     segment1 -> instance2
     //     segment2 -> instance1
     //     segment3 -> instance3
     //     segment4 -> null
-    //   ReplicaGroupInstanceSelector:
+    //   ReplicaGroupInstanceSelector/StrictReplicaGroupInstanceSelector:
     //     segment1 -> instance2
     //     segment2 -> instance3
     //     segment3 -> instance3
@@ -313,18 +359,22 @@ public class InstanceSelectorTest {
     selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments);
     assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
+    assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
     // Process the changes
-    balancedInstanceSelector.onExternalViewChange(externalView, onlineSegments);
-    replicaGroupInstanceSelector.onExternalViewChange(externalView, onlineSegments);
+    balancedInstanceSelector.onExternalViewChange(externalView, idealState, onlineSegments);
+    replicaGroupInstanceSelector.onExternalViewChange(externalView, idealState, onlineSegments);
+    strictReplicaGroupInstanceSelector.onExternalViewChange(externalView, idealState, onlineSegments);
 
-    // For the seventy request:
+    // For the 7th request:
     //   BalancedInstanceSelector:
     //     segment1 -> instance2
     //     segment2 -> instance3
     //     segment3 -> instance1
     //     segment4 -> instance2
-    //   ReplicaGroupInstanceSelector:
+    //   ReplicaGroupInstanceSelector/StrictReplicaGroupInstanceSelector:
     //     segment1 -> instance2
     //     segment2 -> instance1
     //     segment3 -> instance1
@@ -345,14 +395,17 @@ public class InstanceSelectorTest {
     selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments);
     assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
+    assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
-    // For the eighth request:
+    // For the 8th request:
     //   BalancedInstanceSelector:
     //     segment1 -> instance2
     //     segment2 -> instance1
     //     segment3 -> instance3
     //     segment4 -> instance2
-    //   ReplicaGroupInstanceSelector:
+    //   ReplicaGroupInstanceSelector/StrictReplicaGroupInstanceSelector:
     //     segment1 -> instance2
     //     segment2 -> instance3
     //     segment3 -> instance3
@@ -373,19 +426,23 @@ public class InstanceSelectorTest {
     selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments);
     assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
+    assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
     // Re-enable instance0
     enabledInstances.add(instance0);
     balancedInstanceSelector.onInstancesChange(enabledInstances, Collections.singletonList(instance0));
     replicaGroupInstanceSelector.onInstancesChange(enabledInstances, Collections.singletonList(instance0));
+    strictReplicaGroupInstanceSelector.onInstancesChange(enabledInstances, Collections.singletonList(instance0));
 
-    // For the ninth request:
+    // For the 9th request:
     //   BalancedInstanceSelector:
     //     segment1 -> instance0
     //     segment2 -> instance3
     //     segment3 -> instance1
     //     segment4 -> instance2
-    //   ReplicaGroupInstanceSelector:
+    //   ReplicaGroupInstanceSelector/StrictReplicaGroupInstanceSelector:
     //     segment1 -> instance0
     //     segment2 -> instance1
     //     segment3 -> instance1
@@ -406,15 +463,100 @@ public class InstanceSelectorTest {
     selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments);
     assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
+    assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
-    // For the tenth request:
+    // For the 10th request:
     //   BalancedInstanceSelector:
     //     segment1 -> instance2
     //     segment2 -> instance1
     //     segment3 -> instance3
     //     segment4 -> instance0
+    //   ReplicaGroupInstanceSelector/StrictReplicaGroupInstanceSelector:
+    //     segment1 -> instance2
+    //     segment2 -> instance3
+    //     segment3 -> instance3
+    //     segment4 -> instance2
+    expectedBalancedInstanceSelectorResult = new HashMap<>();
+    expectedBalancedInstanceSelectorResult.put(segment1, instance2);
+    expectedBalancedInstanceSelectorResult.put(segment2, instance1);
+    expectedBalancedInstanceSelectorResult.put(segment3, instance3);
+    expectedBalancedInstanceSelectorResult.put(segment4, instance0);
+    selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), expectedBalancedInstanceSelectorResult);
+    assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+    expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
+    expectedReplicaGroupInstanceSelectorResult.put(segment1, instance2);
+    expectedReplicaGroupInstanceSelectorResult.put(segment2, instance3);
+    expectedReplicaGroupInstanceSelectorResult.put(segment3, instance3);
+    expectedReplicaGroupInstanceSelectorResult.put(segment4, instance2);
+    selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
+    assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
+    assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+
+    // segment1 run into ERROR state on instance0
+    Map<String, String> externalViewInstanceStateMap2 = new TreeMap<>();
+    externalViewInstanceStateMap2.put(instance0, ERROR);
+    externalViewInstanceStateMap2.put(instance2, ONLINE);
+    externalViewInstanceStateMap2.put(errorInstance0, ERROR);
+    externalViewSegmentAssignment.put(segment1, externalViewInstanceStateMap2);
+    balancedInstanceSelector.onExternalViewChange(externalView, idealState, onlineSegments);
+    replicaGroupInstanceSelector.onExternalViewChange(externalView, idealState, onlineSegments);
+    strictReplicaGroupInstanceSelector.onExternalViewChange(externalView, idealState, onlineSegments);
+
+    // For the 11th request:
+    //   BalancedInstanceSelector:
+    //     segment1 -> instance2
+    //     segment2 -> instance3
+    //     segment3 -> instance1
+    //     segment4 -> instance2
     //   ReplicaGroupInstanceSelector:
     //     segment1 -> instance2
+    //     segment2 -> instance1
+    //     segment3 -> instance1
+    //     segment4 -> instance0
+    //   StrictReplicaGroupInstanceSelector:
+    //     segment1 -> instance2
+    //     segment2 -> instance1
+    //     segment3 -> instance1
+    //     segment4 -> instance2
+    expectedBalancedInstanceSelectorResult = new HashMap<>();
+    expectedBalancedInstanceSelectorResult.put(segment1, instance2);
+    expectedBalancedInstanceSelectorResult.put(segment2, instance3);
+    expectedBalancedInstanceSelectorResult.put(segment3, instance1);
+    expectedBalancedInstanceSelectorResult.put(segment4, instance2);
+    selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), expectedBalancedInstanceSelectorResult);
+    assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+    expectedReplicaGroupInstanceSelectorResult = new HashMap<>();
+    expectedReplicaGroupInstanceSelectorResult.put(segment1, instance2);
+    expectedReplicaGroupInstanceSelectorResult.put(segment2, instance1);
+    expectedReplicaGroupInstanceSelectorResult.put(segment3, instance1);
+    expectedReplicaGroupInstanceSelectorResult.put(segment4, instance0);
+    selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
+    assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+    Map<String, String> expectedStrictReplicaGroupInstanceSelectorResult = new HashMap<>();
+    expectedStrictReplicaGroupInstanceSelectorResult.put(segment1, instance2);
+    expectedStrictReplicaGroupInstanceSelectorResult.put(segment2, instance1);
+    expectedStrictReplicaGroupInstanceSelectorResult.put(segment3, instance1);
+    expectedStrictReplicaGroupInstanceSelectorResult.put(segment4, instance2);
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), expectedStrictReplicaGroupInstanceSelectorResult);
+    assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+
+    // For the 12th request:
+    //   BalancedInstanceSelector:
+    //     segment1 -> instance2
+    //     segment2 -> instance1
+    //     segment3 -> instance3
+    //     segment4 -> instance0
+    //   ReplicaGroupInstanceSelector/StrictReplicaGroupInstanceSelector:
+    //     segment1 -> instance2
     //     segment2 -> instance3
     //     segment3 -> instance3
     //     segment4 -> instance2
@@ -434,6 +576,9 @@ public class InstanceSelectorTest {
     selectionResult = replicaGroupInstanceSelector.select(brokerRequest, segments);
     assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+    assertEquals(selectionResult.getSegmentToInstanceMap(), expectedReplicaGroupInstanceSelectorResult);
+    assertTrue(selectionResult.getUnavailableSegments().isEmpty());
   }
 
   @Test
@@ -441,124 +586,260 @@ public class InstanceSelectorTest {
     String offlineTableName = "testTable_OFFLINE";
     BrokerMetrics brokerMetrics = mock(BrokerMetrics.class);
     BalancedInstanceSelector balancedInstanceSelector = new BalancedInstanceSelector(offlineTableName, brokerMetrics);
+    // ReplicaGroupInstanceSelector has the same behavior as BalancedInstanceSelector for the unavailable segments
+    StrictReplicaGroupInstanceSelector strictReplicaGroupInstanceSelector =
+        new StrictReplicaGroupInstanceSelector(offlineTableName, brokerMetrics);
 
     Set<String> enabledInstances = new HashSet<>();
     ExternalView externalView = new ExternalView(offlineTableName);
-    Map<String, Map<String, String>> segmentAssignment = externalView.getRecord().getMapFields();
-    // NOTE: Online segments is not used in the current implementation
-    Set<String> onlineSegments = Collections.emptySet();
+    Map<String, Map<String, String>> externalViewSegmentAssignment = externalView.getRecord().getMapFields();
+    IdealState idealState = new IdealState(offlineTableName);
+    Map<String, Map<String, String>> idealStateSegmentAssignment = idealState.getRecord().getMapFields();
+    Set<String> onlineSegments = new HashSet<>();
 
     String instance = "instance";
     String errorInstance = "errorInstance";
-    Map<String, String> instanceStateMap = new TreeMap<>();
-    instanceStateMap.put(instance, CONSUMING);
-    instanceStateMap.put(errorInstance, ERROR);
-    String segment = "segment";
-    segmentAssignment.put(segment, instanceStateMap);
-    List<String> segments = Collections.singletonList(segment);
-
-    // Initialize with no enabled instance, segment should be unavailable
+    String segment0 = "segment0";
+    String segment1 = "segment1";
+    Map<String, String> externalViewInstanceStateMap0 = new TreeMap<>();
+    externalViewInstanceStateMap0.put(instance, CONSUMING);
+    externalViewInstanceStateMap0.put(errorInstance, ERROR);
+    Map<String, String> externalViewInstanceStateMap1 = new TreeMap<>();
+    externalViewInstanceStateMap1.put(instance, CONSUMING);
+    externalViewInstanceStateMap1.put(errorInstance, ERROR);
+    Map<String, String> idealStateInstanceStateMap = new TreeMap<>();
+    idealStateInstanceStateMap.put(instance, CONSUMING);
+    idealStateInstanceStateMap.put(errorInstance, ONLINE);
+    externalViewSegmentAssignment.put(segment0, externalViewInstanceStateMap0);
+    externalViewSegmentAssignment.put(segment1, externalViewInstanceStateMap1);
+    idealStateSegmentAssignment.put(segment0, idealStateInstanceStateMap);
+    idealStateSegmentAssignment.put(segment1, idealStateInstanceStateMap);
+    onlineSegments.add(segment0);
+    onlineSegments.add(segment1);
+    List<String> segments = Arrays.asList(segment0, segment1);
+
+    // Initialize with no enabled instance, both segments should be unavailable
     // {
-    //   (disabled) instance: CONSUMING
-    //   (disabled) errorInstance: ERROR
+    //   segment0: {
+    //     (disabled) instance: CONSUMING,
+    //     (disabled) errorInstance: ERROR
+    //   },
+    //   segment1: {
+    //     (disabled) instance: CONSUMING,
+    //     (disabled) errorInstance: ERROR
+    //   }
     // }
-    balancedInstanceSelector.init(enabledInstances, externalView, onlineSegments);
+    balancedInstanceSelector.init(enabledInstances, externalView, idealState, onlineSegments);
+    strictReplicaGroupInstanceSelector.init(enabledInstances, externalView, idealState, onlineSegments);
     BrokerRequest brokerRequest = mock(BrokerRequest.class);
     InstanceSelector.SelectionResult selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
     assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
-    assertEquals(selectionResult.getUnavailableSegments(), Collections.singletonList(segment));
+    assertEquals(selectionResult.getUnavailableSegments(), Arrays.asList(segment0, segment1));
+    selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+    assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
+    assertEquals(selectionResult.getUnavailableSegments(), Arrays.asList(segment0, segment1));
 
     // Iterate 5 times
     for (int i = 0; i < 5; i++) {
 
-      // Enable the ERROR instance, segment should be unavailable
+      // Enable the ERROR instance, both segments should be unavailable
       // {
-      //   (disabled) instance: CONSUMING
-      //   (enabled)  errorInstance: ERROR
+      //   segment0: {
+      //     (disabled) instance: CONSUMING,
+      //     (enabled)  errorInstance: ERROR
+      //   },
+      //   segment1: {
+      //     (disabled) instance: CONSUMING,
+      //     (enabled)  errorInstance: ERROR
+      //   }
       // }
       enabledInstances.add(errorInstance);
       balancedInstanceSelector.onInstancesChange(enabledInstances, Collections.singletonList(errorInstance));
+      strictReplicaGroupInstanceSelector.onInstancesChange(enabledInstances, Collections.singletonList(errorInstance));
       selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
       assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
-      assertEquals(selectionResult.getUnavailableSegments(), Collections.singletonList(segment));
+      assertEquals(selectionResult.getUnavailableSegments(), Arrays.asList(segment0, segment1));
+      selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+      assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
+      assertEquals(selectionResult.getUnavailableSegments(), Arrays.asList(segment0, segment1));
 
-      // Enable the CONSUMING instance, segment should be available
+      // Enable the CONSUMING instance, both segments should be available
       // {
-      //   (enabled)  instance: CONSUMING
-      //   (enabled)  errorInstance: ERROR
+      //   segment0: {
+      //     (enabled)  instance: CONSUMING,
+      //     (enabled)  errorInstance: ERROR
+      //   },
+      //   segment1: {
+      //     (enabled)  instance: CONSUMING,
+      //     (enabled)  errorInstance: ERROR
+      //   }
       // }
       enabledInstances.add(instance);
       balancedInstanceSelector.onInstancesChange(enabledInstances, Collections.singletonList(instance));
+      strictReplicaGroupInstanceSelector.onInstancesChange(enabledInstances, Collections.singletonList(instance));
       selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
-      assertEquals(selectionResult.getSegmentToInstanceMap(), Collections.singletonMap(segment, instance));
+      assertEquals(selectionResult.getSegmentToInstanceMap().size(), 2);
+      assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+      selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+      assertEquals(selectionResult.getSegmentToInstanceMap().size(), 2);
       assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
-      // Change the CONSUMING instance to ONLINE, segment should be available
+      // Change the CONSUMING instance to ONLINE, both segments should be available
       // {
-      //   (enabled)  instance: ONLINE
-      //   (enabled)  errorInstance: ERROR
+      //   segment0: {
+      //     (enabled)  instance: ONLINE,
+      //     (enabled)  errorInstance: ERROR
+      //   },
+      //   segment1: {
+      //     (enabled)  instance: ONLINE,
+      //     (enabled)  errorInstance: ERROR
+      //   }
       // }
-      instanceStateMap.put(instance, ONLINE);
-      balancedInstanceSelector.onExternalViewChange(externalView, onlineSegments);
+      externalViewInstanceStateMap0.put(instance, ONLINE);
+      externalViewInstanceStateMap1.put(instance, ONLINE);
+      balancedInstanceSelector.onExternalViewChange(externalView, idealState, onlineSegments);
+      strictReplicaGroupInstanceSelector.onExternalViewChange(externalView, idealState, onlineSegments);
       selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
-      assertEquals(selectionResult.getSegmentToInstanceMap(), Collections.singletonMap(segment, instance));
+      assertEquals(selectionResult.getSegmentToInstanceMap().size(), 2);
+      assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+      selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+      assertEquals(selectionResult.getSegmentToInstanceMap().size(), 2);
       assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
-      // Change the ONLINE instance to OFFLINE, both segment to instance map and unavailable segment should be empty
+      // Switch the instance state for segment1, both segments should be available for BalancedInstanceSelector, but
+      // unavailable for StrictReplicaGroupInstanceSelector
       // {
-      //   (enabled)  instance: OFFLINE
-      //   (enabled)  errorInstance: ERROR
+      //   segment0: {
+      //     (enabled)  instance: ONLINE,
+      //     (enabled)  errorInstance: ERROR
+      //   },
+      //   segment1: {
+      //     (enabled)  instance: ERROR,
+      //     (enabled)  errorInstance: ONLINE
+      //   }
       // }
-      instanceStateMap.put(instance, OFFLINE);
-      balancedInstanceSelector.onExternalViewChange(externalView, onlineSegments);
+      externalViewInstanceStateMap1.put(instance, ERROR);
+      externalViewInstanceStateMap1.put(errorInstance, ONLINE);
+      balancedInstanceSelector.onExternalViewChange(externalView, idealState, onlineSegments);
+      strictReplicaGroupInstanceSelector.onExternalViewChange(externalView, idealState, onlineSegments);
+      selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+      assertEquals(selectionResult.getSegmentToInstanceMap().size(), 2);
+      assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+      selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+      assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
+      assertEquals(selectionResult.getUnavailableSegments(), Arrays.asList(segment0, segment1));
+
+      // Switch back the instance state for segment1 and change the ONLINE instance to OFFLINE, both segment to instance
+      // map and unavailable segments should be empty
+      // {
+      //   segment0: {
+      //     (enabled)  instance: OFFLINE,
+      //     (enabled)  errorInstance: ERROR
+      //   },
+      //   segment1: {
+      //     (enabled)  instance: OFFLINE,
+      //     (enabled)  errorInstance: ERROR
+      //   }
+      // }
+      externalViewInstanceStateMap0.put(instance, OFFLINE);
+      externalViewInstanceStateMap1.put(instance, OFFLINE);
+      externalViewInstanceStateMap1.put(errorInstance, ERROR);
+      balancedInstanceSelector.onExternalViewChange(externalView, idealState, onlineSegments);
+      strictReplicaGroupInstanceSelector.onExternalViewChange(externalView, idealState, onlineSegments);
       selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
       assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
       assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+      selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+      assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
+      assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
-      // Disable the OFFLINE instance, segment should be unavailable
+      // Disable the OFFLINE instance, both segments should be unavailable
       // {
-      //   (disabled) instance: OFFLINE
-      //   (enabled)  errorInstance: ERROR
+      //   segment0: {
+      //     (disabled) instance: OFFLINE,
+      //     (enabled)  errorInstance: ERROR
+      //   },
+      //   segment1: {
+      //     (disabled) instance: OFFLINE,
+      //     (enabled)  errorInstance: ERROR
+      //   }
       // }
       enabledInstances.remove(instance);
       balancedInstanceSelector.onInstancesChange(enabledInstances, Collections.singletonList(instance));
+      strictReplicaGroupInstanceSelector.onInstancesChange(enabledInstances, Collections.singletonList(instance));
       selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
       assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
-      assertEquals(selectionResult.getUnavailableSegments(), Collections.singletonList(segment));
+      assertEquals(selectionResult.getUnavailableSegments(), Arrays.asList(segment0, segment1));
+      selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+      assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
+      assertEquals(selectionResult.getUnavailableSegments(), Arrays.asList(segment0, segment1));
 
-      // Change the ERROR instance to ONLINE, segment should be available
+      // Change the ERROR instance of segment0 to ONLINE, segment0 should be available
+      // (Note that for StrictReplicaGroupInstanceSelector, segment1 does not have any ONLINE/CONSUMING instance, so it
+      // won't mark instance down for segment0)
       // {
-      //   (disabled) instance: OFFLINE
-      //   (enabled)  errorInstance: ONLINE
+      //   segment0: {
+      //     (disabled) instance: OFFLINE,
+      //     (enabled)  errorInstance: ONLINE
+      //   },
+      //   segment1: {
+      //     (disabled) instance: OFFLINE,
+      //     (enabled)  errorInstance: ERROR
+      //   }
       // }
-      instanceStateMap.put(errorInstance, ONLINE);
-      balancedInstanceSelector.onExternalViewChange(externalView, onlineSegments);
+      externalViewInstanceStateMap0.put(errorInstance, ONLINE);
+      balancedInstanceSelector.onExternalViewChange(externalView, idealState, onlineSegments);
+      strictReplicaGroupInstanceSelector.onExternalViewChange(externalView, idealState, onlineSegments);
       selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
-      assertEquals(selectionResult.getSegmentToInstanceMap(), Collections.singletonMap(segment, errorInstance));
-      assertTrue(selectionResult.getUnavailableSegments().isEmpty());
+      assertEquals(selectionResult.getSegmentToInstanceMap().size(), 1);
+      assertEquals(selectionResult.getUnavailableSegments(), Collections.singletonList(segment1));
+      selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+      assertEquals(selectionResult.getSegmentToInstanceMap().size(), 1);
+      assertEquals(selectionResult.getUnavailableSegments(), Collections.singletonList(segment1));
 
-      // Disable the ONLINE instance, segment should be unavailable
+      // Disable the ONLINE instance, both segments should be unavailable
       // {
-      //   (disabled) instance: OFFLINE
-      //   (disabled) errorInstance: ONLINE
+      //   segment0: {
+      //     (disabled) instance: OFFLINE,
+      //     (disabled) errorInstance: ONLINE
+      //   },
+      //   segment1: {
+      //     (disabled) instance: OFFLINE,
+      //     (disabled) errorInstance: ERROR
+      //   }
       // }
       enabledInstances.remove(errorInstance);
       balancedInstanceSelector.onInstancesChange(enabledInstances, Collections.singletonList(errorInstance));
+      strictReplicaGroupInstanceSelector.onInstancesChange(enabledInstances, Collections.singletonList(errorInstance));
       selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
       assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
-      assertEquals(selectionResult.getUnavailableSegments(), Collections.singletonList(segment));
+      assertEquals(selectionResult.getUnavailableSegments(), Arrays.asList(segment0, segment1));
+      selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
+      assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
+      assertEquals(selectionResult.getUnavailableSegments(), Arrays.asList(segment0, segment1));
 
-      // Change back to initial state, segment should be unavailable
+      // Change back to initial state, both segments should be unavailable
       // {
-      //   (disabled) instance: CONSUMING
-      //   (disabled) errorInstance: ERROR
+      //   segment0: {
+      //     (disabled) instance: CONSUMING,
+      //     (enabled)  errorInstance: ERROR
+      //   },
+      //   segment1: {
+      //     (disabled) instance: CONSUMING,
+      //     (enabled)  errorInstance: ERROR
+      //   }
       // }
-      instanceStateMap.put(instance, CONSUMING);
-      instanceStateMap.put(errorInstance, ERROR);
-      balancedInstanceSelector.onExternalViewChange(externalView, onlineSegments);
+      externalViewInstanceStateMap0.put(instance, CONSUMING);
+      externalViewInstanceStateMap0.put(errorInstance, ERROR);
+      externalViewInstanceStateMap1.put(instance, CONSUMING);
+      balancedInstanceSelector.onExternalViewChange(externalView, idealState, onlineSegments);
+      strictReplicaGroupInstanceSelector.onExternalViewChange(externalView, idealState, onlineSegments);
       selectionResult = balancedInstanceSelector.select(brokerRequest, segments);
       assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
-      assertEquals(selectionResult.getUnavailableSegments(), Collections.singletonList(segment));
+      assertEquals(selectionResult.getUnavailableSegments(), Arrays.asList(segment0, segment1));
+      selectionResult = strictReplicaGroupInstanceSelector.select(brokerRequest, segments);
+      assertTrue(selectionResult.getSegmentToInstanceMap().isEmpty());
+      assertEquals(selectionResult.getUnavailableSegments(), Arrays.asList(segment0, segment1));
     }
   }
 }
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentselector/SegmentPreSelectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentPreSelectorTest.java
similarity index 97%
rename from pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentselector/SegmentPreSelectorTest.java
rename to pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentPreSelectorTest.java
index d3bc30a..d19def7 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentselector/SegmentPreSelectorTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpreselector/SegmentPreSelectorTest.java
@@ -16,7 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.broker.routing.segmentselector;
+package org.apache.pinot.broker.routing.segmentpreselector;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -49,7 +49,7 @@ public class SegmentPreSelectorTest {
     Set<String> onlineSegments = new HashSet<>();
 
     int numOfflineSegments = 5;
-    for (int i = 0 ; i< numOfflineSegments; i++ ) {
+    for (int i = 0; i < numOfflineSegments; i++) {
       String segmentName = "segment_" + i;
       externalView.setStateMap(segmentName, onlineInstanceStateMap);
       onlineSegments.add(segmentName);
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
index 06657dd..8e46a5d 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentpruner/SegmentPrunerTest.java
@@ -30,6 +30,7 @@ import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.ColumnPartitionMetadata;
@@ -140,12 +141,14 @@ public class SegmentPrunerTest {
     BrokerRequest brokerRequest1 = compiler.compileToBrokerRequest(QUERY_1);
     BrokerRequest brokerRequest2 = compiler.compileToBrokerRequest(QUERY_2);
     BrokerRequest brokerRequest3 = compiler.compileToBrokerRequest(QUERY_3);
+    // NOTE: External view and ideal state are not used in the current implementation.
     ExternalView externalView = Mockito.mock(ExternalView.class);
+    IdealState idealState = Mockito.mock(IdealState.class);
 
     PartitionSegmentPruner segmentPruner =
         new PartitionSegmentPruner(OFFLINE_TABLE_NAME, PARTITION_COLUMN, _propertyStore);
     Set<String> onlineSegments = new HashSet<>();
-    segmentPruner.init(externalView, onlineSegments);
+    segmentPruner.init(externalView, idealState, onlineSegments);
     assertEquals(segmentPruner.prune(brokerRequest1, Collections.emptyList()), Collections.emptyList());
     assertEquals(segmentPruner.prune(brokerRequest2, Collections.emptyList()), Collections.emptyList());
     assertEquals(segmentPruner.prune(brokerRequest3, Collections.emptyList()), Collections.emptyList());
@@ -166,7 +169,7 @@ public class SegmentPrunerTest {
     segmentZKMetadataWithoutPartitionMetadata.setSegmentName(segmentWithoutPartitionMetadata);
     ZKMetadataProvider
         .setOfflineSegmentZKMetadata(_propertyStore, OFFLINE_TABLE_NAME, segmentZKMetadataWithoutPartitionMetadata);
-    segmentPruner.onExternalViewChange(externalView, onlineSegments);
+    segmentPruner.onExternalViewChange(externalView, idealState, onlineSegments);
     assertEquals(segmentPruner.prune(brokerRequest1, Collections.singletonList(segmentWithoutPartitionMetadata)),
         Collections.singletonList(segmentWithoutPartitionMetadata));
     assertEquals(segmentPruner.prune(brokerRequest2, Collections.singletonList(segmentWithoutPartitionMetadata)),
@@ -183,7 +186,7 @@ public class SegmentPrunerTest {
     String segment1 = "segment1";
     onlineSegments.add(segment1);
     setSegmentZKMetadata(segment1, "Murmur", 4, 0);
-    segmentPruner.onExternalViewChange(externalView, onlineSegments);
+    segmentPruner.onExternalViewChange(externalView, idealState, onlineSegments);
     assertEquals(segmentPruner.prune(brokerRequest1, Arrays.asList(segment0, segment1)),
         Arrays.asList(segment0, segment1));
     assertEquals(segmentPruner.prune(brokerRequest2, Arrays.asList(segment0, segment1)),
@@ -193,7 +196,7 @@ public class SegmentPrunerTest {
 
     // Update partition metadata without refreshing should have no effect
     setSegmentZKMetadata(segment0, "Modulo", 4, 1);
-    segmentPruner.onExternalViewChange(externalView, onlineSegments);
+    segmentPruner.onExternalViewChange(externalView, idealState, onlineSegments);
     assertEquals(segmentPruner.prune(brokerRequest1, Arrays.asList(segment0, segment1)),
         Arrays.asList(segment0, segment1));
     assertEquals(segmentPruner.prune(brokerRequest2, Arrays.asList(segment0, segment1)),
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorTest.java
index b447bb6..051f8c6 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/segmentselector/SegmentSelectorTest.java
@@ -23,6 +23,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.utils.HLCSegmentName;
 import org.apache.pinot.common.utils.LLCSegmentName;
@@ -61,10 +62,12 @@ public class SegmentSelectorTest {
     Map<String, String> onlineInstanceStateMap = Collections.singletonMap("server", ONLINE);
     Map<String, String> consumingInstanceStateMap = Collections.singletonMap("server", CONSUMING);
     Set<String> onlineSegments = new HashSet<>();
+    // NOTE: Ideal state is not used in the current implementation.
+    IdealState idealState = mock(IdealState.class);
 
     // Should return an empty list when there is no segment
     RealtimeSegmentSelector segmentSelector = new RealtimeSegmentSelector();
-    segmentSelector.init(externalView, onlineSegments);
+    segmentSelector.init(externalView, idealState, onlineSegments);
     BrokerRequest brokerRequest = mock(BrokerRequest.class);
     assertTrue(segmentSelector.select(brokerRequest).isEmpty());
 
@@ -83,7 +86,7 @@ public class SegmentSelectorTest {
       }
       hlcSegments[i] = hlcSegmentsForGroup;
     }
-    segmentSelector.onExternalViewChange(externalView, onlineSegments);
+    segmentSelector.onExternalViewChange(externalView, idealState, onlineSegments);
 
     // Only HLC segments exist, should select the HLC segments from the first group
     assertEqualsNoOrder(segmentSelector.select(brokerRequest).toArray(), hlcSegments[0]);
@@ -107,7 +110,7 @@ public class SegmentSelectorTest {
         }
       }
     }
-    segmentSelector.onExternalViewChange(externalView, onlineSegments);
+    segmentSelector.onExternalViewChange(externalView, idealState, onlineSegments);
 
     // Both HLC and LLC segments exist, should select the LLC segments
     assertEqualsNoOrder(segmentSelector.select(brokerRequest).toArray(), expectedSelectedLLCSegments);
@@ -122,7 +125,7 @@ public class SegmentSelectorTest {
         onlineSegments.remove(hlcSegment);
       }
     }
-    segmentSelector.onExternalViewChange(externalView, onlineSegments);
+    segmentSelector.onExternalViewChange(externalView, idealState, onlineSegments);
     assertEqualsNoOrder(segmentSelector.select(brokerRequest).toArray(), expectedSelectedLLCSegments);
   }
 }
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
index 98f5d3e..00ca6c7 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
@@ -26,6 +26,7 @@ import org.apache.helix.manager.zk.ZNRecordSerializer;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
 import org.apache.helix.manager.zk.ZkClient;
 import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
@@ -72,71 +73,73 @@ public class TimeBoundaryManagerTest {
 
   @Test
   public void testTimeBoundaryManager() {
-    ExternalView externalView = Mockito.mock(ExternalView.class);
-
     for (TimeUnit timeUnit : TimeUnit.values()) {
       // Test DAILY push table, with timeFieldSpec
       String rawTableName = "testTable_" + timeUnit + "_DAILY";
       TableConfig tableConfig = getTableConfig(rawTableName, "DAILY");
       setSchemaTimeFieldSpec(rawTableName, timeUnit);
-      testDailyPushTable(rawTableName, tableConfig, timeUnit, externalView);
+      testDailyPushTable(rawTableName, tableConfig, timeUnit);
 
       // Test HOURLY push table, with timeFieldSpec
       rawTableName = "testTable_" + timeUnit + "_HOURLY";
       tableConfig = getTableConfig(rawTableName, "HOURLY");
       setSchemaTimeFieldSpec(rawTableName, timeUnit);
-      testHourlyPushTable(rawTableName, tableConfig, timeUnit, externalView);
+      testHourlyPushTable(rawTableName, tableConfig, timeUnit);
 
       // Test DAILY push table with dateTimeFieldSpec
       rawTableName = "testTableDateTime_" + timeUnit + "_DAILY";
       tableConfig = getTableConfig(rawTableName, "DAILY");
       setSchemaDateTimeFieldSpec(rawTableName, timeUnit);
-      testDailyPushTable(rawTableName, tableConfig, timeUnit, externalView);
+      testDailyPushTable(rawTableName, tableConfig, timeUnit);
 
       // Test HOURLY push table
       rawTableName = "testTableDateTime_" + timeUnit + "_HOURLY";
       tableConfig = getTableConfig(rawTableName, "HOURLY");
       setSchemaDateTimeFieldSpec(rawTableName, timeUnit);
-      testHourlyPushTable(rawTableName, tableConfig, timeUnit, externalView);
+      testHourlyPushTable(rawTableName, tableConfig, timeUnit);
     }
   }
 
-  private void testDailyPushTable(String rawTableName, TableConfig tableConfig, TimeUnit timeUnit, ExternalView externalView) {
+  private void testDailyPushTable(String rawTableName, TableConfig tableConfig, TimeUnit timeUnit) {
+    // NOTE: External view and ideal state are not used in the current implementation.
+    ExternalView externalView = Mockito.mock(ExternalView.class);
+    IdealState idealState = Mockito.mock(IdealState.class);
+
     // Start with no segment
     TimeBoundaryManager timeBoundaryManager = new TimeBoundaryManager(tableConfig, _propertyStore);
     Set<String> onlineSegments = new HashSet<>();
-    timeBoundaryManager.init(externalView, onlineSegments);
+    timeBoundaryManager.init(externalView, idealState, onlineSegments);
     assertNull(timeBoundaryManager.getTimeBoundaryInfo());
 
     // Add the first segment should update the time boundary
     String segment0 = "segment0";
     onlineSegments.add(segment0);
     setSegmentZKMetadata(rawTableName, segment0, 2, timeUnit);
-    timeBoundaryManager.init(externalView, onlineSegments);
+    timeBoundaryManager.init(externalView, idealState, onlineSegments);
     verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(1, TimeUnit.DAYS));
 
     // Add a new segment with larger end time should update the time boundary
     String segment1 = "segment1";
     onlineSegments.add(segment1);
     setSegmentZKMetadata(rawTableName, segment1, 4, timeUnit);
-    timeBoundaryManager.onExternalViewChange(externalView, onlineSegments);
+    timeBoundaryManager.onExternalViewChange(externalView, idealState, onlineSegments);
     verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(3, TimeUnit.DAYS));
 
     // Add a new segment with smaller end time should not change the time boundary
     String segment2 = "segment2";
     onlineSegments.add(segment2);
     setSegmentZKMetadata(rawTableName, segment2, 3, timeUnit);
-    timeBoundaryManager.onExternalViewChange(externalView, onlineSegments);
+    timeBoundaryManager.onExternalViewChange(externalView, idealState, onlineSegments);
     verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(3, TimeUnit.DAYS));
 
     // Remove the segment with largest end time should update the time boundary
     onlineSegments.remove(segment1);
-    timeBoundaryManager.onExternalViewChange(externalView, onlineSegments);
+    timeBoundaryManager.onExternalViewChange(externalView, idealState, onlineSegments);
     verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(2, TimeUnit.DAYS));
 
     // Change segment ZK metadata without refreshing should not update the time boundary
     setSegmentZKMetadata(rawTableName, segment2, 5, timeUnit);
-    timeBoundaryManager.onExternalViewChange(externalView, onlineSegments);
+    timeBoundaryManager.onExternalViewChange(externalView, idealState, onlineSegments);
     verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(2, TimeUnit.DAYS));
 
     // Refresh the changed segment should update the time boundary
@@ -144,13 +147,17 @@ public class TimeBoundaryManagerTest {
     verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(4, TimeUnit.DAYS));
   }
 
-  private void testHourlyPushTable(String rawTableName, TableConfig tableConfig, TimeUnit timeUnit, ExternalView externalView) {
+  private void testHourlyPushTable(String rawTableName, TableConfig tableConfig, TimeUnit timeUnit) {
+    // NOTE: External view and ideal state are not used in the current implementation.
+    ExternalView externalView = Mockito.mock(ExternalView.class);
+    IdealState idealState = Mockito.mock(IdealState.class);
+
     TimeBoundaryManager timeBoundaryManager = new TimeBoundaryManager(tableConfig, _propertyStore);
     Set<String> onlineSegments = new HashSet<>();
     String segment0 = "segment0";
     onlineSegments.add(segment0);
     setSegmentZKMetadata(rawTableName, segment0, 2, timeUnit);
-    timeBoundaryManager.init(externalView, onlineSegments);
+    timeBoundaryManager.init(externalView, idealState, onlineSegments);
     long expectedTimeValue;
     if (timeUnit == TimeUnit.DAYS) {
       // Time boundary should be endTime - 1 DAY when time unit is DAYS
@@ -168,10 +175,8 @@ public class TimeBoundaryManagerTest {
   }
 
   private void setSchemaTimeFieldSpec(String rawTableName, TimeUnit timeUnit) {
-    ZKMetadataProvider.setSchema(_propertyStore,
-        new Schema.SchemaBuilder().setSchemaName(rawTableName)
-            .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, timeUnit, TIME_COLUMN), null)
-            .build());
+    ZKMetadataProvider.setSchema(_propertyStore, new Schema.SchemaBuilder().setSchemaName(rawTableName)
+        .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, timeUnit, TIME_COLUMN), null).build());
   }
 
   private void setSchemaDateTimeFieldSpec(String rawTableName, TimeUnit timeUnit) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java
index 83d752c..882f017 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/TableConfigUtils.java
@@ -242,7 +242,7 @@ public final class TableConfigUtils {
    * Validates the upsert-related configurations
    *  - check table type is realtime
    *  - the primary key exists on the schema
-   *  - replica group is configured for routing type
+   *  - strict replica-group is configured for routing type
    *  - consumer type must be low-level
    */
   protected static void validateUpsertConfig(TableConfig tableConfig, Schema schema) {
@@ -265,9 +265,9 @@ public final class TableConfigUtils {
         "Upsert table must use low-level streaming consumer type");
     // replica group is configured for routing
     Preconditions.checkState(
-        tableConfig.getRoutingConfig() != null && RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE
+        tableConfig.getRoutingConfig() != null && RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE
             .equalsIgnoreCase(tableConfig.getRoutingConfig().getInstanceSelectorType()),
-        "Upsert table must use replica-group (i.e. replicaGroup) based routing");
+        "Upsert table must use strict replica-group (i.e. strictReplicaGroup) based routing");
     // no startree index
     Preconditions.checkState(
         CollectionUtils.isEmpty(tableConfig.getIndexingConfig().getStarTreeIndexConfigs()) && !tableConfig
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
index 8594463..9ddcb9c 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/util/TableConfigUtilsTest.java
@@ -159,8 +159,7 @@ public class TableConfigUtilsTest {
 
     // empty timeColumnName - valid
     schema = new Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build();
-    tableConfig =
-        new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName("").build();
+    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setTimeColumnName("").build();
     TableConfigUtils.validate(tableConfig, schema);
 
     // valid
@@ -506,8 +505,9 @@ public class TableConfigUtilsTest {
       // expected
     }
 
-    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
-        .setInvertedIndexColumns(Arrays.asList("")).build();
+    tableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setInvertedIndexColumns(Arrays.asList(""))
+            .build();
     TableConfigUtils.validate(tableConfig, schema);
 
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
@@ -519,8 +519,9 @@ public class TableConfigUtilsTest {
       // expected
     }
 
-    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
-        .setNoDictionaryColumns(Arrays.asList("")).build();
+    tableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setNoDictionaryColumns(Arrays.asList(""))
+            .build();
     TableConfigUtils.validate(tableConfig, schema);
 
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
@@ -532,8 +533,9 @@ public class TableConfigUtilsTest {
       // expected
     }
 
-    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
-        .setOnHeapDictionaryColumns(Arrays.asList("")).build();
+    tableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME).setOnHeapDictionaryColumns(Arrays.asList(""))
+            .build();
     TableConfigUtils.validate(tableConfig, schema);
 
     tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
@@ -742,11 +744,13 @@ public class TableConfigUtilsTest {
     try {
       TableConfigUtils.validateUpsertConfig(tableConfig, schema);
     } catch (Exception e) {
-      Assert.assertEquals(e.getMessage(), "Upsert table must use replica-group (i.e. replicaGroup) based routing");
+      Assert.assertEquals(e.getMessage(),
+          "Upsert table must use strict replica-group (i.e. strictReplicaGroup) based routing");
     }
     tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
         .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL))
-        .setRoutingConfig(new RoutingConfig(null, null, "replicaGroup")).setStreamConfigs(streamConfigs).build();
+        .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
+        .setStreamConfigs(streamConfigs).build();
     try {
       TableConfigUtils.validateUpsertConfig(tableConfig, schema);
     } catch (Exception e) {
@@ -756,7 +760,7 @@ public class TableConfigUtilsTest {
         .singletonList(new AggregationFunctionColumnPair(AggregationFunctionType.COUNT, "myCol").toColumnName()), 10);
     tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
         .setUpsertConfig(new UpsertConfig(UpsertConfig.Mode.FULL))
-        .setRoutingConfig(new RoutingConfig(null, null, "replicaGroup"))
+        .setRoutingConfig(new RoutingConfig(null, null, RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE))
         .setStarTreeIndexConfigs(Lists.newArrayList(starTreeIndexConfig)).setStreamConfigs(streamConfigs).build();
     try {
       TableConfigUtils.validateUpsertConfig(tableConfig, schema);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
index ee5b564..196e98d 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/RoutingConfig.java
@@ -20,15 +20,15 @@ package org.apache.pinot.spi.config.table;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import org.apache.pinot.spi.config.BaseJsonConfig;
-
 import java.util.List;
 import javax.annotation.Nullable;
+import org.apache.pinot.spi.config.BaseJsonConfig;
 
 
 public class RoutingConfig extends BaseJsonConfig {
   public static final String PARTITION_SEGMENT_PRUNER_TYPE = "partition";
   public static final String REPLICA_GROUP_INSTANCE_SELECTOR_TYPE = "replicaGroup";
+  public static final String STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE = "strictReplicaGroup";
 
   // Replaced by _segmentPrunerTypes and _instanceSelectorType
   @Deprecated
diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_meetupRsvp_realtime_table_config.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_meetupRsvp_realtime_table_config.json
index 596e4f8..c285cab 100644
--- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_meetupRsvp_realtime_table_config.json
+++ b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/upsert_meetupRsvp_realtime_table_config.json
@@ -31,7 +31,7 @@
     "customConfigs": {}
   },
   "routing": {
-    "instanceSelectorType": "replicaGroup"
+    "instanceSelectorType": "strictReplicaGroup"
   },
   "upsertConfig": {
     "mode": "FULL"


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org