You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by xb...@apache.org on 2023/11/29 01:00:33 UTC

(pinot) branch master updated: Allow optional segments that can be skipped by servers without failing the query (#11978)

This is an automated email from the ASF dual-hosted git repository.

xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3bd74b36e4 Allow optional segments that can be skipped by servers without failing the query (#11978)
3bd74b36e4 is described below

commit 3bd74b36e4fad0ba35579854339a07d86f5bebd5
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Tue Nov 28 17:00:27 2023 -0800

    Allow optional segments that can be skipped by servers without failing the query (#11978)
    
    * collect optional segments and pass them to servers to handle, instead of simply skipping them at the broker side, which could lead to wrong query results, particularly for upsert tables
---
 .../broker/api/resources/PinotBrokerDebug.java     |  90 ++++++++--
 .../requesthandler/BaseBrokerRequestHandler.java   |  26 +--
 .../requesthandler/GrpcBrokerRequestHandler.java   |  19 ++-
 .../MultiStageBrokerRequestHandler.java            |   9 +-
 .../SingleConnectionBrokerRequestHandler.java      |   8 +-
 .../pinot/broker/routing/BrokerRoutingManager.java |  35 +++-
 .../instanceselector/BalancedInstanceSelector.java |  17 +-
 .../instanceselector/BaseInstanceSelector.java     |  22 ++-
 .../routing/instanceselector/InstanceSelector.java |  21 ++-
 .../MultiStageReplicaGroupSelector.java            |  20 ++-
 .../ReplicaGroupInstanceSelector.java              |  40 +++--
 .../StrictReplicaGroupInstanceSelector.java        |   4 +
 .../broker/broker/HelixBrokerStarterTest.java      |   9 +-
 .../BaseBrokerRequestHandlerTest.java              |   9 +-
 .../instanceselector/InstanceSelectorTest.java     |  53 +++---
 .../pinot/common/request/InstanceRequest.java      | 186 +++++++++++++++++++--
 pinot-common/src/thrift/request.thrift             |   1 +
 .../common/reader/PinotServerDataFetcher.scala     |  11 +-
 .../core/data/manager/BaseTableDataManager.java    |  15 ++
 .../query/executor/ServerQueryExecutorV1Impl.java  |   8 +-
 .../pinot/core/query/logger/ServerQueryLogger.java |   2 +-
 .../core/query/request/ServerQueryRequest.java     |   8 +
 .../apache/pinot/core/routing/RoutingTable.java    |  13 +-
 .../apache/pinot/core/transport/QueryRouter.java   |  24 ++-
 .../pinot/core/transport/QueryRoutingTest.java     |   5 +-
 .../apache/pinot/query/routing/WorkerManager.java  |  12 +-
 .../query/testutils/MockRoutingManagerFactory.java |  12 +-
 .../testutils/MockInstanceDataManagerFactory.java  |   4 +-
 .../local/data/manager/TableDataManager.java       |   5 +
 29 files changed, 522 insertions(+), 166 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
index 30e995298c..f9799d7e75 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotBrokerDebug.java
@@ -27,10 +27,12 @@ import io.swagger.annotations.ApiResponses;
 import io.swagger.annotations.Authorization;
 import io.swagger.annotations.SecurityDefinition;
 import io.swagger.annotations.SwaggerDefinition;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
 import javax.inject.Inject;
 import javax.ws.rs.GET;
 import javax.ws.rs.Path;
@@ -42,6 +44,7 @@ import javax.ws.rs.core.Context;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.broker.broker.AccessControlFactory;
 import org.apache.pinot.broker.routing.BrokerRoutingManager;
 import org.apache.pinot.common.request.BrokerRequest;
@@ -114,13 +117,45 @@ public class PinotBrokerDebug {
   public Map<String, Map<ServerInstance, List<String>>> getRoutingTable(
       @ApiParam(value = "Name of the table") @PathParam("tableName") String tableName) {
     Map<String, Map<ServerInstance, List<String>>> result = new TreeMap<>();
+    getRoutingTable(tableName, (tableNameWithType, routingTable) -> result.put(tableNameWithType,
+        removeOptionalSegments(routingTable.getServerInstanceToSegmentsMap())));
+    if (!result.isEmpty()) {
+      return result;
+    } else {
+      throw new WebApplicationException("Cannot find routing for table: " + tableName, Response.Status.NOT_FOUND);
+    }
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/debug/routingTableWithOptionalSegments/{tableName}")
+  @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Table.GET_ROUTING_TABLE)
+  @ApiOperation(value = "Get the routing table for a table, including optional segments")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Routing table"),
+      @ApiResponse(code = 404, message = "Routing not found"),
+      @ApiResponse(code = 500, message = "Internal server error")
+  })
+  public Map<String, Map<ServerInstance, Pair<List<String>, List<String>>>> getRoutingTableWithOptionalSegments(
+      @ApiParam(value = "Name of the table") @PathParam("tableName") String tableName) {
+    Map<String, Map<ServerInstance, Pair<List<String>, List<String>>>> result = new TreeMap<>();
+    getRoutingTable(tableName, (tableNameWithType, routingTable) -> result.put(tableNameWithType,
+        routingTable.getServerInstanceToSegmentsMap()));
+    if (!result.isEmpty()) {
+      return result;
+    } else {
+      throw new WebApplicationException("Cannot find routing for table: " + tableName, Response.Status.NOT_FOUND);
+    }
+  }
+
+  private void getRoutingTable(String tableName, BiConsumer<String, RoutingTable> consumer) {
     TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableName);
     if (tableType != TableType.REALTIME) {
       String offlineTableName = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
       RoutingTable routingTable = _routingManager.getRoutingTable(
           CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + offlineTableName), getRequestId());
       if (routingTable != null) {
-        result.put(offlineTableName, routingTable.getServerInstanceToSegmentsMap());
+        consumer.accept(offlineTableName, routingTable);
       }
     }
     if (tableType != TableType.OFFLINE) {
@@ -128,14 +163,16 @@ public class PinotBrokerDebug {
       RoutingTable routingTable = _routingManager.getRoutingTable(
           CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM " + realtimeTableName), getRequestId());
       if (routingTable != null) {
-        result.put(realtimeTableName, routingTable.getServerInstanceToSegmentsMap());
+        consumer.accept(realtimeTableName, routingTable);
       }
     }
-    if (!result.isEmpty()) {
-      return result;
-    } else {
-      throw new WebApplicationException("Cannot find routing for table: " + tableName, Response.Status.NOT_FOUND);
-    }
+  }
+
+  private static Map<ServerInstance, List<String>> removeOptionalSegments(
+      Map<ServerInstance, Pair<List<String>, List<String>>> serverInstanceToSegmentsMap) {
+    Map<ServerInstance, List<String>> ret = new HashMap<>();
+    serverInstanceToSegmentsMap.forEach((k, v) -> ret.put(k, v.getLeft()));
+    return ret;
   }
 
   @GET
@@ -152,7 +189,39 @@ public class PinotBrokerDebug {
       @ApiParam(value = "SQL query (table name should have type suffix)") @QueryParam("query") String query,
       @Context HttpHeaders httpHeaders) {
     BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(query);
+    checkAccessControl(brokerRequest, httpHeaders);
+    RoutingTable routingTable = _routingManager.getRoutingTable(brokerRequest, getRequestId());
+    if (routingTable != null) {
+      return removeOptionalSegments(routingTable.getServerInstanceToSegmentsMap());
+    } else {
+      throw new WebApplicationException("Cannot find routing for query: " + query, Response.Status.NOT_FOUND);
+    }
+  }
+
+  @GET
+  @Produces(MediaType.APPLICATION_JSON)
+  @Path("/debug/routingTableWithOptionalSegments/sql")
+  @ManualAuthorization
+  @ApiOperation(value = "Get the routing table for a query, including optional segments")
+  @ApiResponses(value = {
+      @ApiResponse(code = 200, message = "Routing table"),
+      @ApiResponse(code = 404, message = "Routing not found"),
+      @ApiResponse(code = 500, message = "Internal server error")
+  })
+  public Map<ServerInstance, Pair<List<String>, List<String>>> getRoutingTableForQueryWithOptionalSegments(
+      @ApiParam(value = "SQL query (table name should have type suffix)") @QueryParam("query") String query,
+      @Context HttpHeaders httpHeaders) {
+    BrokerRequest brokerRequest = CalciteSqlCompiler.compileToBrokerRequest(query);
+    checkAccessControl(brokerRequest, httpHeaders);
+    RoutingTable routingTable = _routingManager.getRoutingTable(brokerRequest, getRequestId());
+    if (routingTable != null) {
+      return routingTable.getServerInstanceToSegmentsMap();
+    } else {
+      throw new WebApplicationException("Cannot find routing for query: " + query, Response.Status.NOT_FOUND);
+    }
+  }
 
+  private void checkAccessControl(BrokerRequest brokerRequest, HttpHeaders httpHeaders) {
     // TODO: Handle nested queries
     if (brokerRequest.isSetQuerySource() && brokerRequest.getQuerySource().isSetTableName()) {
       if (!_accessControlFactory.create()
@@ -163,13 +232,6 @@ public class PinotBrokerDebug {
     } else {
       throw new WebApplicationException("Table name is not set in the query", Response.Status.BAD_REQUEST);
     }
-
-    RoutingTable routingTable = _routingManager.getRoutingTable(brokerRequest, getRequestId());
-    if (routingTable != null) {
-      return routingTable.getServerInstanceToSegmentsMap();
-    } else {
-      throw new WebApplicationException("Cannot find routing for query: " + query, Response.Status.NOT_FOUND);
-    }
   }
 
   /**
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index bcc5049ff0..c93248fbe4 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -43,6 +43,7 @@ import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.http.client.methods.HttpDelete;
 import org.apache.http.conn.HttpClientConnectionManager;
 import org.apache.http.util.EntityUtils;
@@ -592,8 +593,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
       // Calculate routing table for the query
       // TODO: Modify RoutingManager interface to directly take PinotQuery
       long routingStartTimeNs = System.nanoTime();
-      Map<ServerInstance, List<String>> offlineRoutingTable = null;
-      Map<ServerInstance, List<String>> realtimeRoutingTable = null;
+      Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable = null;
+      Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable = null;
       List<String> unavailableSegments = new ArrayList<>();
       int numPrunedSegmentsTotal = 0;
       if (offlineBrokerRequest != null) {
@@ -601,7 +602,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
         RoutingTable routingTable = _routingManager.getRoutingTable(offlineBrokerRequest, requestId);
         if (routingTable != null) {
           unavailableSegments.addAll(routingTable.getUnavailableSegments());
-          Map<ServerInstance, List<String>> serverInstanceToSegmentsMap = routingTable.getServerInstanceToSegmentsMap();
+          Map<ServerInstance, Pair<List<String>, List<String>>> serverInstanceToSegmentsMap =
+              routingTable.getServerInstanceToSegmentsMap();
           if (!serverInstanceToSegmentsMap.isEmpty()) {
             offlineRoutingTable = serverInstanceToSegmentsMap;
           } else {
@@ -617,7 +619,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
         RoutingTable routingTable = _routingManager.getRoutingTable(realtimeBrokerRequest, requestId);
         if (routingTable != null) {
           unavailableSegments.addAll(routingTable.getUnavailableSegments());
-          Map<ServerInstance, List<String>> serverInstanceToSegmentsMap = routingTable.getServerInstanceToSegmentsMap();
+          Map<ServerInstance, Pair<List<String>, List<String>>> serverInstanceToSegmentsMap =
+              routingTable.getServerInstanceToSegmentsMap();
           if (!serverInstanceToSegmentsMap.isEmpty()) {
             realtimeRoutingTable = serverInstanceToSegmentsMap;
           } else {
@@ -1815,9 +1818,10 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
    */
   protected abstract BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
       BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest,
-      @Nullable Map<ServerInstance, List<String>> offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest,
-      @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats,
-      RequestContext requestContext)
+      @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable,
+      @Nullable BrokerRequest realtimeBrokerRequest,
+      @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable, long timeoutMs,
+      ServerStats serverStats, RequestContext requestContext)
       throws Exception;
 
   protected static boolean isPartialResult(BrokerResponse brokerResponse) {
@@ -1858,8 +1862,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     statistics.setNumSegmentsPrunedByValue(response.getNumSegmentsPrunedByValue());
     statistics.setExplainPlanNumEmptyFilterSegments(response.getExplainPlanNumEmptyFilterSegments());
     statistics.setExplainPlanNumMatchAllFilterSegments(response.getExplainPlanNumMatchAllFilterSegments());
-    statistics.setProcessingExceptions(response.getProcessingExceptions().stream().map(Object::toString).collect(
-        Collectors.toList()));
+    statistics.setProcessingExceptions(
+        response.getProcessingExceptions().stream().map(Object::toString).collect(Collectors.toList()));
   }
 
   private String getGlobalQueryId(long requestId) {
@@ -1888,8 +1892,8 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     final String _query;
     final Set<ServerInstance> _servers = new HashSet<>();
 
-    QueryServers(String query, @Nullable Map<ServerInstance, List<String>> offlineRoutingTable,
-        @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable) {
+    QueryServers(String query, @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable,
+        @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable) {
       _query = query;
       if (offlineRoutingTable != null) {
         _servers.addAll(offlineRoutingTable.keySet());
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
index b2e36d16ef..c2cd3d6027 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.broker.broker.AccessControlFactory;
 import org.apache.pinot.broker.queryquota.QueryQuotaManager;
 import org.apache.pinot.broker.routing.BrokerRoutingManager;
@@ -88,9 +89,10 @@ public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler {
   @Override
   protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
       BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest,
-      @Nullable Map<ServerInstance, List<String>> offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest,
-      @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats,
-      RequestContext requestContext)
+      @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable,
+      @Nullable BrokerRequest realtimeBrokerRequest,
+      @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable, long timeoutMs,
+      ServerStats serverStats, RequestContext requestContext)
       throws Exception {
     // TODO: Support failure detection
     assert offlineBrokerRequest != null || realtimeBrokerRequest != null;
@@ -106,8 +108,8 @@ public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler {
           requestContext.isSampledRequest());
     }
     final long startReduceTimeNanos = System.nanoTime();
-    BrokerResponseNative brokerResponse = _streamingReduceService.reduceOnStreamResponse(originalBrokerRequest,
-        responseMap, timeoutMs, _brokerMetrics);
+    BrokerResponseNative brokerResponse =
+        _streamingReduceService.reduceOnStreamResponse(originalBrokerRequest, responseMap, timeoutMs, _brokerMetrics);
     requestContext.setReduceTimeNanos(System.nanoTime() - startReduceTimeNanos);
     return brokerResponse;
   }
@@ -116,11 +118,12 @@ public class GrpcBrokerRequestHandler extends BaseBrokerRequestHandler {
    * Query pinot server for data table.
    */
   private void sendRequest(long requestId, TableType tableType, BrokerRequest brokerRequest,
-      Map<ServerInstance, List<String>> routingTable,
+      Map<ServerInstance, Pair<List<String>, List<String>>> routingTable,
       Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap, boolean trace) {
-    for (Map.Entry<ServerInstance, List<String>> routingEntry : routingTable.entrySet()) {
+    for (Map.Entry<ServerInstance, Pair<List<String>, List<String>>> routingEntry : routingTable.entrySet()) {
       ServerInstance serverInstance = routingEntry.getKey();
-      List<String> segments = routingEntry.getValue();
+      // TODO: support optional segments for GrpcQueryServer.
+      List<String> segments = routingEntry.getValue().getLeft();
       String serverHost = serverInstance.getHostname();
       int port = serverInstance.getGrpcPort();
       // TODO: enable throttling on per host bases.
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index d329a99f69..9e74419830 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -31,6 +31,7 @@ import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.HttpHeaders;
 import javax.ws.rs.core.Response;
 import org.apache.calcite.jdbc.CalciteSchemaBuilder;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.broker.api.AccessControl;
 import org.apache.pinot.broker.api.RequesterIdentity;
 import org.apache.pinot.broker.broker.AccessControlFactory;
@@ -314,9 +315,11 @@ public class MultiStageBrokerRequestHandler extends BaseBrokerRequestHandler {
   @Override
   protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
       BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest,
-      @Nullable Map<ServerInstance, List<String>> offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest,
-      @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats,
-      RequestContext requestContext) {
+      @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable,
+      @Nullable BrokerRequest realtimeBrokerRequest,
+      @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable, long timeoutMs,
+      ServerStats serverStats, RequestContext requestContext)
+      throws Exception {
     throw new UnsupportedOperationException();
   }
 
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
index d44532a500..68ae70f9eb 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.broker.broker.AccessControlFactory;
 import org.apache.pinot.broker.failuredetector.FailureDetector;
 import org.apache.pinot.broker.failuredetector.FailureDetectorFactory;
@@ -101,9 +102,10 @@ public class SingleConnectionBrokerRequestHandler extends BaseBrokerRequestHandl
   @Override
   protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
       BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest,
-      @Nullable Map<ServerInstance, List<String>> offlineRoutingTable, @Nullable BrokerRequest realtimeBrokerRequest,
-      @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats,
-      RequestContext requestContext)
+      @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable,
+      @Nullable BrokerRequest realtimeBrokerRequest,
+      @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable, long timeoutMs,
+      ServerStats serverStats, RequestContext requestContext)
       throws Exception {
     assert offlineBrokerRequest != null || realtimeBrokerRequest != null;
     if (requestContext.isSampledRequest()) {
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
index 3cd14e676f..cc3a5354ef 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.AccessOption;
 import org.apache.helix.BaseDataAccessor;
 import org.apache.helix.HelixConstants.ChangeType;
@@ -610,19 +611,38 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle
       return null;
     }
     InstanceSelector.SelectionResult selectionResult = routingEntry.calculateRouting(brokerRequest, requestId);
-    Map<String, String> segmentToInstanceMap = selectionResult.getSegmentToInstanceMap();
-    Map<ServerInstance, List<String>> serverInstanceToSegmentsMap = new HashMap<>();
-    for (Map.Entry<String, String> entry : segmentToInstanceMap.entrySet()) {
+    return new RoutingTable(getServerInstanceToSegmentsMap(tableNameWithType, selectionResult),
+        selectionResult.getUnavailableSegments(), selectionResult.getNumPrunedSegments());
+  }
+
+  private Map<ServerInstance, Pair<List<String>, List<String>>> getServerInstanceToSegmentsMap(String tableNameWithType,
+      InstanceSelector.SelectionResult selectionResult) {
+    Map<ServerInstance, Pair<List<String>, List<String>>> merged = new HashMap<>();
+    for (Map.Entry<String, String> entry : selectionResult.getSegmentToInstanceMap().entrySet()) {
       ServerInstance serverInstance = _enabledServerInstanceMap.get(entry.getValue());
       if (serverInstance != null) {
-        serverInstanceToSegmentsMap.computeIfAbsent(serverInstance, k -> new ArrayList<>()).add(entry.getKey());
+        Pair<List<String>, List<String>> pair =
+            merged.computeIfAbsent(serverInstance, k -> Pair.of(new ArrayList<>(), new ArrayList<>()));
+        pair.getLeft().add(entry.getKey());
       } else {
         // Should not happen in normal case unless encountered unexpected exception when updating routing entries
         _brokerMetrics.addMeteredTableValue(tableNameWithType, BrokerMeter.SERVER_MISSING_FOR_ROUTING, 1L);
       }
     }
-    return new RoutingTable(serverInstanceToSegmentsMap, selectionResult.getUnavailableSegments(),
-        selectionResult.getNumPrunedSegments());
+    for (Map.Entry<String, String> entry : selectionResult.getOptionalSegmentToInstanceMap().entrySet()) {
+      ServerInstance serverInstance = _enabledServerInstanceMap.get(entry.getValue());
+      if (serverInstance != null) {
+        Pair<List<String>, List<String>> pair = merged.get(serverInstance);
+        // Skip servers that don't have non-optional segments, so that servers always get some non-optional segments
+        // to process, to be backward compatible.
+        // TODO: allow servers only with optional segments
+        if (pair != null) {
+          pair.getRight().add(entry.getKey());
+        }
+      }
+      // TODO: Report missing server metrics when we allow servers only with optional segments.
+    }
+    return merged;
   }
 
   @Override
@@ -795,7 +815,8 @@ public class BrokerRoutingManager implements RoutingManager, ClusterChangeHandle
         selectionResult.setNumPrunedSegments(numPrunedSegments);
         return selectionResult;
       } else {
-        return new InstanceSelector.SelectionResult(Collections.emptyMap(), Collections.emptyList(), numPrunedSegments);
+        return new InstanceSelector.SelectionResult(Pair.of(Collections.emptyMap(), Collections.emptyMap()),
+            Collections.emptyList(), numPrunedSegments);
       }
     }
   }
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
index 17c9d5e5b8..77b5389fd4 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BalancedInstanceSelector.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.broker.routing.adaptiveserverselector.AdaptiveServerSelector;
@@ -54,9 +55,11 @@ public class BalancedInstanceSelector extends BaseInstanceSelector {
   }
 
   @Override
-  Map<String, String> select(List<String> segments, int requestId, SegmentStates segmentStates,
-      Map<String, String> queryOptions) {
+  Pair<Map<String, String>, Map<String, String>> select(List<String> segments, int requestId,
+      SegmentStates segmentStates, Map<String, String> queryOptions) {
     Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
+    // No need to adjust this map per total segment numbers, as optional segments should be empty most of the time.
+    Map<String, String> optionalSegmentToInstanceMap = new HashMap<>();
     if (_adaptiveServerSelector != null) {
       for (String segment : segments) {
         List<SegmentInstanceCandidate> candidates = segmentStates.getCandidates(segment);
@@ -70,8 +73,12 @@ public class BalancedInstanceSelector extends BaseInstanceSelector {
           candidateInstances.add(candidate.getInstance());
         }
         String selectedInstance = _adaptiveServerSelector.select(candidateInstances);
+        // This can only be offline when it is a new segment. And such segment is marked as optional segment so that
+        // broker or server can skip it upon any issue to process it.
         if (candidates.get(candidateInstances.indexOf(selectedInstance)).isOnline()) {
           segmentToSelectedInstanceMap.put(segment, selectedInstance);
+        } else {
+          optionalSegmentToInstanceMap.put(segment, selectedInstance);
         }
       }
     } else {
@@ -84,11 +91,15 @@ public class BalancedInstanceSelector extends BaseInstanceSelector {
         }
         int selectedIdx = requestId++ % candidates.size();
         SegmentInstanceCandidate selectedCandidate = candidates.get(selectedIdx);
+        // This can only be offline when it is a new segment. And such segment is marked as optional segment so that
+        // broker or server can skip it upon any issue to process it.
         if (selectedCandidate.isOnline()) {
           segmentToSelectedInstanceMap.put(segment, selectedCandidate.getInstance());
+        } else {
+          optionalSegmentToInstanceMap.put(segment, selectedCandidate.getInstance());
         }
       }
     }
-    return segmentToSelectedInstanceMap;
+    return Pair.of(segmentToSelectedInstanceMap, optionalSegmentToInstanceMap);
   }
 }
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
index 972142663e..b2961eef94 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/BaseInstanceSelector.java
@@ -30,6 +30,7 @@ import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.AccessOption;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
@@ -270,6 +271,10 @@ abstract class BaseInstanceSelector implements InstanceSelector {
         }
       }
     }
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Got _newSegmentStateMap: {}, _oldSegmentCandidatesMap: {}", _newSegmentStateMap.keySet(),
+          _oldSegmentCandidatesMap.keySet());
+    }
   }
 
   /**
@@ -408,10 +413,11 @@ abstract class BaseInstanceSelector implements InstanceSelector {
     // Copy the volatile reference so that segmentToInstanceMap and unavailableSegments can have a consistent view of
     // the state.
     SegmentStates segmentStates = _segmentStates;
-    Map<String, String> segmentToInstanceMap = select(segments, requestIdInt, segmentStates, queryOptions);
+    Pair<Map<String, String>, Map<String, String>> segmentToInstanceMap =
+        select(segments, requestIdInt, segmentStates, queryOptions);
     Set<String> unavailableSegments = segmentStates.getUnavailableSegments();
     if (unavailableSegments.isEmpty()) {
-      return new SelectionResult(segmentToInstanceMap, Collections.emptyList());
+      return new SelectionResult(segmentToInstanceMap, Collections.emptyList(), 0);
     } else {
       List<String> unavailableSegmentsForRequest = new ArrayList<>();
       for (String segment : segments) {
@@ -419,7 +425,7 @@ abstract class BaseInstanceSelector implements InstanceSelector {
           unavailableSegmentsForRequest.add(segment);
         }
       }
-      return new SelectionResult(segmentToInstanceMap, unavailableSegmentsForRequest);
+      return new SelectionResult(segmentToInstanceMap, unavailableSegmentsForRequest, 0);
     }
   }
 
@@ -429,9 +435,11 @@ abstract class BaseInstanceSelector implements InstanceSelector {
   }
 
   /**
-   * Selects the server instances for the given segments based on the request id and segment states. Returns a map
-   * from segment to selected server instance hosting the segment.
+   * Selects the server instances for the given segments based on the request id and segment states. Returns two maps
+   * from segment to the selected server instance hosting the segment. The second map holds optional segments, i.e.
+   * new segments that are not online yet. Instead of the broker simply skipping them at routing time, they are sent to
+   * servers, which decide how to handle them.
    */
