You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/11/01 19:04:19 UTC

[incubator-pinot] branch master updated: Add IN_PARTITIONED_SUBQUERY support (#6043)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4242706  Add IN_PARTITIONED_SUBQUERY support (#6043)
4242706 is described below

commit 42427065ccceeeb497515852cb77807b07e80c5c
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Sun Nov 1 11:04:07 2020 -0800

    Add IN_PARTITIONED_SUBQUERY support (#6043)
    
    Add `IN_PARTITIONED_SUBQUERY` transform function to support `IDSET` aggregation function as the subquery on the server side. Because the subquery is solved on the server side, in order to make it work, the subquery must hit the same table as the main query, and the table must be partitioned at server level (all the segments for a partition are served by a single server).
    
    E.g. The following 2 queries can be combined into one query:
    `SELECT ID_SET(col) FROM table WHERE date = 20200901`
    `SELECT DISTINCT_COUNT(col), date FROM table WHERE IN_ID_SET(col, '<serializedIdSet>') = 1 GROUP BY date`
    ->
    `SELECT DISTINCT_COUNT(col), date FROM table WHERE IN_PARTITIONED_SUBQUERY(col, 'SELECT ID_SET(col) FROM table WHERE date = 20200901') = 1 GROUP BY date`
---
 .../query/executor/ServerQueryExecutorV1Impl.java  | 206 +++++++++++++-----
 .../query/pruner/ColumnValueSegmentPruner.java     |   6 +-
 .../core/query/pruner/DataSchemaSegmentPruner.java |   6 +-
 .../pinot/core/query/pruner/SegmentPruner.java     |  35 ++-
 .../core/query/pruner/SegmentPrunerService.java    |  12 +-
 .../query/pruner/SelectionQuerySegmentPruner.java  | 107 ++++-----
 .../core/query/pruner/ValidSegmentPruner.java      |  10 +-
 .../core/query/request/ServerQueryRequest.java     |  37 +---
 .../query/request/context/FunctionContext.java     |   7 +-
 .../core/query/request/context/QueryContext.java   |  94 ++++++--
 .../BrokerRequestToQueryContextConverter.java      |   7 +-
 .../request/context/utils/QueryContextUtils.java   |  54 -----
 .../query/pruner/ColumnValueSegmentPrunerTest.java |   5 +-
 .../pruner/SelectionQuerySegmentPrunerTest.java    | 241 ++++++++++-----------
 .../BrokerRequestToQueryContextConverterTest.java  |  39 ++--
 .../tests/BaseClusterIntegrationTestSet.java       |  21 ++
 .../tests/LLCRealtimeClusterIntegrationTest.java   |   7 +
 .../MultiNodesOfflineClusterIntegrationTest.java   |   6 +
 .../tests/OfflineClusterIntegrationTest.java       |   7 +
 19 files changed, 505 insertions(+), 402 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index ae3d3ce..194e57f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.query.executor;
 
+import com.google.common.base.Preconditions;
 import io.grpc.stub.StreamObserver;
 import java.util.ArrayList;
 import java.util.List;
@@ -26,7 +27,10 @@ import java.util.concurrent.ExecutorService;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.lang.StringUtils;
 import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.function.AggregationFunctionType;
+import org.apache.pinot.common.function.TransformFunctionType;
 import org.apache.pinot.common.metrics.ServerMeter;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.metrics.ServerQueryPhase;
@@ -43,12 +47,18 @@ import org.apache.pinot.core.indexsegment.mutable.MutableSegment;
 import org.apache.pinot.core.plan.Plan;
 import org.apache.pinot.core.plan.maker.InstancePlanMakerImplV2;
 import org.apache.pinot.core.plan.maker.PlanMaker;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.config.QueryExecutorConfig;
 import org.apache.pinot.core.query.exception.BadQueryRequestException;
 import org.apache.pinot.core.query.pruner.SegmentPrunerService;
 import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.FilterContext;
+import org.apache.pinot.core.query.request.context.FunctionContext;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.TimerContext;
+import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
+import org.apache.pinot.core.query.utils.idset.IdSet;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
 import org.apache.pinot.core.util.QueryOptions;
 import org.apache.pinot.core.util.trace.TraceContext;
@@ -60,6 +70,7 @@ import org.slf4j.LoggerFactory;
 @ThreadSafe
 public class ServerQueryExecutorV1Impl implements QueryExecutor {
   private static final Logger LOGGER = LoggerFactory.getLogger(ServerQueryExecutorV1Impl.class);
+  private static final String IN_PARTITIONED_SUBQUERY = "inPartitionedSubquery";
 
   private InstanceDataManager _instanceDataManager;
   private SegmentPrunerService _segmentPrunerService;
@@ -159,27 +170,26 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
       _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.NUM_MISSING_SEGMENTS,
           numSegmentsQueried - numSegmentsAcquired);
     }
-
-    boolean enableTrace = queryRequest.isEnableTrace();
-    if (enableTrace) {
-      TraceContext.register(requestId);
+    List<IndexSegment> indexSegments = new ArrayList<>(numSegmentsAcquired);
+    for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+      indexSegments.add(segmentDataManager.getSegment());
     }
 
+    // Gather stats for realtime consuming segments
     int numConsumingSegmentsProcessed = 0;
     long minIndexTimeMs = Long.MAX_VALUE;
     long minIngestionTimeMs = Long.MAX_VALUE;
-    // gather stats for realtime consuming segments
-    for (SegmentDataManager segmentMgr : segmentDataManagers) {
-      if (segmentMgr.getSegment() instanceof MutableSegment) {
+    for (IndexSegment indexSegment : indexSegments) {
+      if (indexSegment instanceof MutableSegment) {
         numConsumingSegmentsProcessed += 1;
-        SegmentMetadata metadata = segmentMgr.getSegment().getSegmentMetadata();
-        long indexedTime = metadata.getLastIndexedTimestamp();
-        if (indexedTime != Long.MIN_VALUE && indexedTime < minIndexTimeMs) {
-          minIndexTimeMs = metadata.getLastIndexedTimestamp();
+        SegmentMetadata segmentMetadata = indexSegment.getSegmentMetadata();
+        long indexTimeMs = segmentMetadata.getLastIndexedTimestamp();
+        if (indexTimeMs != Long.MIN_VALUE && indexTimeMs < minIndexTimeMs) {
+          minIndexTimeMs = indexTimeMs;
         }
-        long ingestionTime = metadata.getLatestIngestionTimestamp();
-        if (ingestionTime != Long.MIN_VALUE && ingestionTime < minIngestionTimeMs) {
-          minIngestionTimeMs = ingestionTime;
+        long ingestionTimeMs = segmentMetadata.getLatestIngestionTimestamp();
+        if (ingestionTimeMs != Long.MIN_VALUE && ingestionTimeMs < minIngestionTimeMs) {
+          minIngestionTimeMs = ingestionTimeMs;
         }
       }
     }
@@ -195,48 +205,15 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
               minConsumingFreshnessTimeMs);
     }
 
+    boolean enableTrace = queryRequest.isEnableTrace();
+    if (enableTrace) {
+      TraceContext.register(requestId);
+    }
+
     DataTable dataTable = null;
     try {
-      // Compute total docs for the table before pruning the segments
-      long numTotalDocs = 0;
-      for (SegmentDataManager segmentDataManager : segmentDataManagers) {
-        numTotalDocs += segmentDataManager.getSegment().getSegmentMetadata().getTotalDocs();
-      }
-      TimerContext.Timer segmentPruneTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.SEGMENT_PRUNING);
-      segmentDataManagers = _segmentPrunerService.prune(tableDataManager, segmentDataManagers, queryRequest);
-      segmentPruneTimer.stopAndRecord();
-      int numSegmentsMatchedAfterPruning = segmentDataManagers.size();
-      LOGGER.debug("Matched {} segments after pruning", numSegmentsMatchedAfterPruning);
-      if (numSegmentsMatchedAfterPruning == 0) {
-        // Only return metadata for streaming query
-        dataTable =
-            queryRequest.isEnableStreaming() ? new DataTableImplV2() : DataTableUtils.buildEmptyDataTable(queryContext);
-        Map<String, String> metadata = dataTable.getMetadata();
-        metadata.put(DataTable.TOTAL_DOCS_METADATA_KEY, String.valueOf(numTotalDocs));
-        metadata.put(DataTable.NUM_DOCS_SCANNED_METADATA_KEY, "0");
-        metadata.put(DataTable.NUM_ENTRIES_SCANNED_IN_FILTER_METADATA_KEY, "0");
-        metadata.put(DataTable.NUM_ENTRIES_SCANNED_POST_FILTER_METADATA_KEY, "0");
-        metadata.put(DataTable.NUM_SEGMENTS_PROCESSED, "0");
-        metadata.put(DataTable.NUM_SEGMENTS_MATCHED, "0");
-      } else {
-        TimerContext.Timer planBuildTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.BUILD_QUERY_PLAN);
-        List<IndexSegment> indexSegments = new ArrayList<>(numSegmentsMatchedAfterPruning);
-        for (SegmentDataManager segmentDataManager : segmentDataManagers) {
-          indexSegments.add(segmentDataManager.getSegment());
-        }
-        long endTimeMs = queryArrivalTimeMs + queryTimeoutMs;
-        Plan globalQueryPlan = queryRequest.isEnableStreaming() ? _planMaker
-            .makeStreamingInstancePlan(indexSegments, queryContext, executorService, responseObserver, endTimeMs)
-            : _planMaker.makeInstancePlan(indexSegments, queryContext, executorService, endTimeMs);
-        planBuildTimer.stopAndRecord();
-
-        TimerContext.Timer planExecTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PLAN_EXECUTION);
-        dataTable = globalQueryPlan.execute();
-        planExecTimer.stopAndRecord();
-
-        // Update the total docs in the metadata based on un-pruned segments.
-        dataTable.getMetadata().put(DataTable.TOTAL_DOCS_METADATA_KEY, Long.toString(numTotalDocs));
-      }
+      dataTable = processQuery(indexSegments, queryContext, timerContext, executorService, responseObserver,
+          queryArrivalTimeMs + queryTimeoutMs, queryRequest.isEnableStreaming());
     } catch (Exception e) {
       _serverMetrics.addMeteredTableValue(tableNameWithType, ServerMeter.QUERY_EXECUTION_EXCEPTIONS, 1);
 
@@ -277,4 +254,125 @@ public class ServerQueryExecutorV1Impl implements QueryExecutor {
     LOGGER.debug("InstanceResponse for request Id - {}: {}", requestId, dataTable);
     return dataTable;
   }
+
+  private DataTable processQuery(List<IndexSegment> indexSegments, QueryContext queryContext, TimerContext timerContext,
+      ExecutorService executorService, @Nullable StreamObserver<Server.ServerResponse> responseObserver, long endTimeMs,
+      boolean enableStreaming)
+      throws Exception {
+    handleSubquery(queryContext, indexSegments, timerContext, executorService, endTimeMs);
+
+    // Compute total docs for the table before pruning the segments
+    long numTotalDocs = 0;
+    for (IndexSegment indexSegment : indexSegments) {
+      numTotalDocs += indexSegment.getSegmentMetadata().getTotalDocs();
+    }
+
+    TimerContext.Timer segmentPruneTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.SEGMENT_PRUNING);
+    List<IndexSegment> selectedSegments = _segmentPrunerService.prune(indexSegments, queryContext);
+    segmentPruneTimer.stopAndRecord();
+    int numSelectedSegments = selectedSegments.size();
+    LOGGER.debug("Matched {} segments after pruning", numSelectedSegments);
+    if (numSelectedSegments == 0) {
+      // Only return metadata for streaming query
+      DataTable dataTable = enableStreaming ? new DataTableImplV2() : DataTableUtils.buildEmptyDataTable(queryContext);
+      Map<String, String> metadata = dataTable.getMetadata();
+      metadata.put(DataTable.TOTAL_DOCS_METADATA_KEY, String.valueOf(numTotalDocs));
+      metadata.put(DataTable.NUM_DOCS_SCANNED_METADATA_KEY, "0");
+      metadata.put(DataTable.NUM_ENTRIES_SCANNED_IN_FILTER_METADATA_KEY, "0");
+      metadata.put(DataTable.NUM_ENTRIES_SCANNED_POST_FILTER_METADATA_KEY, "0");
+      metadata.put(DataTable.NUM_SEGMENTS_PROCESSED, "0");
+      metadata.put(DataTable.NUM_SEGMENTS_MATCHED, "0");
+      return dataTable;
+    } else {
+      TimerContext.Timer planBuildTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.BUILD_QUERY_PLAN);
+      Plan queryPlan = enableStreaming ? _planMaker
+          .makeStreamingInstancePlan(selectedSegments, queryContext, executorService, responseObserver, endTimeMs)
+          : _planMaker.makeInstancePlan(selectedSegments, queryContext, executorService, endTimeMs);
+      planBuildTimer.stopAndRecord();
+
+      TimerContext.Timer planExecTimer = timerContext.startNewPhaseTimer(ServerQueryPhase.QUERY_PLAN_EXECUTION);
+      DataTable dataTable = queryPlan.execute();
+      planExecTimer.stopAndRecord();
+
+      // Update the total docs in the metadata based on the un-pruned segments
+      dataTable.getMetadata().put(DataTable.TOTAL_DOCS_METADATA_KEY, Long.toString(numTotalDocs));
+
+      return dataTable;
+    }
+  }
+
+  /**
+   * Handles the subquery in the given query.
+   * <p>Currently only supports subquery within the filter.
+   */
+  private void handleSubquery(QueryContext queryContext, List<IndexSegment> indexSegments, TimerContext timerContext,
+      ExecutorService executorService, long endTimeMs)
+      throws Exception {
+    FilterContext filter = queryContext.getFilter();
+    if (filter != null) {
+      handleSubquery(filter, indexSegments, timerContext, executorService, endTimeMs);
+    }
+  }
+
+  /**
+   * Handles the subquery in the given filter.
+   * <p>Currently only supports subquery within the lhs of the predicate.
+   */
+  private void handleSubquery(FilterContext filter, List<IndexSegment> indexSegments, TimerContext timerContext,
+      ExecutorService executorService, long endTimeMs)
+      throws Exception {
+    List<FilterContext> children = filter.getChildren();
+    if (children != null) {
+      for (FilterContext child : children) {
+        handleSubquery(child, indexSegments, timerContext, executorService, endTimeMs);
+      }
+    } else {
+      handleSubquery(filter.getPredicate().getLhs(), indexSegments, timerContext, executorService, endTimeMs);
+    }
+  }
+
+  /**
+   * Handles the subquery in the given expression.
+   * <p>When subquery is detected, first executes the subquery on the given segments and gets the response, then
+   * rewrites the expression with the subquery response.
+   * <p>Currently only supports ID_SET subquery within the IN_PARTITIONED_SUBQUERY transform function, which will be
+   * rewritten to an IN_ID_SET transform function.
+   */
+  private void handleSubquery(ExpressionContext expression, List<IndexSegment> indexSegments, TimerContext timerContext,
+      ExecutorService executorService, long endTimeMs)
+      throws Exception {
+    FunctionContext function = expression.getFunction();
+    if (function == null) {
+      return;
+    }
+    List<ExpressionContext> arguments = function.getArguments();
+    if (StringUtils.remove(function.getFunctionName(), '_').equalsIgnoreCase(IN_PARTITIONED_SUBQUERY)) {
+      Preconditions
+          .checkState(arguments.size() == 2, "IN_PARTITIONED_SUBQUERY requires 2 arguments: expression, subquery");
+      ExpressionContext subqueryExpression = arguments.get(1);
+      Preconditions.checkState(subqueryExpression.getType() == ExpressionContext.Type.LITERAL,
+          "Second argument of IN_PARTITIONED_SUBQUERY must be a literal (subquery)");
+      QueryContext subquery = QueryContextConverterUtils.getQueryContextFromSQL(subqueryExpression.getLiteral());
+      // Subquery should be an ID_SET aggregation only query
+      //noinspection rawtypes
+      AggregationFunction[] aggregationFunctions = subquery.getAggregationFunctions();
+      Preconditions.checkState(aggregationFunctions != null && aggregationFunctions.length == 1
+              && aggregationFunctions[0].getType() == AggregationFunctionType.IDSET
+              && subquery.getGroupByExpressions() == null,
+          "Subquery in IN_PARTITIONED_SUBQUERY should be an ID_SET aggregation only query, found: %s",
+          subqueryExpression.getLiteral());
+      // Execute the subquery
+      DataTable dataTable =
+          processQuery(indexSegments, subquery, timerContext, executorService, null, endTimeMs, false);
+      IdSet idSet = dataTable.getObject(0, 0);
+      String serializedIdSet = idSet.toBase64String();
+      // Rewrite the expression
+      function.setFunctionName(TransformFunctionType.INIDSET.name());
+      arguments.set(1, ExpressionContext.forLiteral(serializedIdSet));
+    } else {
+      for (ExpressionContext argument : arguments) {
+        handleSubquery(argument, indexSegments, timerContext, executorService, endTimeMs);
+      }
+    }
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
index 98eb36e..98d9624 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPruner.java
@@ -26,9 +26,9 @@ import org.apache.pinot.core.common.DataSourceMetadata;
 import org.apache.pinot.core.data.partition.PartitionFunction;
 import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.query.exception.BadQueryRequestException;
-import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.request.context.ExpressionContext;
 import org.apache.pinot.core.query.request.context.FilterContext;
+import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.predicate.EqPredicate;
 import org.apache.pinot.core.query.request.context.predicate.Predicate;
 import org.apache.pinot.core.query.request.context.predicate.RangePredicate;
@@ -65,8 +65,8 @@ public class ColumnValueSegmentPruner implements SegmentPruner {
   }
 
   @Override
-  public boolean prune(IndexSegment segment, ServerQueryRequest queryRequest) {
-    FilterContext filter = queryRequest.getQueryContext().getFilter();
+  public boolean prune(IndexSegment segment, QueryContext query) {
+    FilterContext filter = query.getFilter();
     if (filter == null) {
       return false;
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/DataSchemaSegmentPruner.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/DataSchemaSegmentPruner.java
index 773b145..109951b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/DataSchemaSegmentPruner.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/DataSchemaSegmentPruner.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.core.query.pruner;
 
 import org.apache.pinot.core.indexsegment.IndexSegment;
-import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.spi.env.PinotConfiguration;
 
 
@@ -34,8 +34,8 @@ public class DataSchemaSegmentPruner implements SegmentPruner {
   }
 
   @Override
-  public boolean prune(IndexSegment segment, ServerQueryRequest queryRequest) {
-    return !segment.getColumnNames().containsAll(queryRequest.getAllColumns());
+  public boolean prune(IndexSegment segment, QueryContext query) {
+    return !segment.getColumnNames().containsAll(query.getColumns());
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java
index 4319e43..124e106 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPruner.java
@@ -19,12 +19,9 @@
 package org.apache.pinot.core.query.pruner;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
-import org.apache.pinot.core.data.manager.TableDataManager;
 import org.apache.pinot.core.indexsegment.IndexSegment;
-import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.spi.env.PinotConfiguration;
 
 
@@ -36,31 +33,27 @@ public interface SegmentPruner {
   void init(PinotConfiguration config);
 
   /**
-   * Prunes the segments based on the query request, returns the segments that are not pruned. The pruned segments need
-   * to be released by calling {@link TableDataManager#releaseSegment(SegmentDataManager)}.
-   * <p>Override this method or {@link #prune(IndexSegment, ServerQueryRequest)} for the pruner logic.
+   * Prunes the segments based on the query, returns the segments that are not pruned.
+   * <p>Override this method or {@link #prune(IndexSegment, QueryContext)} for the pruner logic.
    */
-  default List<SegmentDataManager> prune(TableDataManager tableDataManager,
-      List<SegmentDataManager> segmentDataManagers, ServerQueryRequest queryRequest) {
-    if (segmentDataManagers.isEmpty()) {
-      return Collections.emptyList();
+  default List<IndexSegment> prune(List<IndexSegment> segments, QueryContext query) {
+    if (segments.isEmpty()) {
+      return segments;
     }
-    List<SegmentDataManager> remainingSegmentDataManagers = new ArrayList<>(segmentDataManagers.size());
-    for (SegmentDataManager segmentDataManager : segmentDataManagers) {
-      if (prune(segmentDataManager.getSegment(), queryRequest)) {
-        tableDataManager.releaseSegment(segmentDataManager);
-      } else {
-        remainingSegmentDataManagers.add(segmentDataManager);
+    List<IndexSegment> selectedSegments = new ArrayList<>(segments.size());
+    for (IndexSegment segment : segments) {
+      if (!prune(segment, query)) {
+        selectedSegments.add(segment);
       }
     }
-    return remainingSegmentDataManagers;
+    return selectedSegments;
   }
 
   /**
-   * Returns {@code true} if the segment can be pruned based on the query request.
-   * <p>Override this method or {@link #prune(TableDataManager, List, ServerQueryRequest)} for the pruner logic.
+   * Returns {@code true} if the segment can be pruned based on the query.
+   * <p>Override this method or {@link #prune(List, QueryContext)} for the pruner logic.
    */
-  default boolean prune(IndexSegment segment, ServerQueryRequest queryRequest) {
+  default boolean prune(IndexSegment segment, QueryContext query) {
     throw new UnsupportedOperationException();
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java
index 4c522e9..be9f092 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SegmentPrunerService.java
@@ -20,10 +20,9 @@ package org.apache.pinot.core.query.pruner;
 
 import java.util.ArrayList;
 import java.util.List;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
-import org.apache.pinot.core.data.manager.TableDataManager;
+import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.query.config.SegmentPrunerConfig;
-import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.request.context.QueryContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -50,11 +49,10 @@ public class SegmentPrunerService {
   /**
    * Prunes the segments based on the query request, returns the segments that are not pruned.
    */
-  public List<SegmentDataManager> prune(TableDataManager tableDataManager, List<SegmentDataManager> segmentDataManagers,
-      ServerQueryRequest queryRequest) {
+  public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext query) {
     for (SegmentPruner segmentPruner : _segmentPruners) {
-      segmentDataManagers = segmentPruner.prune(tableDataManager, segmentDataManagers, queryRequest);
+      segments = segmentPruner.prune(segments, query);
     }
-    return segmentDataManagers;
+    return segments;
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java
index 183ac64..c3dfc50 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java
@@ -23,9 +23,7 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import org.apache.pinot.core.common.DataSourceMetadata;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
-import org.apache.pinot.core.data.manager.TableDataManager;
-import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.query.request.context.ExpressionContext;
 import org.apache.pinot.core.query.request.context.FilterContext;
 import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
@@ -53,43 +51,37 @@ public class SelectionQuerySegmentPruner implements SegmentPruner {
   }
 
   @Override
-  public List<SegmentDataManager> prune(TableDataManager tableDataManager, List<SegmentDataManager> segmentDataManagers,
-      ServerQueryRequest queryRequest) {
-    int numSegments = segmentDataManagers.size();
-    if (numSegments == 0) {
-      return Collections.emptyList();
+  public List<IndexSegment> prune(List<IndexSegment> segments, QueryContext query) {
+    if (segments.isEmpty()) {
+      return segments;
     }
 
     // Do not prune aggregation queries
-    QueryContext queryContext = queryRequest.getQueryContext();
-    if (QueryContextUtils.isAggregationQuery(queryContext)) {
-      return segmentDataManagers;
+    if (QueryContextUtils.isAggregationQuery(query)) {
+      return segments;
     }
 
     // For LIMIT 0 case, keep one segment to create the schema
-    int limit = queryContext.getLimit();
+    int limit = query.getLimit();
     if (limit == 0) {
-      for (int i = 1; i < numSegments; i++) {
-        tableDataManager.releaseSegment(segmentDataManagers.get(i));
-      }
-      return Collections.singletonList(segmentDataManagers.get(0));
+      return Collections.singletonList(segments.get(0));
     }
 
     // If LIMIT is not 0, only prune segments for selection queries without filter
-    FilterContext filter = queryContext.getFilter();
+    FilterContext filter = query.getFilter();
     if (filter != null) {
-      return segmentDataManagers;
+      return segments;
     }
 
     // Skip pruning segments for upsert table because valid doc index is equivalent to a filter
-    if (segmentDataManagers.get(0).getSegment().getValidDocIndex() != null) {
-      return segmentDataManagers;
+    if (segments.get(0).getValidDocIndex() != null) {
+      return segments;
     }
 
-    if (queryContext.getOrderByExpressions() == null) {
-      return pruneSelectionOnly(tableDataManager, segmentDataManagers, queryContext);
+    if (query.getOrderByExpressions() == null) {
+      return pruneSelectionOnly(segments, query);
     } else {
-      return pruneSelectionOrderBy(tableDataManager, segmentDataManagers, queryContext);
+      return pruneSelectionOrderBy(segments, query);
     }
   }
 
@@ -97,19 +89,18 @@ public class SelectionQuerySegmentPruner implements SegmentPruner {
    * Helper method to prune segments for selection only queries without filter.
    * <p>We just need to keep enough documents to fulfill the LIMIT requirement.
    */
-  private List<SegmentDataManager> pruneSelectionOnly(TableDataManager tableDataManager,
-      List<SegmentDataManager> segmentDataManagers, QueryContext queryContext) {
-    List<SegmentDataManager> selectedSegmentDataManagers = new ArrayList<>(segmentDataManagers.size());
-    int remainingDocs = queryContext.getLimit();
-    for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+  private List<IndexSegment> pruneSelectionOnly(List<IndexSegment> segments, QueryContext query) {
+    List<IndexSegment> selectedSegments = new ArrayList<>(segments.size());
+    int remainingDocs = query.getLimit();
+    for (IndexSegment segment : segments) {
       if (remainingDocs > 0) {
-        selectedSegmentDataManagers.add(segmentDataManager);
-        remainingDocs -= segmentDataManager.getSegment().getSegmentMetadata().getTotalDocs();
+        selectedSegments.add(segment);
+        remainingDocs -= segment.getSegmentMetadata().getTotalDocs();
       } else {
-        tableDataManager.releaseSegment(segmentDataManager);
+        break;
       }
     }
-    return selectedSegmentDataManagers;
+    return selectedSegments;
   }
 
   /**
@@ -122,66 +113,62 @@ public class SelectionQuerySegmentPruner implements SegmentPruner {
    *   <li>3. Keep the segments that has value overlap with the selected ones; remove the others</li>
    * </ul>
    */
-  private List<SegmentDataManager> pruneSelectionOrderBy(TableDataManager tableDataManager,
-      List<SegmentDataManager> segmentDataManagers, QueryContext queryContext) {
-    List<OrderByExpressionContext> orderByExpressions = queryContext.getOrderByExpressions();
+  private List<IndexSegment> pruneSelectionOrderBy(List<IndexSegment> segments, QueryContext query) {
+    List<OrderByExpressionContext> orderByExpressions = query.getOrderByExpressions();
     assert orderByExpressions != null;
     int numOrderByExpressions = orderByExpressions.size();
     assert numOrderByExpressions > 0;
     OrderByExpressionContext firstOrderByExpression = orderByExpressions.get(0);
     if (firstOrderByExpression.getExpression().getType() != ExpressionContext.Type.IDENTIFIER) {
-      return segmentDataManagers;
+      return segments;
     }
     String firstOrderByColumn = firstOrderByExpression.getExpression().getIdentifier();
 
     // Extract the column min/max value from each segment
-    int numSegments = segmentDataManagers.size();
-    List<SegmentDataManager> selectedSegmentDataManagers = new ArrayList<>(numSegments);
+    int numSegments = segments.size();
+    List<IndexSegment> selectedSegments = new ArrayList<>(numSegments);
     List<MinMaxValue> minMaxValues = new ArrayList<>(numSegments);
     for (int i = 0; i < numSegments; i++) {
-      SegmentDataManager segmentDataManager = segmentDataManagers.get(i);
-      DataSourceMetadata dataSourceMetadata =
-          segmentDataManager.getSegment().getDataSource(firstOrderByColumn).getDataSourceMetadata();
+      IndexSegment segment = segments.get(i);
+      DataSourceMetadata dataSourceMetadata = segment.getDataSource(firstOrderByColumn).getDataSourceMetadata();
       Comparable minValue = dataSourceMetadata.getMinValue();
       Comparable maxValue = dataSourceMetadata.getMaxValue();
       // Always keep the segment if it does not have column min/max value in the metadata
       if (minValue == null || maxValue == null) {
-        selectedSegmentDataManagers.add(segmentDataManager);
+        selectedSegments.add(segment);
       } else {
         minMaxValues.add(new MinMaxValue(i, minValue, maxValue));
       }
     }
     if (minMaxValues.isEmpty()) {
-      return segmentDataManagers;
+      return segments;
     }
 
-    int remainingDocs = queryContext.getLimit() + queryContext.getOffset();
+    int remainingDocs = query.getLimit() + query.getOffset();
     if (firstOrderByExpression.isAsc()) {
       // For ascending order, sort on column max value in ascending order
       try {
         minMaxValues.sort(Comparator.comparing(o -> o._maxValue));
       } catch (Exception e) {
         // Skip the pruning when segments have different data types for the first order-by column
-        return segmentDataManagers;
+        return segments;
       }
 
       // Maintain the max value for all the selected segments
       Comparable maxValue = null;
       for (MinMaxValue minMaxValue : minMaxValues) {
-        SegmentDataManager segmentDataManager = segmentDataManagers.get(minMaxValue._index);
+        IndexSegment segment = segments.get(minMaxValue._index);
         if (remainingDocs > 0) {
-          selectedSegmentDataManagers.add(segmentDataManager);
-          remainingDocs -= segmentDataManager.getSegment().getSegmentMetadata().getTotalDocs();
+          selectedSegments.add(segment);
+          remainingDocs -= segment.getSegmentMetadata().getTotalDocs();
           maxValue = minMaxValue._maxValue;
         } else {
           // After getting enough documents, prune all the segments with min value larger than the current max value, or
           // min value equal to the current max value and there is only one order-by expression
           assert maxValue != null;
           int result = minMaxValue._minValue.compareTo(maxValue);
-          if (result > 0 || (result == 0 && numOrderByExpressions == 1)) {
-            tableDataManager.releaseSegment(segmentDataManager);
-          } else {
-            selectedSegmentDataManagers.add(segmentDataManager);
+          if (result < 0 || (result == 0 && numOrderByExpressions != 1)) {
+            selectedSegments.add(segment);
           }
         }
       }
@@ -191,32 +178,30 @@ public class SelectionQuerySegmentPruner implements SegmentPruner {
         minMaxValues.sort((o1, o2) -> o2._minValue.compareTo(o1._minValue));
       } catch (Exception e) {
         // Skip the pruning when segments have different data types for the first order-by column
-        return segmentDataManagers;
+        return segments;
       }
 
       // Maintain the min value for all the selected segments
       Comparable minValue = null;
       for (MinMaxValue minMaxValue : minMaxValues) {
-        SegmentDataManager segmentDataManager = segmentDataManagers.get(minMaxValue._index);
+        IndexSegment segment = segments.get(minMaxValue._index);
         if (remainingDocs > 0) {
-          selectedSegmentDataManagers.add(segmentDataManager);
-          remainingDocs -= segmentDataManager.getSegment().getSegmentMetadata().getTotalDocs();
+          selectedSegments.add(segment);
+          remainingDocs -= segment.getSegmentMetadata().getTotalDocs();
           minValue = minMaxValue._minValue;
         } else {
           // After getting enough documents, prune all the segments with max value smaller than the current min value,
           // or max value equal to the current min value and there is only one order-by expression
           assert minValue != null;
           int result = minMaxValue._maxValue.compareTo(minValue);
-          if (result < 0 || (result == 0 && numOrderByExpressions == 1)) {
-            tableDataManager.releaseSegment(segmentDataManager);
-          } else {
-            selectedSegmentDataManagers.add(segmentDataManager);
+          if (result > 0 || (result == 0 && numOrderByExpressions != 1)) {
+            selectedSegments.add(segment);
           }
         }
       }
     }
 
-    return selectedSegmentDataManagers;
+    return selectedSegments;
   }
 
   private static class MinMaxValue {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ValidSegmentPruner.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ValidSegmentPruner.java
index 34ea833..2ff04b3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ValidSegmentPruner.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/ValidSegmentPruner.java
@@ -19,7 +19,7 @@
 package org.apache.pinot.core.query.pruner;
 
 import org.apache.pinot.core.indexsegment.IndexSegment;
-import org.apache.pinot.core.query.request.ServerQueryRequest;
+import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
 import org.apache.pinot.spi.env.PinotConfiguration;
 
@@ -29,22 +29,18 @@ import org.apache.pinot.spi.env.PinotConfiguration;
  * invalid/bad data.
  */
 public class ValidSegmentPruner implements SegmentPruner {
+
   @Override
   public void init(PinotConfiguration config) {
-
   }
 
   /**
    * Returns true if a segment should be pruned-out due to bad/invalid data.
    * Current check(s) below:
    * - Empty segment.
-   *
-   * @param segment
-   * @param queryRequest
-   * @return
    */
   @Override
-  public boolean prune(IndexSegment segment, ServerQueryRequest queryRequest) {
+  public boolean prune(IndexSegment segment, QueryContext query) {
     SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
 
     // Check for empty segment.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
index 24f2fa2..404b516 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/ServerQueryRequest.java
@@ -20,7 +20,6 @@ package org.apache.pinot.core.query.request;
 
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import org.apache.pinot.common.metrics.ServerMetrics;
 import org.apache.pinot.common.proto.Server;
 import org.apache.pinot.common.request.BrokerRequest;
@@ -29,7 +28,6 @@ import org.apache.pinot.common.utils.CommonConstants.Query.Request;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.TimerContext;
 import org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter;
-import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
 import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
 import org.apache.thrift.TDeserializer;
 import org.apache.thrift.protocol.TCompactProtocol;
@@ -48,29 +46,20 @@ public class ServerQueryRequest {
   private final String _brokerId;
   private final boolean _enableTrace;
   private final boolean _enableStreaming;
-  private final String _tableNameWithType;
   private final List<String> _segmentsToQuery;
+  private final QueryContext _queryContext;
 
   // Timing information for different phases of query execution
   private final TimerContext _timerContext;
 
-  // Pre-computed segment independent information
-  private final QueryContext _queryContext;
-  private final Set<String> _allColumns;
-
   public ServerQueryRequest(InstanceRequest instanceRequest, ServerMetrics serverMetrics, long queryArrivalTimeMs) {
     _requestId = instanceRequest.getRequestId();
     _brokerId = instanceRequest.getBrokerId() != null ? instanceRequest.getBrokerId() : "unknown";
     _enableTrace = instanceRequest.isEnableTrace();
     _enableStreaming = false;
-    BrokerRequest brokerRequest = instanceRequest.getQuery();
-    _tableNameWithType = brokerRequest.getQuerySource().getTableName();
     _segmentsToQuery = instanceRequest.getSearchSegments();
-    _timerContext = new TimerContext(_tableNameWithType, serverMetrics, queryArrivalTimeMs);
-
-    // Pre-compute segment independent information
-    _queryContext = BrokerRequestToQueryContextConverter.convert(brokerRequest);
-    _allColumns = QueryContextUtils.getAllColumns(_queryContext);
+    _queryContext = BrokerRequestToQueryContextConverter.convert(instanceRequest.getQuery());
+    _timerContext = new TimerContext(_queryContext.getTableName(), serverMetrics, queryArrivalTimeMs);
   }
 
   public ServerQueryRequest(Server.ServerRequest serverRequest, ServerMetrics serverMetrics)
@@ -83,6 +72,8 @@ public class ServerQueryRequest {
     _enableTrace = Boolean.parseBoolean(metadata.get(Request.MetadataKeys.ENABLE_TRACE));
     _enableStreaming = Boolean.parseBoolean(metadata.get(Request.MetadataKeys.ENABLE_STREAMING));
 
+    _segmentsToQuery = serverRequest.getSegmentsList();
+
     BrokerRequest brokerRequest;
     String payloadType = metadata.getOrDefault(Request.MetadataKeys.PAYLOAD_TYPE, Request.PayloadType.SQL);
     if (payloadType.equalsIgnoreCase(Request.PayloadType.SQL)) {
@@ -94,14 +85,8 @@ public class ServerQueryRequest {
     } else {
       throw new UnsupportedOperationException("Unsupported payloadType: " + payloadType);
     }
-
-    _tableNameWithType = brokerRequest.getQuerySource().getTableName();
-    _segmentsToQuery = serverRequest.getSegmentsList();
-    _timerContext = new TimerContext(_tableNameWithType, serverMetrics, queryArrivalTimeMs);
-
-    // Pre-compute segment independent information
     _queryContext = BrokerRequestToQueryContextConverter.convert(brokerRequest);
-    _allColumns = QueryContextUtils.getAllColumns(_queryContext);
+    _timerContext = new TimerContext(_queryContext.getTableName(), serverMetrics, queryArrivalTimeMs);
   }
 
   public long getRequestId() {
@@ -121,22 +106,18 @@ public class ServerQueryRequest {
   }
 
   public String getTableNameWithType() {
-    return _tableNameWithType;
+    return _queryContext.getTableName();
   }
 
   public List<String> getSegmentsToQuery() {
     return _segmentsToQuery;
   }
 
-  public TimerContext getTimerContext() {
-    return _timerContext;
-  }
-
   public QueryContext getQueryContext() {
     return _queryContext;
   }
 
-  public Set<String> getAllColumns() {
-    return _allColumns;
+  public TimerContext getTimerContext() {
+    return _timerContext;
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/FunctionContext.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/FunctionContext.java
index 2837305..543b941 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/FunctionContext.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/FunctionContext.java
@@ -33,7 +33,7 @@ public class FunctionContext {
   }
 
   private final Type _type;
-  private final String _functionName;
+  private String _functionName;
   private final List<ExpressionContext> _arguments;
 
   public FunctionContext(Type type, String functionName, List<ExpressionContext> arguments) {
@@ -47,6 +47,11 @@ public class FunctionContext {
     return _type;
   }
 
+  public void setFunctionName(String functionName) {
+    // NOTE: Standardize the function name to lower case
+    _functionName = functionName.toLowerCase();
+  }
+
   public String getFunctionName() {
     return _functionName;
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
index 35e9bee..4ae3677 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
@@ -21,8 +21,10 @@ package org.apache.pinot.core.query.request.context;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
@@ -55,8 +57,9 @@ import org.apache.pinot.core.query.aggregation.function.AggregationFunctionFacto
  *   </li>
  * </ul>
  */
-@SuppressWarnings("rawtypes")
+@SuppressWarnings({"rawtypes", "unchecked"})
 public class QueryContext {
+  private final String _tableName;
   private final List<ExpressionContext> _selectExpressions;
   private final Map<ExpressionContext, String> _aliasMap;
   private final FilterContext _filter;
@@ -72,15 +75,18 @@ public class QueryContext {
   // TODO: Remove it once the whole query engine is using the QueryContext
   private final BrokerRequest _brokerRequest;
 
-  // Pre-generate the aggregation functions for the query so that it can be shared among all the segments
+  // Pre-calculate the aggregation functions and columns for the query so that it can be shared among all the segments
   private AggregationFunction[] _aggregationFunctions;
   private Map<FunctionContext, Integer> _aggregationFunctionIndexMap;
+  private Set<String> _columns;
 
-  private QueryContext(List<ExpressionContext> selectExpressions, Map<ExpressionContext, String> aliasMap,
-      @Nullable FilterContext filter, @Nullable List<ExpressionContext> groupByExpressions,
-      @Nullable FilterContext havingFilter, @Nullable List<OrderByExpressionContext> orderByExpressions, int limit,
-      int offset, @Nullable Map<String, String> queryOptions, @Nullable Map<String, String> debugOptions,
+  private QueryContext(String tableName, List<ExpressionContext> selectExpressions,
+      Map<ExpressionContext, String> aliasMap, @Nullable FilterContext filter,
+      @Nullable List<ExpressionContext> groupByExpressions, @Nullable FilterContext havingFilter,
+      @Nullable List<OrderByExpressionContext> orderByExpressions, int limit, int offset,
+      @Nullable Map<String, String> queryOptions, @Nullable Map<String, String> debugOptions,
       BrokerRequest brokerRequest) {
+    _tableName = tableName;
     _selectExpressions = selectExpressions;
     _aliasMap = Collections.unmodifiableMap(aliasMap);
     _filter = filter;
@@ -95,6 +101,13 @@ public class QueryContext {
   }
 
   /**
+   * Returns the table name.
+   */
+  public String getTableName() {
+    return _tableName;
+  }
+
+  /**
    * Returns a list of expressions in the SELECT clause.
    */
   public List<ExpressionContext> getSelectExpressions() {
@@ -195,18 +208,26 @@ public class QueryContext {
   }
 
   /**
+   * Returns the columns (IDENTIFIER expressions) in the query.
+   */
+  public Set<String> getColumns() {
+    return _columns;
+  }
+
+  /**
    * NOTE: For debugging only.
    */
   @Override
   public String toString() {
-    return "QueryContext{" + "_selectExpressions=" + _selectExpressions + ", _aliasMap=" + _aliasMap + ", _filter="
-        + _filter + ", _groupByExpressions=" + _groupByExpressions + ", _havingFilter=" + _havingFilter
-        + ", _orderByExpressions=" + _orderByExpressions + ", _limit=" + _limit + ", _offset=" + _offset
-        + ", _queryOptions=" + _queryOptions + ", _debugOptions=" + _debugOptions + ", _brokerRequest=" + _brokerRequest
-        + '}';
+    return "QueryContext{" + "_tableName='" + _tableName + '\'' + ", _selectExpressions=" + _selectExpressions
+        + ", _aliasMap=" + _aliasMap + ", _filter=" + _filter + ", _groupByExpressions=" + _groupByExpressions
+        + ", _havingFilter=" + _havingFilter + ", _orderByExpressions=" + _orderByExpressions + ", _limit=" + _limit
+        + ", _offset=" + _offset + ", _queryOptions=" + _queryOptions + ", _debugOptions=" + _debugOptions
+        + ", _brokerRequest=" + _brokerRequest + '}';
   }
 
   public static class Builder {
+    private String _tableName;
     private List<ExpressionContext> _selectExpressions;
     private Map<ExpressionContext, String> _aliasMap;
     private FilterContext _filter;
@@ -219,6 +240,11 @@ public class QueryContext {
     private Map<String, String> _debugOptions;
     private BrokerRequest _brokerRequest;
 
+    public Builder setTableName(String tableName) {
+      _tableName = tableName;
+      return this;
+    }
+
     public Builder setSelectExpressions(List<ExpressionContext> selectExpressions) {
       _selectExpressions = selectExpressions;
       return this;
@@ -278,11 +304,12 @@ public class QueryContext {
       // TODO: Add validation logic here
 
       QueryContext queryContext =
-          new QueryContext(_selectExpressions, _aliasMap, _filter, _groupByExpressions, _havingFilter,
+          new QueryContext(_tableName, _selectExpressions, _aliasMap, _filter, _groupByExpressions, _havingFilter,
               _orderByExpressions, _limit, _offset, _queryOptions, _debugOptions, _brokerRequest);
 
-      // Pre-generate the aggregation functions for the query
+      // Pre-calculate the aggregation functions and columns for the query
       generateAggregationFunctions(queryContext);
+      extractColumns(queryContext);
 
       return queryContext;
     }
@@ -372,5 +399,46 @@ public class QueryContext {
         getAggregations(filter.getPredicate().getLhs(), aggregations);
       }
     }
+
+    /**
+     * Helper method to extract the columns (IDENTIFIER expressions) for the query.
+     */
+    private void extractColumns(QueryContext query) {
+      Set<String> columns = new HashSet<>();
+
+      for (ExpressionContext expression : query._selectExpressions) {
+        expression.getColumns(columns);
+      }
+      if (query._filter != null) {
+        query._filter.getColumns(columns);
+      }
+      if (query._groupByExpressions != null) {
+        for (ExpressionContext expression : query._groupByExpressions) {
+          expression.getColumns(columns);
+        }
+      }
+      if (query._havingFilter != null) {
+        query._havingFilter.getColumns(columns);
+      }
+      if (query._orderByExpressions != null) {
+        for (OrderByExpressionContext orderByExpression : query._orderByExpressions) {
+          orderByExpression.getColumns(columns);
+        }
+      }
+
+      // NOTE: Also gather columns from the input expressions of the aggregation functions because for certain types of
+      //       aggregation (e.g. DistinctCountThetaSketch), some input expressions are compiled while constructing the
+      //       aggregation function.
+      if (query._aggregationFunctions != null) {
+        for (AggregationFunction aggregationFunction : query._aggregationFunctions) {
+          List<ExpressionContext> inputExpressions = aggregationFunction.getInputExpressions();
+          for (ExpressionContext expression : inputExpressions) {
+            expression.getColumns(columns);
+          }
+        }
+      }
+
+      query._columns = columns;
+    }
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java
index af57353..92a0d44 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverter.java
@@ -57,6 +57,7 @@ public class BrokerRequestToQueryContextConverter {
    *          optimizes the BrokerRequest but not the PinotQuery.
    */
   public static QueryContext convert(BrokerRequest brokerRequest) {
+    String tableName = brokerRequest.getQuerySource().getTableName();
     PinotQuery pinotQuery = brokerRequest.getPinotQuery();
 
     List<ExpressionContext> selectExpressions;
@@ -203,9 +204,9 @@ public class BrokerRequestToQueryContextConverter {
       }
     }
 
-    return new QueryContext.Builder().setSelectExpressions(selectExpressions).setAliasMap(aliasMap).setFilter(filter)
-        .setGroupByExpressions(groupByExpressions).setOrderByExpressions(orderByExpressions)
-        .setHavingFilter(havingFilter).setLimit(limit).setOffset(offset)
+    return new QueryContext.Builder().setTableName(tableName).setSelectExpressions(selectExpressions)
+        .setAliasMap(aliasMap).setFilter(filter).setGroupByExpressions(groupByExpressions)
+        .setOrderByExpressions(orderByExpressions).setHavingFilter(havingFilter).setLimit(limit).setOffset(offset)
         .setQueryOptions(brokerRequest.getQueryOptions()).setDebugOptions(brokerRequest.getDebugOptions())
         .setBrokerRequest(brokerRequest).build();
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/QueryContextUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/QueryContextUtils.java
index ee1dc44..f3ff7ec 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/QueryContextUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/QueryContextUtils.java
@@ -18,13 +18,6 @@
  */
 package org.apache.pinot.core.query.request.context.utils;
 
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
-import org.apache.pinot.core.query.request.context.ExpressionContext;
-import org.apache.pinot.core.query.request.context.FilterContext;
-import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
 import org.apache.pinot.core.query.request.context.QueryContext;
 
 
@@ -33,53 +26,6 @@ public class QueryContextUtils {
   }
 
   /**
-   * Returns all the columns (IDENTIFIER expressions) in the given query.
-   */
-  @SuppressWarnings({"rawtypes", "unchecked"})
-  public static Set<String> getAllColumns(QueryContext query) {
-    Set<String> columns = new HashSet<>();
-
-    for (ExpressionContext expression : query.getSelectExpressions()) {
-      expression.getColumns(columns);
-    }
-    FilterContext filter = query.getFilter();
-    if (filter != null) {
-      filter.getColumns(columns);
-    }
-    List<ExpressionContext> groupByExpressions = query.getGroupByExpressions();
-    if (groupByExpressions != null) {
-      for (ExpressionContext expression : groupByExpressions) {
-        expression.getColumns(columns);
-      }
-    }
-    FilterContext havingFilter = query.getHavingFilter();
-    if (havingFilter != null) {
-      havingFilter.getColumns(columns);
-    }
-    List<OrderByExpressionContext> orderByExpressions = query.getOrderByExpressions();
-    if (orderByExpressions != null) {
-      for (OrderByExpressionContext orderByExpression : orderByExpressions) {
-        orderByExpression.getColumns(columns);
-      }
-    }
-
-    // NOTE: Also gather columns from the input expressions of the aggregation functions because for certain types of
-    //       aggregation (e.g. DistinctCountThetaSketch), some input expressions are compiled while constructing the
-    //       aggregation function.
-    AggregationFunction[] aggregationFunctions = query.getAggregationFunctions();
-    if (aggregationFunctions != null) {
-      for (AggregationFunction aggregationFunction : aggregationFunctions) {
-        List<ExpressionContext> inputExpressions = aggregationFunction.getInputExpressions();
-        for (ExpressionContext expression : inputExpressions) {
-          expression.getColumns(columns);
-        }
-      }
-    }
-
-    return columns;
-  }
-
-  /**
    * Returns {@code true} if the given query is an aggregation query, {@code false} otherwise.
    */
   public static boolean isAggregationQuery(QueryContext query) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPrunerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPrunerTest.java
index 738c746..85cebb1 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPrunerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/ColumnValueSegmentPrunerTest.java
@@ -23,7 +23,6 @@ import org.apache.pinot.core.common.DataSource;
 import org.apache.pinot.core.common.DataSourceMetadata;
 import org.apache.pinot.core.data.partition.PartitionFunctionFactory;
 import org.apache.pinot.core.indexsegment.IndexSegment;
-import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
@@ -116,8 +115,6 @@ public class ColumnValueSegmentPrunerTest {
 
   private boolean runPruner(IndexSegment indexSegment, String query) {
     QueryContext queryContext = QueryContextConverterUtils.getQueryContextFromPQL(query);
-    ServerQueryRequest queryRequest = mock(ServerQueryRequest.class);
-    when(queryRequest.getQueryContext()).thenReturn(queryContext);
-    return PRUNER.prune(indexSegment, queryRequest);
+    return PRUNER.prune(indexSegment, queryContext);
   }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPrunerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPrunerTest.java
index ae1d6c7..f629fa9 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPrunerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPrunerTest.java
@@ -23,10 +23,7 @@ import java.util.List;
 import javax.annotation.Nullable;
 import org.apache.pinot.core.common.DataSource;
 import org.apache.pinot.core.common.DataSourceMetadata;
-import org.apache.pinot.core.data.manager.SegmentDataManager;
-import org.apache.pinot.core.data.manager.TableDataManager;
 import org.apache.pinot.core.indexsegment.IndexSegment;
-import org.apache.pinot.core.query.request.ServerQueryRequest;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
 import org.apache.pinot.core.segment.index.metadata.SegmentMetadata;
@@ -44,193 +41,185 @@ public class SelectionQuerySegmentPrunerTest {
   public static final String ORDER_BY_COLUMN = "testColumn";
 
   private final SelectionQuerySegmentPruner _segmentPruner = new SelectionQuerySegmentPruner();
-  private final TableDataManager _tableDataManager = mock(TableDataManager.class);
 
   @Test
   public void testLimit0() {
-    List<SegmentDataManager> segmentDataManagers = Arrays
-        .asList(getSegmentDataManager(null, null, 10), getSegmentDataManager(0L, 10L, 10),
-            getSegmentDataManager(-5L, 5L, 15));
+    List<IndexSegment> indexSegments =
+        Arrays.asList(getIndexSegment(null, null, 10), getIndexSegment(0L, 10L, 10), getIndexSegment(-5L, 5L, 15));
 
     // Should keep only the first segment
-    ServerQueryRequest queryRequest = getQueryRequest("SELECT * FROM testTable LIMIT 0");
-    List<SegmentDataManager> result = _segmentPruner.prune(_tableDataManager, segmentDataManagers, queryRequest);
+    QueryContext queryContext = QueryContextConverterUtils.getQueryContextFromSQL("SELECT * FROM testTable LIMIT 0");
+    List<IndexSegment> result = _segmentPruner.prune(indexSegments, queryContext);
     assertEquals(result.size(), 1);
-    assertSame(result.get(0), segmentDataManagers.get(0));
+    assertSame(result.get(0), indexSegments.get(0));
 
-    queryRequest = getQueryRequest("SELECT * FROM testTable ORDER BY testColumn LIMIT 0");
-    result = _segmentPruner.prune(_tableDataManager, segmentDataManagers, queryRequest);
+    queryContext =
+        QueryContextConverterUtils.getQueryContextFromSQL("SELECT * FROM testTable ORDER BY testColumn LIMIT 0");
+    result = _segmentPruner.prune(indexSegments, queryContext);
     assertEquals(result.size(), 1);
-    assertSame(result.get(0), segmentDataManagers.get(0));
-  }
-
-  @Test
-  public void testSelectionOnlyUpsert() {
-    List<SegmentDataManager> segmentDataManagers = Arrays
-        .asList(getSegmentDataManager(null, null, 10, true), getSegmentDataManager(0L, 10L, 10, true),
-            getSegmentDataManager(-5L, 5L, 15, true));
-
-    // Should keep enough documents to fulfill the LIMIT requirement
-    ServerQueryRequest queryRequest = getQueryRequest("SELECT * FROM testTable LIMIT 5");
-    List<SegmentDataManager> result = _segmentPruner.prune(_tableDataManager, segmentDataManagers, queryRequest);
-    assertEquals(result.size(), 3);
-
-    queryRequest = getQueryRequest("SELECT * FROM testTable LIMIT 10");
-    result = _segmentPruner.prune(_tableDataManager, segmentDataManagers, queryRequest);
-    assertEquals(result.size(), 3);
-
-    queryRequest = getQueryRequest("SELECT * FROM testTable LIMIT 15");
-    result = _segmentPruner.prune(_tableDataManager, segmentDataManagers, queryRequest);
-    assertEquals(result.size(), 3);
+    assertSame(result.get(0), indexSegments.get(0));
   }
 
   @Test
   public void testSelectionOnly() {
-    List<SegmentDataManager> segmentDataManagers = Arrays
-        .asList(getSegmentDataManager(null, null, 10), getSegmentDataManager(0L, 10L, 10),
-            getSegmentDataManager(-5L, 5L, 15));
+    List<IndexSegment> indexSegments =
+        Arrays.asList(getIndexSegment(null, null, 10), getIndexSegment(0L, 10L, 10), getIndexSegment(-5L, 5L, 15));
 
     // Should keep enough documents to fulfill the LIMIT requirement
-    ServerQueryRequest queryRequest = getQueryRequest("SELECT * FROM testTable LIMIT 5");
-    List<SegmentDataManager> result = _segmentPruner.prune(_tableDataManager, segmentDataManagers, queryRequest);
+    QueryContext queryContext = QueryContextConverterUtils.getQueryContextFromSQL("SELECT * FROM testTable LIMIT 5");
+    List<IndexSegment> result = _segmentPruner.prune(indexSegments, queryContext);
     assertEquals(result.size(), 1);
-    assertSame(result.get(0), segmentDataManagers.get(0));
+    assertSame(result.get(0), indexSegments.get(0));
 
-    queryRequest = getQueryRequest("SELECT * FROM testTable LIMIT 10");
-    result = _segmentPruner.prune(_tableDataManager, segmentDataManagers, queryRequest);
+    queryContext = QueryContextConverterUtils.getQueryContextFromSQL("SELECT * FROM testTable LIMIT 10");
+    result = _segmentPruner.prune(indexSegments, queryContext);
     assertEquals(result.size(), 1);
-    assertSame(result.get(0), segmentDataManagers.get(0));
+    assertSame(result.get(0), indexSegments.get(0));
 
-    queryRequest = getQueryRequest("SELECT * FROM testTable LIMIT 15");
-    result = _segmentPruner.prune(_tableDataManager, segmentDataManagers, queryRequest);
+    queryContext = QueryContextConverterUtils.getQueryContextFromSQL("SELECT * FROM testTable LIMIT 15");
+    result = _segmentPruner.prune(indexSegments, queryContext);
     assertEquals(result.size(), 2);
-    assertSame(result.get(0), segmentDataManagers.get(0));
-    assertSame(result.get(1), segmentDataManagers.get(1));
+    assertSame(result.get(0), indexSegments.get(0));
+    assertSame(result.get(1), indexSegments.get(1));
 
-    queryRequest = getQueryRequest("SELECT * FROM testTable LIMIT 25");
-    result = _segmentPruner.prune(_tableDataManager, segmentDataManagers, queryRequest);
+    queryContext = QueryContextConverterUtils.getQueryContextFromSQL("SELECT * FROM testTable LIMIT 25");
+    result = _segmentPruner.prune(indexSegments, queryContext);
     assertEquals(result.size(), 3);
-    assertSame(result.get(0), segmentDataManagers.get(0));
-    assertSame(result.get(1), segmentDataManagers.get(1));
-    assertSame(result.get(2), segmentDataManagers.get(2));
+    assertSame(result.get(0), indexSegments.get(0));
+    assertSame(result.get(1), indexSegments.get(1));
+    assertSame(result.get(2), indexSegments.get(2));
 
-    queryRequest = getQueryRequest("SELECT * FROM testTable LIMIT 100");
-    result = _segmentPruner.prune(_tableDataManager, segmentDataManagers, queryRequest);
+    queryContext = QueryContextConverterUtils.getQueryContextFromSQL("SELECT * FROM testTable LIMIT 100");
+    result = _segmentPruner.prune(indexSegments, queryContext);
     assertEquals(result.size(), 3);
-    assertSame(result.get(0), segmentDataManagers.get(0));
-    assertSame(result.get(1), segmentDataManagers.get(1));
-    assertSame(result.get(2), segmentDataManagers.get(2));
+    assertSame(result.get(0), indexSegments.get(0));
+    assertSame(result.get(1), indexSegments.get(1));
+    assertSame(result.get(2), indexSegments.get(2));
   }
 
   @Test
   public void testSelectionOrderBy() {
-    List<SegmentDataManager> segmentDataManagers = Arrays.asList( //
-        getSegmentDataManager(0L, 10L, 10),     // 0
-        getSegmentDataManager(-5L, 5L, 15),     // 1
-        getSegmentDataManager(15L, 50L, 30),    // 2
-        getSegmentDataManager(5L, 15L, 20),     // 3
-        getSegmentDataManager(20L, 30L, 5),     // 4
-        getSegmentDataManager(null, null, 5),   // 5
-        getSegmentDataManager(5L, 10L, 10),     // 6
-        getSegmentDataManager(15L, 30L, 15));   // 7
+    List<IndexSegment> indexSegments = Arrays.asList( //
+        getIndexSegment(0L, 10L, 10),     // 0
+        getIndexSegment(-5L, 5L, 15),     // 1
+        getIndexSegment(15L, 50L, 30),    // 2
+        getIndexSegment(5L, 15L, 20),     // 3
+        getIndexSegment(20L, 30L, 5),     // 4
+        getIndexSegment(null, null, 5),   // 5
+        getIndexSegment(5L, 10L, 10),     // 6
+        getIndexSegment(15L, 30L, 15));   // 7
 
     // Should keep segments: [null, null], [-5, 5], [0, 10]
-    ServerQueryRequest queryRequest = getQueryRequest("SELECT * FROM testTable ORDER BY testColumn LIMIT 5");
-    List<SegmentDataManager> result = _segmentPruner.prune(_tableDataManager, segmentDataManagers, queryRequest);
+    QueryContext queryContext =
+        QueryContextConverterUtils.getQueryContextFromSQL("SELECT * FROM testTable ORDER BY testColumn LIMIT 5");
+    List<IndexSegment> result = _segmentPruner.prune(indexSegments, queryContext);
     assertEquals(result.size(), 3);
-    assertSame(result.get(0), segmentDataManagers.get(5));  // [null, null], 5
-    assertSame(result.get(1), segmentDataManagers.get(1));  // [-5, 5], 15
-    assertSame(result.get(2), segmentDataManagers.get(0));  // [0, 10], 10
+    assertSame(result.get(0), indexSegments.get(5));  // [null, null], 5
+    assertSame(result.get(1), indexSegments.get(1));  // [-5, 5], 15
+    assertSame(result.get(2), indexSegments.get(0));  // [0, 10], 10
 
     // Should keep segments: [null, null], [-5, 5], [0, 10], [5, 10], [5, 15]
-    queryRequest = getQueryRequest("SELECT * FROM testTable ORDER BY testColumn LIMIT 15, 20");
-    result = _segmentPruner.prune(_tableDataManager, segmentDataManagers, queryRequest);
+    queryContext =
+        QueryContextConverterUtils.getQueryContextFromSQL("SELECT * FROM testTable ORDER BY testColumn LIMIT 15, 20");
+    result = _segmentPruner.prune(indexSegments, queryContext);
     assertEquals(result.size(), 5);
-    assertSame(result.get(0), segmentDataManagers.get(5));  // [null, null], 5
-    assertSame(result.get(1), segmentDataManagers.get(1));  // [-5, 5], 15
+    assertSame(result.get(0), indexSegments.get(5));  // [null, null], 5
+    assertSame(result.get(1), indexSegments.get(1));  // [-5, 5], 15
     // [0, 10], 10 & [5, 10], 10
-    assertTrue(result.get(2) == segmentDataManagers.get(0) || result.get(2) == segmentDataManagers.get(6));
-    assertTrue(result.get(3) == segmentDataManagers.get(0) || result.get(3) == segmentDataManagers.get(6));
-    assertSame(result.get(4), segmentDataManagers.get(3));  // [5, 15], 20
+    assertTrue(result.get(2) == indexSegments.get(0) || result.get(2) == indexSegments.get(6));
+    assertTrue(result.get(3) == indexSegments.get(0) || result.get(3) == indexSegments.get(6));
+    assertSame(result.get(4), indexSegments.get(3));  // [5, 15], 20
 
     // Should keep segments: [null, null], [-5, 5], [0, 10], [5, 10], [5, 15], [15, 30], [15, 50]
-    queryRequest = getQueryRequest("SELECT * FROM testTable ORDER BY testColumn, foo LIMIT 40");
-    result = _segmentPruner.prune(_tableDataManager, segmentDataManagers, queryRequest);
+    queryContext =
+        QueryContextConverterUtils.getQueryContextFromSQL("SELECT * FROM testTable ORDER BY testColumn, foo LIMIT 40");
+    result = _segmentPruner.prune(indexSegments, queryContext);
     assertEquals(result.size(), 7);
-    assertSame(result.get(0), segmentDataManagers.get(5));  // [null, null], 5
-    assertSame(result.get(1), segmentDataManagers.get(1));  // [-5, 5], 15
+    assertSame(result.get(0), indexSegments.get(5));  // [null, null], 5
+    assertSame(result.get(1), indexSegments.get(1));  // [-5, 5], 15
     // [0, 10], 10 & [5, 10], 10
-    assertTrue(result.get(2) == segmentDataManagers.get(0) || result.get(2) == segmentDataManagers.get(6));
-    assertTrue(result.get(3) == segmentDataManagers.get(0) || result.get(3) == segmentDataManagers.get(6));
-    assertSame(result.get(4), segmentDataManagers.get(3));  // [5, 15], 20
-    assertSame(result.get(5), segmentDataManagers.get(7));  // [15, 30], 15
-    assertSame(result.get(6), segmentDataManagers.get(2));  // [15, 50], 30
+    assertTrue(result.get(2) == indexSegments.get(0) || result.get(2) == indexSegments.get(6));
+    assertTrue(result.get(3) == indexSegments.get(0) || result.get(3) == indexSegments.get(6));
+    assertSame(result.get(4), indexSegments.get(3));  // [5, 15], 20
+    assertSame(result.get(5), indexSegments.get(7));  // [15, 30], 15
+    assertSame(result.get(6), indexSegments.get(2));  // [15, 50], 30
 
     // Should keep segments: [null, null], [20, 30], [15, 50], [15, 30]
-    queryRequest = getQueryRequest("SELECT * FROM testTable ORDER BY testColumn DESC LIMIT 5");
-    result = _segmentPruner.prune(_tableDataManager, segmentDataManagers, queryRequest);
+    queryContext =
+        QueryContextConverterUtils.getQueryContextFromSQL("SELECT * FROM testTable ORDER BY testColumn DESC LIMIT 5");
+    result = _segmentPruner.prune(indexSegments, queryContext);
     assertEquals(result.size(), 4);
-    assertSame(result.get(0), segmentDataManagers.get(5));  // [null, null], 5
-    assertSame(result.get(1), segmentDataManagers.get(4));  // [20, 30], 5
+    assertSame(result.get(0), indexSegments.get(5));  // [null, null], 5
+    assertSame(result.get(1), indexSegments.get(4));  // [20, 30], 5
     // [15, 50], 30 & [15, 30], 15
-    assertTrue(result.get(2) == segmentDataManagers.get(2) || result.get(2) == segmentDataManagers.get(7));
-    assertTrue(result.get(3) == segmentDataManagers.get(2) || result.get(3) == segmentDataManagers.get(7));
+    assertTrue(result.get(2) == indexSegments.get(2) || result.get(2) == indexSegments.get(7));
+    assertTrue(result.get(3) == indexSegments.get(2) || result.get(3) == indexSegments.get(7));
 
     // Should keep segments: [null, null], [20, 30], [15, 50], [15, 30]
-    queryRequest = getQueryRequest("SELECT * FROM testTable ORDER BY testColumn DESC LIMIT 5, 30");
-    result = _segmentPruner.prune(_tableDataManager, segmentDataManagers, queryRequest);
+    queryContext = QueryContextConverterUtils
+        .getQueryContextFromSQL("SELECT * FROM testTable ORDER BY testColumn DESC LIMIT 5, 30");
+    result = _segmentPruner.prune(indexSegments, queryContext);
     assertEquals(result.size(), 4);
-    assertSame(result.get(0), segmentDataManagers.get(5));  // [null, null], 5
-    assertSame(result.get(1), segmentDataManagers.get(4));  // [20, 30], 5
+    assertSame(result.get(0), indexSegments.get(5));  // [null, null], 5
+    assertSame(result.get(1), indexSegments.get(4));  // [20, 30], 5
     // [15, 50], 30 & [15, 30], 15
-    assertTrue(result.get(2) == segmentDataManagers.get(2) || result.get(2) == segmentDataManagers.get(7));
-    assertTrue(result.get(3) == segmentDataManagers.get(2) || result.get(3) == segmentDataManagers.get(7));
+    assertTrue(result.get(2) == indexSegments.get(2) || result.get(2) == indexSegments.get(7));
+    assertTrue(result.get(3) == indexSegments.get(2) || result.get(3) == indexSegments.get(7));
 
     // Should keep segments: [null, null], [20, 30], [15, 50], [15, 30], [5, 15], [5, 10], [0, 10], [-5, 5]
-    queryRequest = getQueryRequest("SELECT * FROM testTable ORDER BY testColumn DESC, foo LIMIT 60");
-    result = _segmentPruner.prune(_tableDataManager, segmentDataManagers, queryRequest);
+    queryContext = QueryContextConverterUtils
+        .getQueryContextFromSQL("SELECT * FROM testTable ORDER BY testColumn DESC, foo LIMIT 60");
+    result = _segmentPruner.prune(indexSegments, queryContext);
     assertEquals(result.size(), 8);
-    assertSame(result.get(0), segmentDataManagers.get(5));  // [null, null], 5
-    assertSame(result.get(1), segmentDataManagers.get(4));  // [20, 30], 5
+    assertSame(result.get(0), indexSegments.get(5));  // [null, null], 5
+    assertSame(result.get(1), indexSegments.get(4));  // [20, 30], 5
     // [15, 50], 30 & [15, 30], 15
-    assertTrue(result.get(2) == segmentDataManagers.get(2) || result.get(2) == segmentDataManagers.get(7));
-    assertTrue(result.get(3) == segmentDataManagers.get(2) || result.get(3) == segmentDataManagers.get(7));
+    assertTrue(result.get(2) == indexSegments.get(2) || result.get(2) == indexSegments.get(7));
+    assertTrue(result.get(3) == indexSegments.get(2) || result.get(3) == indexSegments.get(7));
     // [5, 15], 20 & [5, 10], 10
-    assertTrue(result.get(4) == segmentDataManagers.get(3) || result.get(4) == segmentDataManagers.get(6));
-    assertTrue(result.get(5) == segmentDataManagers.get(3) || result.get(5) == segmentDataManagers.get(6));
-    assertSame(result.get(6), segmentDataManagers.get(0));  // [0, 10], 10
-    assertSame(result.get(7), segmentDataManagers.get(1));  // [-5, 5], 15
+    assertTrue(result.get(4) == indexSegments.get(3) || result.get(4) == indexSegments.get(6));
+    assertTrue(result.get(5) == indexSegments.get(3) || result.get(5) == indexSegments.get(6));
+    assertSame(result.get(6), indexSegments.get(0));  // [0, 10], 10
+    assertSame(result.get(7), indexSegments.get(1));  // [-5, 5], 15
   }
 
-  private SegmentDataManager getSegmentDataManager(@Nullable Long minValue, @Nullable Long maxValue, int totalDocs) {
-    return getSegmentDataManager(minValue, maxValue, totalDocs, false);
+  @Test
+  public void testUpsertTable() {
+    List<IndexSegment> indexSegments = Arrays
+        .asList(getIndexSegment(0L, 10L, 10, true), getIndexSegment(20L, 30L, 10, true),
+            getIndexSegment(40L, 50L, 10, true));
+
+    // Should not prune any segment for upsert table
+    QueryContext queryContext = QueryContextConverterUtils.getQueryContextFromSQL("SELECT * FROM testTable LIMIT 5");
+    List<IndexSegment> result = _segmentPruner.prune(indexSegments, queryContext);
+    assertEquals(result.size(), 3);
+
+    queryContext =
+        QueryContextConverterUtils.getQueryContextFromSQL("SELECT * FROM testTable ORDER BY testColumn LIMIT 5");
+    result = _segmentPruner.prune(indexSegments, queryContext);
+    assertEquals(result.size(), 3);
   }
 
-  private SegmentDataManager getSegmentDataManager(@Nullable Long minValue, @Nullable Long maxValue, int totalDocs,
+  private IndexSegment getIndexSegment(@Nullable Long minValue, @Nullable Long maxValue, int totalDocs) {
+    return getIndexSegment(minValue, maxValue, totalDocs, false);
+  }
+
+  private IndexSegment getIndexSegment(@Nullable Long minValue, @Nullable Long maxValue, int totalDocs,
       boolean upsert) {
-    SegmentDataManager segmentDataManager = mock(SegmentDataManager.class);
-    IndexSegment segment = mock(IndexSegment.class);
-    when(segmentDataManager.getSegment()).thenReturn(segment);
+    IndexSegment indexSegment = mock(IndexSegment.class);
     DataSource dataSource = mock(DataSource.class);
-    when(segment.getDataSource(ORDER_BY_COLUMN)).thenReturn(dataSource);
+    when(indexSegment.getDataSource(ORDER_BY_COLUMN)).thenReturn(dataSource);
     DataSourceMetadata dataSourceMetadata = mock(DataSourceMetadata.class);
     when(dataSource.getDataSourceMetadata()).thenReturn(dataSourceMetadata);
     when(dataSourceMetadata.getMinValue()).thenReturn(minValue);
     when(dataSourceMetadata.getMaxValue()).thenReturn(maxValue);
     SegmentMetadata segmentMetadata = mock(SegmentMetadata.class);
-    when(segment.getSegmentMetadata()).thenReturn(segmentMetadata);
+    when(indexSegment.getSegmentMetadata()).thenReturn(segmentMetadata);
     when(segmentMetadata.getTotalDocs()).thenReturn(totalDocs);
     if (upsert) {
-      ValidDocIndexReader reader = mock(ValidDocIndexReader.class);
-      when(segment.getValidDocIndex()).thenReturn(reader);
+      ValidDocIndexReader validDocIndex = mock(ValidDocIndexReader.class);
+      when(indexSegment.getValidDocIndex()).thenReturn(validDocIndex);
     }
-    return segmentDataManager;
-  }
-
-  private ServerQueryRequest getQueryRequest(String sql) {
-    ServerQueryRequest queryRequest = mock(ServerQueryRequest.class);
-    QueryContext queryContext = QueryContextConverterUtils.getQueryContextFromSQL(sql);
-    when(queryRequest.getQueryContext()).thenReturn(queryContext);
-    return queryRequest;
+    return indexSegment;
   }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverterTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverterTest.java
index 4f735cb..f3d1e17 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverterTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverterTest.java
@@ -53,6 +53,7 @@ public class BrokerRequestToQueryContextConverterTest {
       String query = "SELECT * FROM testTable";
       QueryContext[] queryContexts = getQueryContexts(query, query);
       for (QueryContext queryContext : queryContexts) {
+        assertEquals(queryContext.getTableName(), "testTable");
         List<ExpressionContext> selectExpressions = queryContext.getSelectExpressions();
         assertEquals(selectExpressions.size(), 1);
         assertEquals(selectExpressions.get(0), ExpressionContext.forIdentifier("*"));
@@ -64,7 +65,7 @@ public class BrokerRequestToQueryContextConverterTest {
         assertNull(queryContext.getHavingFilter());
         assertEquals(queryContext.getLimit(), 10);
         assertEquals(queryContext.getOffset(), 0);
-        assertTrue(QueryContextUtils.getAllColumns(queryContext).isEmpty());
+        assertTrue(queryContext.getColumns().isEmpty());
         assertFalse(QueryContextUtils.isAggregationQuery(queryContext));
       }
     }
@@ -74,6 +75,7 @@ public class BrokerRequestToQueryContextConverterTest {
       String query = "SELECT COUNT(*) FROM testTable";
       QueryContext[] queryContexts = getQueryContexts(query, query);
       for (QueryContext queryContext : queryContexts) {
+        assertEquals(queryContext.getTableName(), "testTable");
         List<ExpressionContext> selectExpressions = queryContext.getSelectExpressions();
         assertEquals(selectExpressions.size(), 1);
         assertEquals(selectExpressions.get(0), ExpressionContext.forFunction(
@@ -87,7 +89,7 @@ public class BrokerRequestToQueryContextConverterTest {
         assertNull(queryContext.getHavingFilter());
         assertEquals(queryContext.getLimit(), 10);
         assertEquals(queryContext.getOffset(), 0);
-        assertTrue(QueryContextUtils.getAllColumns(queryContext).isEmpty());
+        assertTrue(queryContext.getColumns().isEmpty());
         assertTrue(QueryContextUtils.isAggregationQuery(queryContext));
       }
     }
@@ -97,6 +99,7 @@ public class BrokerRequestToQueryContextConverterTest {
       String query = "SELECT foo, bar FROM testTable ORDER BY bar ASC, foo DESC LIMIT 50, 100";
       QueryContext[] queryContexts = getQueryContexts(query, query);
       for (QueryContext queryContext : queryContexts) {
+        assertEquals(queryContext.getTableName(), "testTable");
         List<ExpressionContext> selectExpressions = queryContext.getSelectExpressions();
         assertEquals(selectExpressions.size(), 2);
         assertEquals(selectExpressions.get(0), ExpressionContext.forIdentifier("foo"));
@@ -115,7 +118,7 @@ public class BrokerRequestToQueryContextConverterTest {
         assertNull(queryContext.getHavingFilter());
         assertEquals(queryContext.getLimit(), 100);
         assertEquals(queryContext.getOffset(), 50);
-        assertEquals(QueryContextUtils.getAllColumns(queryContext), new HashSet<>(Arrays.asList("foo", "bar")));
+        assertEquals(queryContext.getColumns(), new HashSet<>(Arrays.asList("foo", "bar")));
         assertFalse(QueryContextUtils.isAggregationQuery(queryContext));
       }
     }
@@ -126,6 +129,7 @@ public class BrokerRequestToQueryContextConverterTest {
       String sqlQuery = "SELECT DISTINCT foo, bar, foobar FROM testTable ORDER BY bar DESC, foo LIMIT 15";
       QueryContext[] queryContexts = getQueryContexts(pqlQuery, sqlQuery);
       for (QueryContext queryContext : queryContexts) {
+        assertEquals(queryContext.getTableName(), "testTable");
         List<ExpressionContext> selectExpressions = queryContext.getSelectExpressions();
         assertEquals(selectExpressions.size(), 1);
         assertEquals(selectExpressions.get(0), ExpressionContext.forFunction(
@@ -148,8 +152,7 @@ public class BrokerRequestToQueryContextConverterTest {
         assertNull(queryContext.getHavingFilter());
         assertEquals(queryContext.getLimit(), 15);
         assertEquals(queryContext.getOffset(), 0);
-        assertEquals(QueryContextUtils.getAllColumns(queryContext),
-            new HashSet<>(Arrays.asList("foo", "bar", "foobar")));
+        assertEquals(queryContext.getColumns(), new HashSet<>(Arrays.asList("foo", "bar", "foobar")));
         assertTrue(QueryContextUtils.isAggregationQuery(queryContext));
       }
     }
@@ -160,6 +163,7 @@ public class BrokerRequestToQueryContextConverterTest {
           "SELECT ADD(foo, ADD(bar, 123)), SUB('456', foobar) FROM testTable ORDER BY SUB(456, foobar) LIMIT 30, 20";
       QueryContext[] queryContexts = getQueryContexts(query, query);
       for (QueryContext queryContext : queryContexts) {
+        assertEquals(queryContext.getTableName(), "testTable");
         List<ExpressionContext> selectExpressions = queryContext.getSelectExpressions();
         assertEquals(selectExpressions.size(), 2);
         assertEquals(selectExpressions.get(0), ExpressionContext.forFunction(
@@ -185,8 +189,7 @@ public class BrokerRequestToQueryContextConverterTest {
         assertNull(queryContext.getHavingFilter());
         assertEquals(queryContext.getLimit(), 20);
         assertEquals(queryContext.getOffset(), 30);
-        assertEquals(QueryContextUtils.getAllColumns(queryContext),
-            new HashSet<>(Arrays.asList("foo", "bar", "foobar")));
+        assertEquals(queryContext.getColumns(), new HashSet<>(Arrays.asList("foo", "bar", "foobar")));
         assertFalse(QueryContextUtils.isAggregationQuery(queryContext));
       }
     }
@@ -199,6 +202,7 @@ public class BrokerRequestToQueryContextConverterTest {
           "SELECT SUB(foo, bar), bar, SUM(ADD(foo, bar)) FROM testTable GROUP BY SUB(foo, bar), bar ORDER BY SUM(ADD(foo, bar)), SUB(foo, bar) DESC LIMIT 20";
       QueryContext[] queryContexts = getQueryContexts(pqlQuery, sqlQuery);
       for (QueryContext queryContext : queryContexts) {
+        assertEquals(queryContext.getTableName(), "testTable");
         List<ExpressionContext> selectExpressions = queryContext.getSelectExpressions();
         int numSelectExpressions = selectExpressions.size();
         assertTrue(numSelectExpressions == 1 || numSelectExpressions == 3);
@@ -244,7 +248,7 @@ public class BrokerRequestToQueryContextConverterTest {
         assertNull(queryContext.getHavingFilter());
         assertEquals(queryContext.getLimit(), 20);
         assertEquals(queryContext.getOffset(), 0);
-        assertEquals(QueryContextUtils.getAllColumns(queryContext), new HashSet<>(Arrays.asList("foo", "bar")));
+        assertEquals(queryContext.getColumns(), new HashSet<>(Arrays.asList("foo", "bar")));
         assertTrue(QueryContextUtils.isAggregationQuery(queryContext));
       }
     }
@@ -255,6 +259,7 @@ public class BrokerRequestToQueryContextConverterTest {
           "SELECT * FROM testTable WHERE foo > 15 AND (DIV(bar, foo) BETWEEN 10 AND 20 OR TEXT_MATCH(foobar, 'potato'))";
       QueryContext[] queryContexts = getQueryContexts(query, query);
       for (QueryContext queryContext : queryContexts) {
+        assertEquals(queryContext.getTableName(), "testTable");
         List<ExpressionContext> selectExpressions = queryContext.getSelectExpressions();
         assertEquals(selectExpressions.size(), 1);
         assertEquals(selectExpressions.get(0), ExpressionContext.forIdentifier("*"));
@@ -283,8 +288,7 @@ public class BrokerRequestToQueryContextConverterTest {
         assertNull(queryContext.getHavingFilter());
         assertEquals(queryContext.getLimit(), 10);
         assertEquals(queryContext.getOffset(), 0);
-        assertEquals(QueryContextUtils.getAllColumns(queryContext),
-            new HashSet<>(Arrays.asList("foo", "bar", "foobar")));
+        assertEquals(queryContext.getColumns(), new HashSet<>(Arrays.asList("foo", "bar", "foobar")));
         assertFalse(QueryContextUtils.isAggregationQuery(queryContext));
       }
     }
@@ -295,6 +299,7 @@ public class BrokerRequestToQueryContextConverterTest {
       String sqlQuery =
           "SELECT SUM(foo) AS a, bar AS b FROM testTable WHERE b IN (5, 10, 15) GROUP BY b ORDER BY a DESC";
       QueryContext queryContext = QueryContextConverterUtils.getQueryContextFromSQL(sqlQuery);
+      assertEquals(queryContext.getTableName(), "testTable");
       List<ExpressionContext> selectExpressions = queryContext.getSelectExpressions();
       assertEquals(selectExpressions.size(), 2);
       assertEquals(selectExpressions.get(0), ExpressionContext.forFunction(
@@ -329,7 +334,7 @@ public class BrokerRequestToQueryContextConverterTest {
       assertNull(queryContext.getHavingFilter());
       assertEquals(queryContext.getLimit(), 10);
       assertEquals(queryContext.getOffset(), 0);
-      assertEquals(QueryContextUtils.getAllColumns(queryContext), new HashSet<>(Arrays.asList("foo", "bar")));
+      assertEquals(queryContext.getColumns(), new HashSet<>(Arrays.asList("foo", "bar")));
       assertTrue(QueryContextUtils.isAggregationQuery(queryContext));
     }
 
@@ -337,6 +342,7 @@ public class BrokerRequestToQueryContextConverterTest {
     {
       String sqlQuery = "SELECT SUM(foo), bar FROM testTable GROUP BY bar HAVING SUM(foo) IN (5, 10, 15)";
       QueryContext queryContext = QueryContextConverterUtils.getQueryContextFromSQL(sqlQuery);
+      assertEquals(queryContext.getTableName(), "testTable");
       List<ExpressionContext> selectExpressions = queryContext.getSelectExpressions();
       assertEquals(selectExpressions.size(), 2);
       assertEquals(selectExpressions.get(0), ExpressionContext.forFunction(
@@ -361,7 +367,7 @@ public class BrokerRequestToQueryContextConverterTest {
       assertEquals(havingFilter.toString(), "sum(foo) IN ('5','10','15')");
       assertEquals(queryContext.getLimit(), 10);
       assertEquals(queryContext.getOffset(), 0);
-      assertEquals(QueryContextUtils.getAllColumns(queryContext), new HashSet<>(Arrays.asList("foo", "bar")));
+      assertEquals(queryContext.getColumns(), new HashSet<>(Arrays.asList("foo", "bar")));
       assertTrue(QueryContextUtils.isAggregationQuery(queryContext));
     }
 
@@ -370,6 +376,7 @@ public class BrokerRequestToQueryContextConverterTest {
       String sqlQuery =
           "SELECT SUM(col1) * MAX(col2) FROM testTable GROUP BY col3 HAVING SUM(col1) > MIN(col2) AND SUM(col4) + col3 < MAX(col4) ORDER BY MAX(col1) + MAX(col2) - SUM(col4), col3 DESC";
       QueryContext queryContext = QueryContextConverterUtils.getQueryContextFromSQL(sqlQuery);
+      assertEquals(queryContext.getTableName(), "testTable");
 
       // SELECT clause
       List<ExpressionContext> selectExpressions = queryContext.getSelectExpressions();
@@ -428,8 +435,7 @@ public class BrokerRequestToQueryContextConverterTest {
           new FunctionContext(FunctionContext.Type.AGGREGATION, "sum",
               Collections.singletonList(ExpressionContext.forIdentifier("col4")))));
 
-      assertEquals(QueryContextUtils.getAllColumns(queryContext),
-          new HashSet<>(Arrays.asList("col1", "col2", "col3", "col4")));
+      assertEquals(queryContext.getColumns(), new HashSet<>(Arrays.asList("col1", "col2", "col3", "col4")));
       assertTrue(QueryContextUtils.isAggregationQuery(queryContext));
 
       // Expected: SUM(col1), MAX(col2), MIN(col2), SUM(col4), MAX(col4), MAX(col1)
@@ -475,8 +481,7 @@ public class BrokerRequestToQueryContextConverterTest {
         assertEquals(arguments.get(2), ExpressionContext.forLiteral("bar='a'"));
         assertEquals(arguments.get(3), ExpressionContext.forLiteral("bar='b'"));
         assertEquals(arguments.get(4), ExpressionContext.forLiteral("SET_INTERSECT($1, $2)"));
-        assertEquals(QueryContextUtils.getAllColumns(queryContext),
-            new HashSet<>(Arrays.asList("foo", "bar")));
+        assertEquals(queryContext.getColumns(), new HashSet<>(Arrays.asList("foo", "bar")));
         assertTrue(QueryContextUtils.isAggregationQuery(queryContext));
       }
     }
@@ -548,7 +553,7 @@ public class BrokerRequestToQueryContextConverterTest {
     assertNull(queryContext.getHavingFilter());
     assertEquals(queryContext.getLimit(), 100);
     assertEquals(queryContext.getOffset(), 50);
-    assertEquals(QueryContextUtils.getAllColumns(queryContext), new HashSet<>(Arrays.asList("foo", "bar")));
+    assertEquals(queryContext.getColumns(), new HashSet<>(Arrays.asList("foo", "bar")));
     assertFalse(QueryContextUtils.isAggregationQuery(queryContext));
   }
 
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
index 4195674..37cec5a 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTestSet.java
@@ -287,6 +287,27 @@ public abstract class BaseClusterIntegrationTestSet extends BaseClusterIntegrati
   }
 
   /**
+   * Test hardcoded queries on server-partitioned data (all the segments for a partition are served by a single server).
+   */
+  public void testHardcodedServerPartitionedSqlQueries()
+      throws Exception {
+    // IN_PARTITIONED_SUBQUERY
+    {
+      String inPartitionedSubqueryQuery =
+          "SELECT COUNT(*) FROM mytable WHERE INPARTITIONEDSUBQUERY(DestAirportID, 'SELECT IDSET(DestAirportID) FROM mytable WHERE DaysSinceEpoch = 16430') = 1";
+      String inQuery =
+          "SELECT COUNT(*) FROM mytable WHERE DestAirportID IN (SELECT DestAirportID FROM mytable WHERE DaysSinceEpoch = 16430)";
+      testSqlQuery(inPartitionedSubqueryQuery, Collections.singletonList(inQuery));
+
+      String notInPartitionedSubqueryQuery =
+          "SELECT COUNT(*) FROM mytable WHERE INPARTITIONEDSUBQUERY(DestAirportID, 'SELECT IDSET(DestAirportID) FROM mytable WHERE DaysSinceEpoch = 16430') = 0";
+      String notInQuery =
+          "SELECT COUNT(*) FROM mytable WHERE DestAirportID NOT IN (SELECT DestAirportID FROM mytable WHERE DaysSinceEpoch = 16430)";
+      testSqlQuery(notInPartitionedSubqueryQuery, Collections.singletonList(notInQuery));
+    }
+  }
+
+  /**
    * Test to ensure that broker response contains expected stats
    *
    * @throws Exception
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
index 757e6fe..606d809 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LLCRealtimeClusterIntegrationTest.java
@@ -173,4 +173,11 @@ public class LLCRealtimeClusterIntegrationTest extends RealtimeClusterIntegratio
       throws Exception {
     testReload(false);
   }
+
+  @Test
+  @Override
+  public void testHardcodedServerPartitionedSqlQueries()
+      throws Exception {
+    super.testHardcodedServerPartitionedSqlQueries();
+  }
 }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
index 6f8ae91..526256d 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/MultiNodesOfflineClusterIntegrationTest.java
@@ -63,4 +63,10 @@ public class MultiNodesOfflineClusterIntegrationTest extends OfflineClusterInteg
   public void testGrpcQueryServer() {
     // Ignored
   }
+
+  @Test(enabled = false)
+  @Override
+  public void testHardcodedServerPartitionedSqlQueries() {
+    // Ignored
+  }
 }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 0d3c3b0..a33d51d 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -1453,4 +1453,11 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
       }
     }
   }
+
+  @Test
+  @Override
+  public void testHardcodedServerPartitionedSqlQueries()
+      throws Exception {
+    super.testHardcodedServerPartitionedSqlQueries();
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org