-  abstract Map<String, String> select(List<String> segments, int requestId, SegmentStates segmentStates,
-      Map<String, String> queryOptions);
+  abstract Pair<Map<String, String>, Map<String, String>/*optional segments*/> select(List<String> segments,
+      int requestId, SegmentStates segmentStates, Map<String, String> queryOptions);
 }
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
index 9ffe830229..d003723c5b 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelector.java
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.pinot.broker.routing.segmentpreselector.SegmentPreSelector;
@@ -75,16 +76,12 @@ public interface InstanceSelector {
   Set<String> getServingInstances();
 
   class SelectionResult {
-    private final Map<String, String> _segmentToInstanceMap;
+    private final Pair<Map<String, String>, Map<String, String>/*optional segments*/> _segmentToInstanceMap;
     private final List<String> _unavailableSegments;
     private int _numPrunedSegments;
 
-    public SelectionResult(Map<String, String> segmentToInstanceMap, List<String> unavailableSegments) {
-      this(segmentToInstanceMap, unavailableSegments, 0);
-    }
-
-    public SelectionResult(Map<String, String> segmentToInstanceMap, List<String> unavailableSegments,
-        int numPrunedSegments) {
+    public SelectionResult(Pair<Map<String, String>, Map<String, String>> segmentToInstanceMap,
+        List<String> unavailableSegments, int numPrunedSegments) {
       _segmentToInstanceMap = segmentToInstanceMap;
       _unavailableSegments = unavailableSegments;
       _numPrunedSegments = numPrunedSegments;
@@ -94,7 +91,15 @@ public interface InstanceSelector {
      * Returns the map from segment to selected server instance hosting the segment.
      */
     public Map<String, String> getSegmentToInstanceMap() {
-      return _segmentToInstanceMap;
+      return _segmentToInstanceMap.getLeft();
+    }
+
+    /**
+     * Returns the map from optional segment to selected server instance hosting the optional segment.
+     * Optional segments can be skipped by the broker or servers upon any issue, without failing the query.
+     */
+    public Map<String, String> getOptionalSegmentToInstanceMap() {
+      return _segmentToInstanceMap.getRight();
     }
 
     /**
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
index 9be701ebe5..15fb525a8c 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/MultiStageReplicaGroupSelector.java
@@ -27,6 +27,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.store.zk.ZkHelixPropertyStore;
@@ -81,8 +82,8 @@ public class MultiStageReplicaGroupSelector extends BaseInstanceSelector {
   }
 
   @Override
-  Map<String, String> select(List<String> segments, int requestId, SegmentStates segmentStates,
-      Map<String, String> queryOptions) {
+  Pair<Map<String, String>, Map<String, String>> select(List<String> segments, int requestId,
+      SegmentStates segmentStates, Map<String, String> queryOptions) {
     // Create a copy of InstancePartitions to avoid race-condition with event-listeners above.
     InstancePartitions instancePartitions = _instancePartitions;
     int replicaGroupSelected = requestId % instancePartitions.getNumReplicaGroups();
@@ -102,14 +103,15 @@ public class MultiStageReplicaGroupSelector extends BaseInstanceSelector {
    * Returns a map from the segmentName to the corresponding server in the given replica-group. If the is not enabled,
    * we throw an exception.
    */
-  private Map<String, String> tryAssigning(List<String> segments, SegmentStates segmentStates,
-      InstancePartitions instancePartitions, int replicaId) {
+  private Pair<Map<String, String>, Map<String, String>> tryAssigning(List<String> segments,
+      SegmentStates segmentStates, InstancePartitions instancePartitions, int replicaId) {
     Set<String> instanceLookUpSet = new HashSet<>();
     for (int partition = 0; partition < instancePartitions.getNumPartitions(); partition++) {
       List<String> instances = instancePartitions.getInstances(partition, replicaId);
       instanceLookUpSet.addAll(instances);
     }
-    Map<String, String> result = new HashMap<>();
+    Map<String, String> segmentToSelectedInstanceMap = new HashMap<>();
+    Map<String, String> optionalSegmentToInstanceMap = new HashMap<>();
     for (String segment : segments) {
       List<SegmentInstanceCandidate> candidates = segmentStates.getCandidates(segment);
       // If candidates are null, we will throw an exception and log a warning.
@@ -119,8 +121,12 @@ public class MultiStageReplicaGroupSelector extends BaseInstanceSelector {
         String instance = candidate.getInstance();
         if (instanceLookUpSet.contains(instance)) {
           found = true;
+          // This can only be offline when it is a new segment. Such a segment is marked as optional so that the
+          // broker or server can skip it if any issue occurs while processing it, instead of failing the query.
           if (candidate.isOnline()) {
-            result.put(segment, instance);
+            segmentToSelectedInstanceMap.put(segment, instance);
+          } else {
+            optionalSegmentToInstanceMap.put(segment, instance);
           }
           break;
         }
@@ -129,7 +135,7 @@ public class MultiStageReplicaGroupSelector extends BaseInstanceSelector {
         throw new RuntimeException(String.format("Unable to find an enabled instance for segment: %s", segment));
       }
     }
-    return result;
+    return Pair.of(segmentToSelectedInstanceMap, optionalSegmentToInstanceMap);
   }
 
   @VisibleForTesting
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
index 9aedaa8e66..3683ca46bb 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/ReplicaGroupInstanceSelector.java
@@ -69,8 +69,8 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector {
   }
 
   @Override
-  Map<String, String> select(List<String> segments, int requestId, SegmentStates segmentStates,
-      Map<String, String> queryOptions) {
+  Pair<Map<String, String>, Map<String, String>> select(List<String> segments, int requestId,
+      SegmentStates segmentStates, Map<String, String> queryOptions) {
     if (_adaptiveServerSelector != null) {
       // Adaptive Server Selection is enabled.
       List<String> serverRankList = new ArrayList<>();
@@ -90,9 +90,11 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector {
     }
   }
 
-  private Map<String, String> selectServersUsingRoundRobin(List<String> segments, int requestId,
-      SegmentStates segmentStates, Map<String, String> queryOptions) {
-    Map<String, String> selectedServers = new HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
+  private Pair<Map<String, String>, Map<String, String>> selectServersUsingRoundRobin(List<String> segments,
+      int requestId, SegmentStates segmentStates, Map<String, String> queryOptions) {
+    Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
+    // No need to presize this map by the segment count, as the optional segment map should be empty most of the time.
+    Map<String, String> optionalSegmentToInstanceMap = new HashMap<>();
     Integer numReplicaGroupsToQuery = QueryOptionsUtils.getNumReplicaGroupsToQuery(queryOptions);
     int numReplicaGroups = numReplicaGroupsToQuery == null ? 1 : numReplicaGroupsToQuery;
     int replicaOffset = 0;
@@ -107,22 +109,26 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector {
       int numCandidates = candidates.size();
       int instanceIdx = (requestId + replicaOffset) % numCandidates;
       SegmentInstanceCandidate selectedInstance = candidates.get(instanceIdx);
-      // Only put online instance.
-      // This can only be offline when it is a new segment.
+      // This can only be offline when it is a new segment. Such a segment is marked as optional so that the
+      // broker or server can skip it if any issue occurs while processing it, instead of failing the query.
       if (selectedInstance.isOnline()) {
-        selectedServers.put(segment, selectedInstance.getInstance());
+        segmentToSelectedInstanceMap.put(segment, selectedInstance.getInstance());
+      } else {
+        optionalSegmentToInstanceMap.put(segment, selectedInstance.getInstance());
       }
       if (numReplicaGroups > numCandidates) {
         numReplicaGroups = numCandidates;
       }
       replicaOffset = (replicaOffset + 1) % numReplicaGroups;
     }
-    return selectedServers;
+    return Pair.of(segmentToSelectedInstanceMap, optionalSegmentToInstanceMap);
   }
 
-  private Map<String, String> selectServersUsingAdaptiveServerSelector(List<String> segments, int requestId,
-      SegmentStates segmentStates, List<String> serverRankList) {
-    Map<String, String> selectedServers = new HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
+  private Pair<Map<String, String>, Map<String, String>> selectServersUsingAdaptiveServerSelector(List<String> segments,
+      int requestId, SegmentStates segmentStates, List<String> serverRankList) {
+    Map<String, String> segmentToSelectedInstanceMap = new HashMap<>(HashUtil.getHashMapCapacity(segments.size()));
+    // No need to presize this map by the segment count, as the optional segment map should be empty most of the time.
+    Map<String, String> optionalSegmentToInstanceMap = new HashMap<>();
     for (String segment : segments) {
       // NOTE: candidates can be null when there is no enabled instances for the segment, or the instance selector has
       // not been updated (we update all components for routing in sequence)
@@ -151,13 +157,15 @@ public class ReplicaGroupInstanceSelector extends BaseInstanceSelector {
           }
         }
       }
-      // Only put online instance.
-      // This can only be offline when it is a new segment.
+      // This can only be offline when it is a new segment. Such a segment is marked as optional so that the
+      // broker or server can skip it if any issue occurs while processing it, instead of failing the query.
       if (selectedInstance.isOnline()) {
-        selectedServers.put(segment, selectedInstance.getInstance());
+        segmentToSelectedInstanceMap.put(segment, selectedInstance.getInstance());
+      } else {
+        optionalSegmentToInstanceMap.put(segment, selectedInstance.getInstance());
       }
     }
-    return selectedServers;
+    return Pair.of(segmentToSelectedInstanceMap, optionalSegmentToInstanceMap);
   }
 
   private List<String> fetchCandidateServersForQuery(List<String> segments, SegmentStates segmentStates) {
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
index 8206d49452..8c352bdbe6 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/instanceselector/StrictReplicaGroupInstanceSelector.java
@@ -172,5 +172,9 @@ public class StrictReplicaGroupInstanceSelector extends ReplicaGroupInstanceSele
       }
       _newSegmentStateMap.put(segment, new NewSegmentState(newSegmentCreationTimeMap.get(segment), candidates));
     }
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Got _newSegmentStateMap: {}, _oldSegmentCandidatesMap: {}", _newSegmentStateMap.keySet(),
+          _oldSegmentCandidatesMap.keySet());
+    }
   }
 }
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index 438836a4cf..039d7a4205 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -163,7 +163,8 @@ public class HelixBrokerStarterTest extends ControllerTest {
     RoutingTable routingTable = routingManager.getRoutingTable(brokerRequest, 0);
     assertNotNull(routingTable);
     assertEquals(routingTable.getServerInstanceToSegmentsMap().size(), NUM_SERVERS);
-    assertEquals(routingTable.getServerInstanceToSegmentsMap().values().iterator().next().size(), NUM_OFFLINE_SEGMENTS);
+    assertEquals(routingTable.getServerInstanceToSegmentsMap().values().iterator().next().getLeft().size(),
+        NUM_OFFLINE_SEGMENTS);
     assertTrue(routingTable.getUnavailableSegments().isEmpty());
 
     // Add a new segment into the OFFLINE table
@@ -171,9 +172,9 @@ public class HelixBrokerStarterTest extends ControllerTest {
         SegmentMetadataMockUtils.mockSegmentMetadata(RAW_TABLE_NAME), "downloadUrl");
 
     TestUtils.waitForCondition(aVoid ->
-        routingManager.getRoutingTable(brokerRequest, 0).getServerInstanceToSegmentsMap()
-            .values().iterator().next().size() == NUM_OFFLINE_SEGMENTS + 1, 30_000L, "Failed to add the new segment "
-        + "into the routing table");
+            routingManager.getRoutingTable(brokerRequest, 0).getServerInstanceToSegmentsMap().values().iterator().next()
+                .getLeft().size() == NUM_OFFLINE_SEGMENTS + 1, 30_000L,
+        "Failed to add the new segment " + "into the routing table");
 
     // Add a new table with different broker tenant
     String newRawTableName = "newTable";
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
index e38db6e020..29d2c5b428 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandlerTest.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.pinot.broker.broker.AllowAllAccessControlFactory;
 import org.apache.pinot.broker.queryquota.QueryQuotaManager;
@@ -199,7 +200,7 @@ public class BaseBrokerRequestHandlerTest {
     RoutingTable rt = mock(RoutingTable.class);
     when(rt.getServerInstanceToSegmentsMap()).thenReturn(
         Collections.singletonMap(new ServerInstance(new InstanceConfig("server01_9000")),
-            Collections.singletonList("segment01")));
+            Pair.of(Collections.singletonList("segment01"), Collections.emptyList())));
     when(routingManager.getRoutingTable(any(), Mockito.anyLong())).thenReturn(rt);
     QueryQuotaManager queryQuotaManager = mock(QueryQuotaManager.class);
     when(queryQuotaManager.acquire(anyString())).thenReturn(true);
@@ -223,10 +224,10 @@ public class BaseBrokerRequestHandlerTest {
           @Override
           protected BrokerResponseNative processBrokerRequest(long requestId, BrokerRequest originalBrokerRequest,
               BrokerRequest serverBrokerRequest, @Nullable BrokerRequest offlineBrokerRequest,
-              @Nullable Map<ServerInstance, List<String>> offlineRoutingTable,
+              @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable,
               @Nullable BrokerRequest realtimeBrokerRequest,
-              @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable, long timeoutMs, ServerStats serverStats,
-              RequestContext requestContext)
+              @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable, long timeoutMs,
+              ServerStats serverStats, RequestContext requestContext)
               throws Exception {
             testRequestId[0] = requestId;
             latch.await();
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
index fd2568e183..c748be4885 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/instanceselector/InstanceSelectorTest.java
@@ -151,7 +151,7 @@ public class InstanceSelectorTest {
   }
 
   private static boolean isReplicaGroupType(String selectorType) {
-    return selectorType.equals(RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE) || selectorType.equals(
+    return selectorType.equals(REPLICA_GROUP_INSTANCE_SELECTOR_TYPE) || selectorType.equals(
         STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE);
   }
 
@@ -165,7 +165,7 @@ public class InstanceSelectorTest {
   @DataProvider(name = "selectorType")
   public Object[] getSelectorType() {
     return new Object[]{
-        RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE,
+        REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE,
         BALANCED_INSTANCE_SELECTOR
     };
   }
@@ -203,7 +203,7 @@ public class InstanceSelectorTest {
         brokerMetrics) instanceof BalancedInstanceSelector);
 
     // Replica-group instance selector should be returned
-    when(routingConfig.getInstanceSelectorType()).thenReturn(RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE);
+    when(routingConfig.getInstanceSelectorType()).thenReturn(REPLICA_GROUP_INSTANCE_SELECTOR_TYPE);
     assertTrue(InstanceSelectorFactory.getInstanceSelector(tableConfig, propertyStore,
         brokerMetrics) instanceof ReplicaGroupInstanceSelector);
 
@@ -1414,15 +1414,15 @@ public class InstanceSelectorTest {
       // First selection, we select instance0 for oldSeg and instance1 for newSeg in balance selector
       // For replica group, we select instance0 for oldSeg and newSeg. Because newSeg is not online in instance0, so
       // we exclude it from selection result.
-      Map<String, String> expectedSelectionResult;
+      InstanceSelector.SelectionResult selectionResult =
+          selector.select(_brokerRequest, Lists.newArrayList(onlineSegments), requestId);
       if (isReplicaGroupType(selectorType)) {
-        expectedSelectionResult = ImmutableMap.of(oldSeg, instance0);
+        assertEquals(selectionResult.getSegmentToInstanceMap(), ImmutableMap.of(oldSeg, instance0));
+        assertEquals(selectionResult.getOptionalSegmentToInstanceMap(), ImmutableMap.of(newSeg, instance0));
       } else {
-        expectedSelectionResult = ImmutableMap.of(oldSeg, instance0, newSeg, instance1);
+        assertEquals(selectionResult.getSegmentToInstanceMap(), ImmutableMap.of(oldSeg, instance0, newSeg, instance1));
+        assertTrue(selectionResult.getOptionalSegmentToInstanceMap().isEmpty());
       }
-      InstanceSelector.SelectionResult selectionResult =
-          selector.select(_brokerRequest, Lists.newArrayList(onlineSegments), requestId);
-      assertEquals(selectionResult.getSegmentToInstanceMap(), expectedSelectionResult);
       assertTrue(selectionResult.getUnavailableSegments().isEmpty());
     }
     {
@@ -1430,21 +1430,22 @@ public class InstanceSelectorTest {
       // Second selection, we select instance1 for oldSeg and instance0 for newSeg in balance selector
       // Because newSeg is not online in instance0, so we exclude it from selection result.
       // For replica group, we select instance1 for oldSeg and newSeg.
-      Map<String, String> expectedSelectionResult;
+      InstanceSelector.SelectionResult selectionResult =
+          selector.select(_brokerRequest, Lists.newArrayList(onlineSegments), requestId);
       switch (selectorType) {
         case BALANCED_INSTANCE_SELECTOR:
-          expectedSelectionResult = ImmutableMap.of(oldSeg, instance1);
+          assertEquals(selectionResult.getSegmentToInstanceMap(), ImmutableMap.of(oldSeg, instance1));
+          assertEquals(selectionResult.getOptionalSegmentToInstanceMap(), ImmutableMap.of(newSeg, instance0));
           break;
         case STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE: // fall through
-        case RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE:
-          expectedSelectionResult = ImmutableMap.of(oldSeg, instance1, newSeg, instance1);
+        case REPLICA_GROUP_INSTANCE_SELECTOR_TYPE:
+          assertEquals(selectionResult.getSegmentToInstanceMap(),
+              ImmutableMap.of(oldSeg, instance1, newSeg, instance1));
+          assertTrue(selectionResult.getOptionalSegmentToInstanceMap().isEmpty());
           break;
         default:
           throw new RuntimeException("unsupported selector type:" + selectorType);
       }
-      InstanceSelector.SelectionResult selectionResult =
-          selector.select(_brokerRequest, Lists.newArrayList(onlineSegments), requestId);
-      assertEquals(selectionResult.getSegmentToInstanceMap(), expectedSelectionResult);
       assertTrue(selectionResult.getUnavailableSegments().isEmpty());
     }
     // Advance the clock to make newSeg to old segment.
@@ -1453,21 +1454,22 @@ public class InstanceSelectorTest {
     selector.init(enabledInstances, idealState, externalView, onlineSegments);
     {
       int requestId = 0;
-      Map<String, String> expectedSelectionResult;
+      InstanceSelector.SelectionResult selectionResult =
+          selector.select(_brokerRequest, Lists.newArrayList(onlineSegments), requestId);
       switch (selectorType) {
         case BALANCED_INSTANCE_SELECTOR: // fall through
-        case RoutingConfig.REPLICA_GROUP_INSTANCE_SELECTOR_TYPE:
-          expectedSelectionResult = ImmutableMap.of(oldSeg, instance0, newSeg, instance1);
+        case REPLICA_GROUP_INSTANCE_SELECTOR_TYPE:
+          assertEquals(selectionResult.getSegmentToInstanceMap(),
+              ImmutableMap.of(oldSeg, instance0, newSeg, instance1));
           break;
         case STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE:
-          expectedSelectionResult = ImmutableMap.of(oldSeg, instance1, newSeg, instance1);
+          assertEquals(selectionResult.getSegmentToInstanceMap(),
+              ImmutableMap.of(oldSeg, instance1, newSeg, instance1));
           break;
         default:
           throw new RuntimeException("unsupported selector type:" + selectorType);
       }
-      InstanceSelector.SelectionResult selectionResult =
-          selector.select(_brokerRequest, Lists.newArrayList(onlineSegments), requestId);
-      assertEquals(selectionResult.getSegmentToInstanceMap(), expectedSelectionResult);
+      assertTrue(selectionResult.getOptionalSegmentToInstanceMap().isEmpty());
       assertTrue(selectionResult.getUnavailableSegments().isEmpty());
     }
     {
@@ -1476,6 +1478,7 @@ public class InstanceSelectorTest {
       InstanceSelector.SelectionResult selectionResult =
           selector.select(_brokerRequest, Lists.newArrayList(onlineSegments), requestId);
       assertEquals(selectionResult.getSegmentToInstanceMap(), expectedSelectionResult);
+      assertTrue(selectionResult.getOptionalSegmentToInstanceMap().isEmpty());
       assertTrue(selectionResult.getUnavailableSegments().isEmpty());
     }
   }
@@ -1523,19 +1526,21 @@ public class InstanceSelectorTest {
     InstanceSelector.SelectionResult selectionResult =
         selector.select(_brokerRequest, Lists.newArrayList(onlineSegments), requestId);
     assertEquals(selectionResult.getSegmentToInstanceMap(), expectedResult);
+    assertEquals(selectionResult.getOptionalSegmentToInstanceMap(), ImmutableMap.of(newSeg, instance0));
     assertTrue(selectionResult.getUnavailableSegments().isEmpty());
 
     // Advance the clock to make newSeg to old segment and we see newSeg is reported as unavailable segment.
     _mutableClock.fastForward(Duration.ofMillis(NEW_SEGMENT_EXPIRATION_MILLIS + 10));
     selector.init(enabledInstances, idealState, externalView, onlineSegments);
     selectionResult = selector.select(_brokerRequest, Lists.newArrayList(onlineSegments), requestId);
-    if (selectorType == STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE) {
+    if (STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equals(selectorType)) {
       expectedResult = ImmutableMap.of();
       assertEquals(selectionResult.getUnavailableSegments(), ImmutableList.of(newSeg, oldSeg));
     } else {
       assertEquals(selectionResult.getUnavailableSegments(), ImmutableList.of(newSeg));
     }
     assertEquals(selectionResult.getSegmentToInstanceMap(), expectedResult);
+    assertTrue(selectionResult.getOptionalSegmentToInstanceMap().isEmpty());
   }
 
   @Test(dataProvider = "selectorType")
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/request/InstanceRequest.java b/pinot-common/src/main/java/org/apache/pinot/common/request/InstanceRequest.java
index 24870f77be..d266fef76b 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/request/InstanceRequest.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/request/InstanceRequest.java
@@ -25,7 +25,7 @@
 package org.apache.pinot.common.request;
 
 @SuppressWarnings({"cast", "rawtypes", "serial", "unchecked", "unused"})
-@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.15.0)", date = "2023-09-27")
+@javax.annotation.Generated(value = "Autogenerated by Thrift Compiler (0.15.0)", date = "2023-11-16")
 public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest, InstanceRequest._Fields>, java.io.Serializable, Cloneable, Comparable<InstanceRequest> {
   private static final org.apache.thrift.protocol.TStruct STRUCT_DESC = new org.apache.thrift.protocol.TStruct("InstanceRequest");
 
@@ -34,6 +34,7 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
   private static final org.apache.thrift.protocol.TField SEARCH_SEGMENTS_FIELD_DESC = new org.apache.thrift.protocol.TField("searchSegments", org.apache.thrift.protocol.TType.LIST, (short)3);
   private static final org.apache.thrift.protocol.TField ENABLE_TRACE_FIELD_DESC = new org.apache.thrift.protocol.TField("enableTrace", org.apache.thrift.protocol.TType.BOOL, (short)4);
   private static final org.apache.thrift.protocol.TField BROKER_ID_FIELD_DESC = new org.apache.thrift.protocol.TField("brokerId", org.apache.thrift.protocol.TType.STRING, (short)5);
+  private static final org.apache.thrift.protocol.TField OPTIONAL_SEGMENTS_FIELD_DESC = new org.apache.thrift.protocol.TField("optionalSegments", org.apache.thrift.protocol.TType.LIST, (short)6);
 
   private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new InstanceRequestStandardSchemeFactory();
   private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new InstanceRequestTupleSchemeFactory();
@@ -43,6 +44,7 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
   private @org.apache.thrift.annotation.Nullable java.util.List<java.lang.String> searchSegments; // optional
   private boolean enableTrace; // optional
   private @org.apache.thrift.annotation.Nullable java.lang.String brokerId; // optional
+  private @org.apache.thrift.annotation.Nullable java.util.List<java.lang.String> optionalSegments; // optional
 
   /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -50,7 +52,8 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
     QUERY((short)2, "query"),
     SEARCH_SEGMENTS((short)3, "searchSegments"),
     ENABLE_TRACE((short)4, "enableTrace"),
-    BROKER_ID((short)5, "brokerId");
+    BROKER_ID((short)5, "brokerId"),
+    OPTIONAL_SEGMENTS((short)6, "optionalSegments");
 
     private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>();
 
@@ -76,6 +79,8 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
           return ENABLE_TRACE;
         case 5: // BROKER_ID
           return BROKER_ID;
+        case 6: // OPTIONAL_SEGMENTS
+          return OPTIONAL_SEGMENTS;
         default:
           return null;
       }
@@ -120,7 +125,7 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
   private static final int __REQUESTID_ISSET_ID = 0;
   private static final int __ENABLETRACE_ISSET_ID = 1;
   private byte __isset_bitfield = 0;
-  private static final _Fields optionals[] = {_Fields.SEARCH_SEGMENTS,_Fields.ENABLE_TRACE,_Fields.BROKER_ID};
+  private static final _Fields optionals[] = {_Fields.SEARCH_SEGMENTS,_Fields.ENABLE_TRACE,_Fields.BROKER_ID,_Fields.OPTIONAL_SEGMENTS};
   public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap;
   static {
     java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class);
@@ -135,6 +140,9 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.BOOL)));
     tmpMap.put(_Fields.BROKER_ID, new org.apache.thrift.meta_data.FieldMetaData("brokerId", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
         new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING)));
+    tmpMap.put(_Fields.OPTIONAL_SEGMENTS, new org.apache.thrift.meta_data.FieldMetaData("optionalSegments", org.apache.thrift.TFieldRequirementType.OPTIONAL, 
+        new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST, 
+            new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))));
     metaDataMap = java.util.Collections.unmodifiableMap(tmpMap);
     org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(InstanceRequest.class, metaDataMap);
   }
@@ -169,6 +177,10 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
     if (other.isSetBrokerId()) {
       this.brokerId = other.brokerId;
     }
+    if (other.isSetOptionalSegments()) {
+      java.util.List<java.lang.String> __this__optionalSegments = new java.util.ArrayList<java.lang.String>(other.optionalSegments);
+      this.optionalSegments = __this__optionalSegments;
+    }
   }
 
   public InstanceRequest deepCopy() {
@@ -184,6 +196,7 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
     setEnableTraceIsSet(false);
     this.enableTrace = false;
     this.brokerId = null;
+    this.optionalSegments = null;
   }
 
   public long getRequestId() {
@@ -318,6 +331,46 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
     }
   }
 
+  public int getOptionalSegmentsSize() {
+    return (this.optionalSegments == null) ? 0 : this.optionalSegments.size();
+  }
+
+  @org.apache.thrift.annotation.Nullable
+  public java.util.Iterator<java.lang.String> getOptionalSegmentsIterator() {
+    return (this.optionalSegments == null) ? null : this.optionalSegments.iterator();
+  }
+
+  public void addToOptionalSegments(java.lang.String elem) {
+    if (this.optionalSegments == null) {
+      this.optionalSegments = new java.util.ArrayList<java.lang.String>();
+    }
+    this.optionalSegments.add(elem);
+  }
+
+  @org.apache.thrift.annotation.Nullable
+  public java.util.List<java.lang.String> getOptionalSegments() {
+    return this.optionalSegments;
+  }
+
+  public void setOptionalSegments(@org.apache.thrift.annotation.Nullable java.util.List<java.lang.String> optionalSegments) {
+    this.optionalSegments = optionalSegments;
+  }
+
+  public void unsetOptionalSegments() {
+    this.optionalSegments = null;
+  }
+
+  /** Returns true if field optionalSegments is set (has been assigned a value) and false otherwise */
+  public boolean isSetOptionalSegments() {
+    return this.optionalSegments != null;
+  }
+
+  public void setOptionalSegmentsIsSet(boolean value) {
+    if (!value) {
+      this.optionalSegments = null;
+    }
+  }
+
   public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) {
     switch (field) {
     case REQUEST_ID:
@@ -360,6 +413,14 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
       }
       break;
 
+    case OPTIONAL_SEGMENTS:
+      if (value == null) {
+        unsetOptionalSegments();
+      } else {
+        setOptionalSegments((java.util.List<java.lang.String>)value);
+      }
+      break;
+
     }
   }
 
@@ -381,6 +442,9 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
     case BROKER_ID:
       return getBrokerId();
 
+    case OPTIONAL_SEGMENTS:
+      return getOptionalSegments();
+
     }
     throw new java.lang.IllegalStateException();
   }
@@ -402,6 +466,8 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
       return isSetEnableTrace();
     case BROKER_ID:
       return isSetBrokerId();
+    case OPTIONAL_SEGMENTS:
+      return isSetOptionalSegments();
     }
     throw new java.lang.IllegalStateException();
   }
@@ -464,6 +530,15 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
         return false;
     }
 
+    boolean this_present_optionalSegments = true && this.isSetOptionalSegments();
+    boolean that_present_optionalSegments = true && that.isSetOptionalSegments();
+    if (this_present_optionalSegments || that_present_optionalSegments) {
+      if (!(this_present_optionalSegments && that_present_optionalSegments))
+        return false;
+      if (!this.optionalSegments.equals(that.optionalSegments))
+        return false;
+    }
+
     return true;
   }
 
@@ -489,6 +564,10 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
     if (isSetBrokerId())
       hashCode = hashCode * 8191 + brokerId.hashCode();
 
+    hashCode = hashCode * 8191 + ((isSetOptionalSegments()) ? 131071 : 524287);
+    if (isSetOptionalSegments())
+      hashCode = hashCode * 8191 + optionalSegments.hashCode();
+
     return hashCode;
   }
 
@@ -550,6 +629,16 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
         return lastComparison;
       }
     }
+    lastComparison = java.lang.Boolean.compare(isSetOptionalSegments(), other.isSetOptionalSegments());
+    if (lastComparison != 0) {
+      return lastComparison;
+    }
+    if (isSetOptionalSegments()) {
+      lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.optionalSegments, other.optionalSegments);
+      if (lastComparison != 0) {
+        return lastComparison;
+      }
+    }
     return 0;
   }
 
@@ -608,6 +697,16 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
       }
       first = false;
     }
+    if (isSetOptionalSegments()) {
+      if (!first) sb.append(", ");
+      sb.append("optionalSegments:");
+      if (this.optionalSegments == null) {
+        sb.append("null");
+      } else {
+        sb.append(this.optionalSegments);
+      }
+      first = false;
+    }
     sb.append(")");
     return sb.toString();
   }
@@ -715,6 +814,24 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
               org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
             }
             break;
+          case 6: // OPTIONAL_SEGMENTS
+            if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
+              {
+                org.apache.thrift.protocol.TList _list3 = iprot.readListBegin();
+                struct.optionalSegments = new java.util.ArrayList<java.lang.String>(_list3.size);
+                @org.apache.thrift.annotation.Nullable java.lang.String _elem4;
+                for (int _i5 = 0; _i5 < _list3.size; ++_i5)
+                {
+                  _elem4 = iprot.readString();
+                  struct.optionalSegments.add(_elem4);
+                }
+                iprot.readListEnd();
+              }
+              struct.setOptionalSegmentsIsSet(true);
+            } else { 
+              org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
+            }
+            break;
           default:
             org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type);
         }
@@ -741,9 +858,9 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
           oprot.writeFieldBegin(SEARCH_SEGMENTS_FIELD_DESC);
           {
             oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, struct.searchSegments.size()));
-            for (java.lang.String _iter3 : struct.searchSegments)
+            for (java.lang.String _iter6 : struct.searchSegments)
             {
-              oprot.writeString(_iter3);
+              oprot.writeString(_iter6);
             }
             oprot.writeListEnd();
           }
@@ -762,6 +879,20 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
           oprot.writeFieldEnd();
         }
       }
+      if (struct.optionalSegments != null) {
+        if (struct.isSetOptionalSegments()) {
+          oprot.writeFieldBegin(OPTIONAL_SEGMENTS_FIELD_DESC);
+          {
+            oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING, struct.optionalSegments.size()));
+            for (java.lang.String _iter7 : struct.optionalSegments)
+            {
+              oprot.writeString(_iter7);
+            }
+            oprot.writeListEnd();
+          }
+          oprot.writeFieldEnd();
+        }
+      }
       oprot.writeFieldStop();
       oprot.writeStructEnd();
     }
@@ -791,13 +922,16 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
       if (struct.isSetBrokerId()) {
         optionals.set(2);
       }
-      oprot.writeBitSet(optionals, 3);
+      if (struct.isSetOptionalSegments()) {
+        optionals.set(3);
+      }
+      oprot.writeBitSet(optionals, 4);
       if (struct.isSetSearchSegments()) {
         {
           oprot.writeI32(struct.searchSegments.size());
-          for (java.lang.String _iter4 : struct.searchSegments)
+          for (java.lang.String _iter8 : struct.searchSegments)
           {
-            oprot.writeString(_iter4);
+            oprot.writeString(_iter8);
           }
         }
       }
@@ -807,6 +941,15 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
       if (struct.isSetBrokerId()) {
         oprot.writeString(struct.brokerId);
       }
+      if (struct.isSetOptionalSegments()) {
+        {
+          oprot.writeI32(struct.optionalSegments.size());
+          for (java.lang.String _iter9 : struct.optionalSegments)
+          {
+            oprot.writeString(_iter9);
+          }
+        }
+      }
     }
 
     @Override
@@ -817,16 +960,16 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
       struct.query = new BrokerRequest();
       struct.query.read(iprot);
       struct.setQueryIsSet(true);
-      java.util.BitSet incoming = iprot.readBitSet(3);
+      java.util.BitSet incoming = iprot.readBitSet(4);
       if (incoming.get(0)) {
         {
-          org.apache.thrift.protocol.TList _list5 = iprot.readListBegin(org.apache.thrift.protocol.TType.STRING);
-          struct.searchSegments = new java.util.ArrayList<java.lang.String>(_list5.size);
-          @org.apache.thrift.annotation.Nullable java.lang.String _elem6;
-          for (int _i7 = 0; _i7 < _list5.size; ++_i7)
+          org.apache.thrift.protocol.TList _list10 = iprot.readListBegin(org.apache.thrift.protocol.TType.STRING);
+          struct.searchSegments = new java.util.ArrayList<java.lang.String>(_list10.size);
+          @org.apache.thrift.annotation.Nullable java.lang.String _elem11;
+          for (int _i12 = 0; _i12 < _list10.size; ++_i12)
           {
-            _elem6 = iprot.readString();
-            struct.searchSegments.add(_elem6);
+            _elem11 = iprot.readString();
+            struct.searchSegments.add(_elem11);
           }
         }
         struct.setSearchSegmentsIsSet(true);
@@ -839,6 +982,19 @@ public class InstanceRequest implements org.apache.thrift.TBase<InstanceRequest,
         struct.brokerId = iprot.readString();
         struct.setBrokerIdIsSet(true);
       }
+      if (incoming.get(3)) {
+        {
+          org.apache.thrift.protocol.TList _list13 = iprot.readListBegin(org.apache.thrift.protocol.TType.STRING);
+          struct.optionalSegments = new java.util.ArrayList<java.lang.String>(_list13.size);
+          @org.apache.thrift.annotation.Nullable java.lang.String _elem14;
+          for (int _i15 = 0; _i15 < _list13.size; ++_i15)
+          {
+            _elem14 = iprot.readString();
+            struct.optionalSegments.add(_elem14);
+          }
+        }
+        struct.setOptionalSegmentsIsSet(true);
+      }
     }
   }
 
diff --git a/pinot-common/src/thrift/request.thrift b/pinot-common/src/thrift/request.thrift
index 384bb66107..225836da54 100644
--- a/pinot-common/src/thrift/request.thrift
+++ b/pinot-common/src/thrift/request.thrift
@@ -51,4 +51,5 @@ struct InstanceRequest {
   3: optional list<string> searchSegments;
   4: optional bool enableTrace;
   5: optional string brokerId;
+  6: optional list<string> optionalSegments;
 }
diff --git a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala
index e6d88755c5..48255233a9 100644
--- a/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala
+++ b/pinot-connectors/pinot-spark-common/src/main/scala/org/apache/pinot/connector/spark/common/reader/PinotServerDataFetcher.scala
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.connector.spark.common.reader
 
+import org.apache.commons.lang3.tuple.Pair
 import org.apache.helix.model.InstanceConfig
 import org.apache.pinot.common.datatable.DataTable
 import org.apache.pinot.common.metrics.BrokerMetrics
@@ -31,7 +32,7 @@ import org.apache.pinot.spi.env.PinotConfiguration
 import org.apache.pinot.spi.metrics.PinotMetricUtils
 import org.apache.pinot.sql.parsers.CalciteSqlCompiler
 
-import java.util.{List => JList, Map => JMap}
+import java.util.{Collections, List => JList, Map => JMap}
 import scala.collection.JavaConverters._
 
 /**
@@ -92,7 +93,7 @@ private[reader] class PinotServerDataFetcher(
     dataTables.filter(_.getNumberOfRows > 0)
   }
 
-  private def createRoutingTableForRequest(): JMap[ServerInstance, JList[String]] = {
+  private def createRoutingTableForRequest(): JMap[ServerInstance, Pair[JList[String], JList[String]]] = {
     val nullZkId: String = null
     val instanceConfig = new InstanceConfig(nullZkId)
     instanceConfig.setHostName(pinotSplit.serverAndSegments.serverHost)
@@ -100,15 +101,15 @@ private[reader] class PinotServerDataFetcher(
     // TODO: support netty-sec
     val serverInstance = new ServerInstance(instanceConfig)
     Map(
-      serverInstance -> pinotSplit.serverAndSegments.segments.asJava
+      serverInstance -> Pair.of(pinotSplit.serverAndSegments.segments.asJava, List[String]().asJava)
     ).asJava
   }
 
   private def submitRequestToPinotServer(
       offlineBrokerRequest: BrokerRequest,
-      offlineRoutingTable: JMap[ServerInstance, JList[String]],
+      offlineRoutingTable: JMap[ServerInstance, Pair[JList[String], JList[String]]],
       realtimeBrokerRequest: BrokerRequest,
-      realtimeRoutingTable: JMap[ServerInstance, JList[String]]): AsyncQueryResponse = {
+      realtimeRoutingTable: JMap[ServerInstance, Pair[JList[String], JList[String]]]): AsyncQueryResponse = {
     logInfo(s"Sending request to ${pinotSplit.serverAndSegments.toString}")
     queryRouter.submitQuery(
       partitionId,
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
index 1f0e9844fe..2335a0fc33 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java
@@ -311,6 +311,12 @@ public abstract class BaseTableDataManager implements TableDataManager {
 
   @Override
   public List<SegmentDataManager> acquireSegments(List<String> segmentNames, List<String> missingSegments) {
+    return acquireSegments(segmentNames, null, missingSegments);
+  }
+
+  @Override
+  public List<SegmentDataManager> acquireSegments(List<String> segmentNames,
+      @Nullable List<String> optionalSegmentNames, List<String> missingSegments) {
     List<SegmentDataManager> segmentDataManagers = new ArrayList<>();
     for (String segmentName : segmentNames) {
       SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName);
@@ -320,6 +326,15 @@ public abstract class BaseTableDataManager implements TableDataManager {
         missingSegments.add(segmentName);
       }
     }
+    if (optionalSegmentNames != null) {
+      for (String segmentName : optionalSegmentNames) {
+        SegmentDataManager segmentDataManager = _segmentDataManagerMap.get(segmentName);
+        // Optional segments are not counted to missing segments that are reported back in query exception.
+        if (segmentDataManager != null && segmentDataManager.increaseReferenceCount()) {
+          segmentDataManagers.add(segmentDataManager);
+        }
+      }
+    }
     return segmentDataManagers;
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index ced6c62fb2..b8c5383bbb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -200,10 +200,16 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
     }
 
     List<String> segmentsToQuery = queryRequest.getSegmentsToQuery();
+    List<String> optionalSegments = queryRequest.getOptionalSegments();
     List<String> notAcquiredSegments = new ArrayList<>();
     List<SegmentDataManager> segmentDataManagers =
-        tableDataManager.acquireSegments(segmentsToQuery, notAcquiredSegments);
+        tableDataManager.acquireSegments(segmentsToQuery, optionalSegments, notAcquiredSegments);
     int numSegmentsAcquired = segmentDataManagers.size();
+    if (LOGGER.isDebugEnabled()) {
+      LOGGER.debug("Processing requestId: {} with segmentsToQuery: {}, optionalSegments: {} and acquiredSegments: {}",
+          requestId, segmentsToQuery, optionalSegments,
+          segmentDataManagers.stream().map(SegmentDataManager::getSegmentName).collect(Collectors.toList()));
+    }
     List<IndexSegment> indexSegments = new ArrayList<>(numSegmentsAcquired);
     for (SegmentDataManager segmentDataManager : segmentDataManagers) {
       indexSegments.add(segmentDataManager.getSegment());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/logger/ServerQueryLogger.java b/pinot-core/src/main/java/org/apache/pinot/core/query/logger/ServerQueryLogger.java
index 2d72fc04ef..a9908a87b1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/logger/ServerQueryLogger.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/logger/ServerQueryLogger.java
@@ -82,7 +82,7 @@ public class ServerQueryLogger {
         getLongValue(responseMetadata, MetadataKey.NUM_ENTRIES_SCANNED_POST_FILTER.getName(), -1);
     addToTableMeter(tableNameWithType, ServerMeter.NUM_ENTRIES_SCANNED_POST_FILTER, numEntriesScannedPostFilter);
 
-    int numSegmentsQueried = request.getSegmentsToQuery().size();
+    long numSegmentsQueried = getLongValue(responseMetadata, MetadataKey.NUM_SEGMENTS_QUERIED.getName(), -1);
     addToTableMeter(tableNameWithType, ServerMeter.NUM_SEGMENTS_QUERIED, numSegmentsQueried);
 
     long numSegmentsProcessed = getLongValue(responseMetadata, MetadataKey.NUM_SEGMENTS_PROCESSED.getName(), -1);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
index e444eae880..d4ce7857a5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
@@ -51,6 +51,7 @@ public class ServerQueryRequest {
   private final boolean _enableTrace;
   private final boolean _enableStreaming;
   private final List<String> _segmentsToQuery;
+  private final List<String> _optionalSegments;
   private final QueryContext _queryContext;
 
   // Request id might not be unique across brokers or for request hitting a hybrid table. To solve that we may construct
@@ -71,6 +72,7 @@ public class ServerQueryRequest {
     _enableTrace = instanceRequest.isEnableTrace();
     _enableStreaming = enableStreaming;
     _segmentsToQuery = instanceRequest.getSearchSegments();
+    _optionalSegments = instanceRequest.getOptionalSegments();
     _queryContext = getQueryContext(instanceRequest.getQuery().getPinotQuery());
     _queryId = QueryIdUtils.getQueryId(_brokerId, _requestId,
         TableNameBuilder.getTableTypeFromTableName(_queryContext.getTableName()));
@@ -88,6 +90,8 @@ public class ServerQueryRequest {
     _enableStreaming = Boolean.parseBoolean(metadata.get(Request.MetadataKeys.ENABLE_STREAMING));
 
     _segmentsToQuery = serverRequest.getSegmentsList();
+    // TODO: support optional segments for GrpcQueryServer
+    _optionalSegments = null;
 
     BrokerRequest brokerRequest;
     String payloadType = metadata.getOrDefault(Request.MetadataKeys.PAYLOAD_TYPE, Request.PayloadType.SQL);
@@ -139,6 +143,10 @@ public class ServerQueryRequest {
     return _segmentsToQuery;
   }
 
+  public List<String> getOptionalSegments() {
+    return _optionalSegments;
+  }
+
   public QueryContext getQueryContext() {
     return _queryContext;
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingTable.java b/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingTable.java
index 3491dcd821..ccc6aedb81 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/routing/RoutingTable.java
@@ -20,22 +20,27 @@ package org.apache.pinot.core.routing;
 
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.core.transport.ServerInstance;
 
 
 public class RoutingTable {
-  private final Map<ServerInstance, List<String>> _serverInstanceToSegmentsMap;
+  // Optional segments are those not online as in ExternalView but might have been online on servers already, e.g.
+  // the newly created consuming segments. Such segments were simply skipped by brokers at query routing time, but that
+  // had caused wrong query results, particularly for upsert tables. Instead, we should pass such segments to servers
+  // and let them decide how to handle them, e.g. skip them upon issues or include them for better query results.
+  private final Map<ServerInstance, Pair<List<String>, List<String>/*optional segments*/>> _serverInstanceToSegmentsMap;
   private final List<String> _unavailableSegments;
   private final int _numPrunedSegments;
 
-  public RoutingTable(Map<ServerInstance, List<String>> serverInstanceToSegmentsMap, List<String> unavailableSegments,
-      int numPrunedSegments) {
+  public RoutingTable(Map<ServerInstance, Pair<List<String>, List<String>>> serverInstanceToSegmentsMap,
+      List<String> unavailableSegments, int numPrunedSegments) {
     _serverInstanceToSegmentsMap = serverInstanceToSegmentsMap;
     _unavailableSegments = unavailableSegments;
     _numPrunedSegments = numPrunedSegments;
   }
 
-  public Map<ServerInstance, List<String>> getServerInstanceToSegmentsMap() {
+  public Map<ServerInstance, Pair<List<String>, List<String>>> getServerInstanceToSegmentsMap() {
     return _serverInstanceToSegmentsMap;
   }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
index e0ff080e11..086211ad5f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/transport/QueryRouter.java
@@ -25,6 +25,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeoutException;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.config.NettyConfig;
 import org.apache.pinot.common.config.TlsConfig;
 import org.apache.pinot.common.datatable.DataTable;
@@ -85,9 +87,10 @@ public class QueryRouter {
   }
 
   public AsyncQueryResponse submitQuery(long requestId, String rawTableName,
-      @Nullable BrokerRequest offlineBrokerRequest, @Nullable Map<ServerInstance, List<String>> offlineRoutingTable,
-      @Nullable BrokerRequest realtimeBrokerRequest, @Nullable Map<ServerInstance, List<String>> realtimeRoutingTable,
-      long timeoutMs) {
+      @Nullable BrokerRequest offlineBrokerRequest,
+      @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> offlineRoutingTable,
+      @Nullable BrokerRequest realtimeBrokerRequest,
+      @Nullable Map<ServerInstance, Pair<List<String>, List<String>>> realtimeRoutingTable, long timeoutMs) {
     assert offlineBrokerRequest != null || realtimeBrokerRequest != null;
 
     // can prefer but not require TLS until all servers guaranteed to be on TLS
@@ -97,7 +100,7 @@ public class QueryRouter {
     Map<ServerRoutingInstance, InstanceRequest> requestMap = new HashMap<>();
     if (offlineBrokerRequest != null) {
       assert offlineRoutingTable != null;
-      for (Map.Entry<ServerInstance, List<String>> entry : offlineRoutingTable.entrySet()) {
+      for (Map.Entry<ServerInstance, Pair<List<String>, List<String>>> entry : offlineRoutingTable.entrySet()) {
         ServerRoutingInstance serverRoutingInstance =
             entry.getKey().toServerRoutingInstance(TableType.OFFLINE, preferTls);
         InstanceRequest instanceRequest = getInstanceRequest(requestId, offlineBrokerRequest, entry.getValue());
@@ -106,7 +109,7 @@ public class QueryRouter {
     }
     if (realtimeBrokerRequest != null) {
       assert realtimeRoutingTable != null;
-      for (Map.Entry<ServerInstance, List<String>> entry : realtimeRoutingTable.entrySet()) {
+      for (Map.Entry<ServerInstance, Pair<List<String>, List<String>>> entry : realtimeRoutingTable.entrySet()) {
         ServerRoutingInstance serverRoutingInstance =
             entry.getKey().toServerRoutingInstance(TableType.REALTIME, preferTls);
         InstanceRequest instanceRequest = getInstanceRequest(requestId, realtimeBrokerRequest, entry.getValue());
@@ -195,7 +198,8 @@ public class QueryRouter {
     _asyncQueryResponseMap.remove(requestId);
   }
 
-  private InstanceRequest getInstanceRequest(long requestId, BrokerRequest brokerRequest, List<String> segments) {
+  private InstanceRequest getInstanceRequest(long requestId, BrokerRequest brokerRequest,
+      Pair<List<String>, List<String>> segments) {
     InstanceRequest instanceRequest = new InstanceRequest();
     instanceRequest.setRequestId(requestId);
     instanceRequest.setQuery(brokerRequest);
@@ -203,8 +207,14 @@ public class QueryRouter {
     if (queryOptions != null) {
       instanceRequest.setEnableTrace(Boolean.parseBoolean(queryOptions.get(CommonConstants.Broker.Request.TRACE)));
     }
-    instanceRequest.setSearchSegments(segments);
+    instanceRequest.setSearchSegments(segments.getLeft());
     instanceRequest.setBrokerId(_brokerId);
+    if (CollectionUtils.isNotEmpty(segments.getRight())) {
+      // Don't set this field, i.e. leave it as null, if there is no optional segment at all, to be more backward
+      // compatible, as there are places like in multi-stage query engine where this field is not set today when
+      // creating the InstanceRequest.
+      instanceRequest.setOptionalSegments(segments.getRight());
+    }
     return instanceRequest;
   }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
index 2a30eefa2f..07aa0e2c08 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/transport/QueryRoutingTest.java
@@ -23,6 +23,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.datatable.DataTable;
 import org.apache.pinot.common.datatable.DataTable.MetadataKey;
 import org.apache.pinot.common.metrics.BrokerMetrics;
@@ -60,8 +61,8 @@ public class QueryRoutingTest {
       SERVER_INSTANCE.toServerRoutingInstance(TableType.REALTIME, ServerInstance.RoutingType.NETTY);
   private static final BrokerRequest BROKER_REQUEST =
       CalciteSqlCompiler.compileToBrokerRequest("SELECT * FROM testTable");
-  private static final Map<ServerInstance, List<String>> ROUTING_TABLE =
-      Collections.singletonMap(SERVER_INSTANCE, Collections.emptyList());
+  private static final Map<ServerInstance, Pair<List<String>, List<String>>> ROUTING_TABLE =
+      Collections.singletonMap(SERVER_INSTANCE, Pair.of(Collections.emptyList(), Collections.emptyList()));
 
   private QueryRouter _queryRouter;
   private ServerRoutingStatsManager _serverRoutingStatsManager;
diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
index 5029321162..75164add69 100644
--- a/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
+++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/routing/WorkerManager.java
@@ -30,6 +30,7 @@ import java.util.Random;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.calcite.rel.hint.PinotHintOptions;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.core.routing.RoutingManager;
 import org.apache.pinot.core.routing.RoutingTable;
 import org.apache.pinot.core.routing.TablePartitionInfo;
@@ -145,11 +146,12 @@ public class WorkerManager {
       String tableType = routingEntry.getKey();
       RoutingTable routingTable = routingEntry.getValue();
       // for each server instance, attach all table types and their associated segment list.
-      for (Map.Entry<ServerInstance, List<String>> serverEntry : routingTable.getServerInstanceToSegmentsMap()
-          .entrySet()) {
-        serverInstanceToSegmentsMap.putIfAbsent(serverEntry.getKey(), new HashMap<>());
-        Map<String, List<String>> tableTypeToSegmentListMap = serverInstanceToSegmentsMap.get(serverEntry.getKey());
-        Preconditions.checkState(tableTypeToSegmentListMap.put(tableType, serverEntry.getValue()) == null,
+      Map<ServerInstance, Pair<List<String>, List<String>>> segmentsMap = routingTable.getServerInstanceToSegmentsMap();
+      for (Map.Entry<ServerInstance, Pair<List<String>, List<String>>> serverEntry : segmentsMap.entrySet()) {
+        Map<String, List<String>> tableTypeToSegmentListMap =
+            serverInstanceToSegmentsMap.computeIfAbsent(serverEntry.getKey(), k -> new HashMap<>());
+        // TODO: support optional segments for multi-stage engine.
+        Preconditions.checkState(tableTypeToSegmentListMap.put(tableType, serverEntry.getValue().getLeft()) == null,
             "Entry for server {} and table type: {} already exist!", serverEntry.getKey(), tableType);
       }
 
diff --git a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
index 41bf57700c..4539627a23 100644
--- a/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
+++ b/pinot-query-planner/src/test/java/org/apache/pinot/query/testutils/MockRoutingManagerFactory.java
@@ -27,6 +27,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
 import org.apache.pinot.common.config.provider.TableCache;
@@ -56,7 +57,7 @@ public class MockRoutingManagerFactory {
   private final Map<String, Schema> _schemaMap;
   private final Set<String> _hybridTables;
   private final Map<String, ServerInstance> _serverInstances;
-  private final Map<String, Map<ServerInstance, List<String>>> _tableServerSegmentsMap;
+  private final Map<String, Map<ServerInstance, Pair<List<String>, List<String>>>> _tableServerSegmentsMap;
 
   public MockRoutingManagerFactory(int... ports) {
     _tableNameMap = new HashMap<>();
@@ -87,16 +88,15 @@ public class MockRoutingManagerFactory {
   public void registerSegment(int insertToServerPort, String tableNameWithType, String segmentName) {
     ServerInstance serverInstance = _serverInstances.get(toHostname(insertToServerPort));
     _tableServerSegmentsMap.computeIfAbsent(tableNameWithType, k -> new HashMap<>())
-        .computeIfAbsent(serverInstance, k -> new ArrayList<>()).add(segmentName);
+        .computeIfAbsent(serverInstance, k -> Pair.of(new ArrayList<>(), null)).getLeft().add(segmentName);
   }
 
   public RoutingManager buildRoutingManager(@Nullable Map<String, TablePartitionInfo> partitionInfoMap) {
     Map<String, RoutingTable> routingTableMap = new HashMap<>();
-    for (Map.Entry<String, Map<ServerInstance, List<String>>> tableEntry : _tableServerSegmentsMap.entrySet()) {
-      String tableNameWithType = tableEntry.getKey();
-      RoutingTable fakeRoutingTable = new RoutingTable(tableEntry.getValue(), Collections.emptyList(), 0);
+    _tableServerSegmentsMap.forEach((tableNameWithType, serverSegmentsMap) -> {
+      RoutingTable fakeRoutingTable = new RoutingTable(serverSegmentsMap, Collections.emptyList(), 0);
       routingTableMap.put(tableNameWithType, fakeRoutingTable);
-    }
+    });
     return new FakeRoutingManager(routingTableMap, _hybridTables, partitionInfoMap, _serverInstances);
   }
 
diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
index c3a34cbd20..6bd3d617b3 100644
--- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
+++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/testutils/MockInstanceDataManagerFactory.java
@@ -46,6 +46,7 @@ import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -150,7 +151,8 @@ public class MockInstanceDataManagerFactory {
     Map<String, SegmentDataManager> segmentDataManagerMap =
         segmentList.stream().collect(Collectors.toMap(IndexSegment::getSegmentName, ImmutableSegmentDataManager::new));
     TableDataManager tableDataManager = mock(TableDataManager.class);
-    when(tableDataManager.acquireSegments(anyList(), anyList())).thenAnswer(invocation -> {
+    // TODO: support optional segments for multi-stage engine, but for now, it's always null.
+    when(tableDataManager.acquireSegments(anyList(), eq(null), anyList())).thenAnswer(invocation -> {
       List<String> segments = invocation.getArgument(0);
       return segments.stream().map(segmentDataManagerMap::get).collect(Collectors.toList());
     });
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
index 2de9c586c0..0b820f50c2 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/data/manager/TableDataManager.java
@@ -170,6 +170,11 @@ public interface TableDataManager {
    */
   List<SegmentDataManager> acquireSegments(List<String> segmentNames, List<String> missingSegments);
 
+  default List<SegmentDataManager> acquireSegments(List<String> segmentNames,
+      @Nullable List<String> optionalSegmentNames, List<String> missingSegments) {
+    return acquireSegments(segmentNames, missingSegments);
+  }
+
   /**
    * Acquires the segments with the given segment name.
    * <p>It is the caller's responsibility to return the segments by calling {@link #releaseSegment(SegmentDataManager)}.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org