Posted to commits@pinot.apache.org by ne...@apache.org on 2019/10/09 19:52:18 UTC

[incubator-pinot] branch master updated: First pass of GROUP BY with ORDER BY support (#4602)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9362ba4  First pass of GROUP BY with ORDER BY support (#4602)
9362ba4 is described below

commit 9362ba4d59b922b8bee30cb36e7e4fda2474b6d6
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Wed Oct 9 12:52:11 2019 -0700

    First pass of GROUP BY with ORDER BY support (#4602)
    
    This PR contains the implementation of ORDER BY support in GROUP BY queries.
    * In this first pass, the changes have been made from CombineGroupByOrderByOperator upwards; the AggregationGroupByOperator hasn't been changed.
    * IndexedTable is used wherever possible (to merge results in CombineGroupByOrderByOperator, and then to reduce results across servers in the BrokerReduceService).
    * ResultTable has been introduced as a standard way to return results to the client.
    * 2 query options have been introduced:
    1) groupByMode - pql/sql - whether to execute the group by in PQL style (split all aggregations and ignore the order by) or in standard SQL style
    2) responseFormat - pql/sql - whether to present results using List (the PQL way) or using ResultTable, which is closer to the SQL way
    By default, the modes are PQL, PQL.
    To get the order by results in a ResultTable, the modes should be SQL, SQL.
    To get the order by results in a List, the modes should be SQL, PQL.
---
 .../requesthandler/BaseBrokerRequestHandler.java   |  48 +--
 .../response/broker/BrokerResponseNative.java      |  14 +-
 .../pinot/common/response/broker/ResultTable.java  |  53 ++--
 .../apache/pinot/common/utils/CommonConstants.java |   3 +
 .../apache/pinot/core/data/order/OrderByUtils.java |  65 +++-
 .../core/data/table/ConcurrentIndexedTable.java    | 169 ++++++++--
 .../apache/pinot/core/data/table/IndexedTable.java |  40 ++-
 .../java/org/apache/pinot/core/data/table/Key.java |   9 +-
 .../org/apache/pinot/core/data/table/Record.java   |   8 +
 .../pinot/core/data/table/SimpleIndexedTable.java  | 102 ++++--
 .../org/apache/pinot/core/data/table/Table.java    |  10 +-
 .../operator/CombineGroupByOrderByOperator.java    | 257 +++++++++++++++
 .../operator/blocks/IntermediateResultsBlock.java  | 102 +++++-
 .../query/AggregationGroupByOrderByOperator.java   | 133 ++++++++
 .../plan/AggregationGroupByOrderByPlanNode.java    | 129 ++++++++
 .../apache/pinot/core/plan/CombinePlanNode.java    |   9 +
 .../core/plan/maker/InstancePlanMakerImplV2.java   |  11 +-
 .../function/AggregationFunctionUtils.java         |   5 +
 .../groupby/AggregationGroupByTrimmingService.java |   3 +-
 .../groupby/DictionaryBasedGroupKeyGenerator.java  |   6 +-
 .../aggregation/groupby/GroupKeyGenerator.java     |   1 +
 .../NoDictionaryMultiColumnGroupKeyGenerator.java  |   2 +-
 .../core/query/reduce/BrokerReduceService.java     | 284 +++++++++++++++--
 .../pinot/core/query/reduce/CombineService.java    |   6 +-
 .../org/apache/pinot/core/util/GroupByUtils.java   |  56 ++++
 .../pinot/core/data/order/OrderByUtilsTest.java    |   2 +-
 .../pinot/core/data/table/IndexedTableTest.java    | 150 ++++++---
 .../org/apache/pinot/queries/BaseQueriesTest.java  |  27 ++
 ...InnerSegmentSelectionMultiValueQueriesTest.java |  16 +-
 ...nnerSegmentSelectionSingleValueQueriesTest.java |  20 +-
 .../InterSegmentOrderByMultiValueQueriesTest.java  | 120 +++++++
 .../InterSegmentOrderBySingleValueQueriesTest.java | 346 +++++++++++++++++++++
 .../org/apache/pinot/queries/QueriesTestUtils.java |  21 ++
 .../AggregationGroupByTrimmingServiceTest.java     |   8 +-
 .../groupby/NoDictionaryGroupKeyGeneratorTest.java |   2 +-
 ...exedTable.java => BenchmarkCombineGroupBy.java} | 144 +++++----
 .../apache/pinot/perf/BenchmarkIndexedTable.java   |  64 ++--
 37 files changed, 2100 insertions(+), 345 deletions(-)
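
For context before the diff: the two new query options ride along in the JSON broker request and are parsed as a semicolon-separated list of key=value pairs (see the BaseBrokerRequestHandler changes below). A hypothetical request enabling both SQL modes could look like the following; the table and query are illustrative only:

    {
      "pql": "SELECT SUM(metric) FROM myTable GROUP BY dimension ORDER BY SUM(metric) DESC TOP 10",
      "queryOptions": "groupByMode=sql;responseFormat=sql"
    }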

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 9d44bc4..834148d 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -55,16 +55,11 @@ import org.apache.pinot.common.request.FilterQueryMap;
 import org.apache.pinot.common.response.BrokerResponse;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.CommonConstants.Broker;
 import org.apache.pinot.core.query.reduce.BrokerReduceService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.pinot.common.utils.CommonConstants.Broker.*;
-import static org.apache.pinot.common.utils.CommonConstants.Broker.Request.DEBUG_OPTIONS;
-import static org.apache.pinot.common.utils.CommonConstants.Broker.Request.PQL;
-import static org.apache.pinot.common.utils.CommonConstants.Broker.Request.SQL;
-import static org.apache.pinot.common.utils.CommonConstants.Broker.Request.TRACE;
-
 
 @ThreadSafe
 public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
@@ -100,12 +95,13 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     _queryQuotaManager = queryQuotaManager;
     _brokerMetrics = brokerMetrics;
 
-    _brokerId = config.getString(CONFIG_OF_BROKER_ID, getDefaultBrokerId());
-    _brokerTimeoutMs = config.getLong(CONFIG_OF_BROKER_TIMEOUT_MS, DEFAULT_BROKER_TIMEOUT_MS);
-    _queryResponseLimit = config.getInt(CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT, DEFAULT_BROKER_QUERY_RESPONSE_LIMIT);
-    _queryLogLength = config.getInt(CONFIG_OF_BROKER_QUERY_LOG_LENGTH, DEFAULT_BROKER_QUERY_LOG_LENGTH);
-    _queryLogRateLimiter = RateLimiter.create(
-        config.getDouble(CONFIG_OF_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND, DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND));
+    _brokerId = config.getString(Broker.CONFIG_OF_BROKER_ID, getDefaultBrokerId());
+    _brokerTimeoutMs = config.getLong(Broker.CONFIG_OF_BROKER_TIMEOUT_MS, Broker.DEFAULT_BROKER_TIMEOUT_MS);
+    _queryResponseLimit =
+        config.getInt(Broker.CONFIG_OF_BROKER_QUERY_RESPONSE_LIMIT, Broker.DEFAULT_BROKER_QUERY_RESPONSE_LIMIT);
+    _queryLogLength = config.getInt(Broker.CONFIG_OF_BROKER_QUERY_LOG_LENGTH, Broker.DEFAULT_BROKER_QUERY_LOG_LENGTH);
+    _queryLogRateLimiter = RateLimiter.create(config.getDouble(Broker.CONFIG_OF_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND,
+        Broker.DEFAULT_BROKER_QUERY_LOG_MAX_RATE_PER_SECOND));
 
     _numDroppedLog = new AtomicInteger(0);
     _numDroppedLogRateLimiter = RateLimiter.create(1.0);
@@ -223,17 +219,23 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     }
 
     // Set extra settings into broker request
-    if (request.has(TRACE) && request.get(TRACE).asBoolean()) {
+    if (request.has(Broker.Request.TRACE) && request.get(Broker.Request.TRACE).asBoolean()) {
       LOGGER.debug("Enable trace for request {}: {}", requestId, query);
       brokerRequest.setEnableTrace(true);
     }
-    if (request.has(DEBUG_OPTIONS)) {
-      Map<String, String> debugOptions = Splitter.on(';').omitEmptyStrings().trimResults().withKeyValueSeparator('=')
-          .split(request.get(DEBUG_OPTIONS).asText());
+
+    if (request.has(Broker.Request.DEBUG_OPTIONS)) {
+      Map<String, String> debugOptions = getOptionsFromRequest(request, Broker.Request.DEBUG_OPTIONS);
       LOGGER.debug("Debug options are set to: {} for request {}: {}", debugOptions, requestId, query);
       brokerRequest.setDebugOptions(debugOptions);
     }
 
+    if (request.has(Broker.Request.QUERY_OPTIONS)) {
+      Map<String, String> queryOptions = getOptionsFromRequest(request, Broker.Request.QUERY_OPTIONS);
+      LOGGER.debug("Query options are set to: {} for request {}: {}", queryOptions, requestId, query);
+      brokerRequest.setQueryOptions(queryOptions);
+    }
+
     // Optimize the query
     // TODO: get time column name from schema or table config so that we can apply it for REALTIME only case
     // We get timeColumnName from time boundary service currently, which only exists for offline table
@@ -340,11 +342,19 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
     return brokerResponse;
   }
 
+  private Map<String, String> getOptionsFromRequest(JsonNode request, String optionsKey) {
+    return Splitter.on(';')
+        .omitEmptyStrings()
+        .trimResults()
+        .withKeyValueSeparator('=')
+        .split(request.get(optionsKey).asText());
+  }
+
   private PinotQueryRequest getPinotQueryRequest(JsonNode request) {
-    if (request.has(SQL)) {
-      return new PinotQueryRequest(SQL, request.get(SQL).asText());
+    if (request.has(Broker.Request.SQL)) {
+      return new PinotQueryRequest(Broker.Request.SQL, request.get(Broker.Request.SQL).asText());
     }
-    return new PinotQueryRequest(PQL, request.get(PQL).asText());
+    return new PinotQueryRequest(Broker.Request.PQL, request.get(Broker.Request.PQL).asText());
   }
 
   /**
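
As a quick illustration of the option parsing added above, Guava's Splitter turns the raw options string into a map; a minimal standalone sketch (class name and input string are invented):

    import com.google.common.base.Splitter;
    import java.util.Map;

    public class QueryOptionsParseSketch {
      public static void main(String[] args) {
        // Same Splitter configuration as getOptionsFromRequest above
        Map<String, String> options = Splitter.on(';')
            .omitEmptyStrings()
            .trimResults()
            .withKeyValueSeparator('=')
            .split("groupByMode=sql; responseFormat=sql");
        System.out.println(options); // {groupByMode=sql, responseFormat=sql}
      }
    }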
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
index 38ae0c3..301992f 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/BrokerResponseNative.java
@@ -39,7 +39,7 @@ import org.apache.pinot.common.utils.JsonUtils;
  *
  * Supports serialization via JSON.
  */
-@JsonPropertyOrder({"selectionResults", "aggregationResults", "exceptions", "numServersQueried", "numServersResponded", "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numDocsScanned", "numEntriesScannedInFilter", "numEntriesScannedPostFilter", "numGroupsLimitReached", "totalDocs", "timeUsedMs", "segmentStatistics", "traceInfo"})
+@JsonPropertyOrder({"selectionResults", "aggregationResults", "resultTable", "exceptions", "numServersQueried", "numServersResponded", "numSegmentsQueried", "numSegmentsProcessed", "numSegmentsMatched", "numConsumingSegmentsQueried", "numDocsScanned", "numEntriesScannedInFilter", "numEntriesScannedPostFilter", "numGroupsLimitReached", "totalDocs", "timeUsedMs", "segmentStatistics", "traceInfo"})
 public class BrokerResponseNative implements BrokerResponse {
   public static final BrokerResponseNative EMPTY_RESULT = BrokerResponseNative.empty();
   public static final BrokerResponseNative NO_TABLE_RESULT =
@@ -64,6 +64,7 @@ public class BrokerResponseNative implements BrokerResponse {
 
   private SelectionResults _selectionResults;
   private List<AggregationResult> _aggregationResults;
+  private ResultTable _resultTable;
 
   private Map<String, String> _traceInfo = new HashMap<>();
   private List<QueryProcessingException> _processingExceptions = new ArrayList<>();
@@ -111,6 +112,17 @@ public class BrokerResponseNative implements BrokerResponse {
     _aggregationResults = aggregationResults;
   }
 
+  @JsonProperty("resultTable")
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public ResultTable getResultTable() {
+    return _resultTable;
+  }
+
+  @JsonProperty("resultTable")
+  public void setResultTable(ResultTable resultTable) {
+    _resultTable = resultTable;
+  }
+
   @JsonProperty("exceptions")
   public List<QueryProcessingException> getProcessingExceptions() {
     return _processingExceptions;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/Key.java b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/ResultTable.java
similarity index 50%
copy from pinot-core/src/main/java/org/apache/pinot/core/data/table/Key.java
copy to pinot-common/src/main/java/org/apache/pinot/common/response/broker/ResultTable.java
index 15d51d8..a091a3b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/Key.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/response/broker/ResultTable.java
@@ -16,46 +16,39 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.data.table;
+package org.apache.pinot.common.response.broker;
 
-import java.util.Arrays;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import java.io.Serializable;
+import java.util.List;
 
 
 /**
- * Defines the key component of the record
+ * Holds the results in a standard tabular structure
+ *
+ * FIXME: Column types and Multi-value support are missing, and deserialize might not work properly
  */
-public class Key {
-  private Object[] _columns;
-
-  public Key(Object[] columns) {
+@JsonPropertyOrder({"columns", "rows"})
+public class ResultTable {
+  private final List<String> _columns;
+  private final List<Serializable[]> _rows;
+
+  @JsonCreator
+  public ResultTable(@JsonProperty("columns") List<String> columns,
+      @JsonProperty("rows") List<Serializable[]> rows) {
     _columns = columns;
+    _rows = rows;
   }
 
-  public Object[] getColumns() {
+  @JsonProperty("columns")
+  public List<String> getColumns() {
     return _columns;
   }
 
-  @Override
-  public boolean equals(Object o) {
-    Key that = (Key) o;
-    return Arrays.deepEquals(_columns, that._columns);
-  }
-
-  @Override
-  public int hashCode() {
-    return Arrays.deepHashCode(_columns);
+  @JsonProperty("results")
+  public List<Serializable[]> getRows() {
+    return _rows;
   }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append("[ ");
-    for (Object s : _columns) {
-      sb.append(s);
-      sb.append(", ");
-    }
-    sb.append("]");
-    return sb.toString();
-  }
-
 }
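
Serialized with the property order above, a ResultTable is a flat list of column names plus a list of rows; an illustrative fragment of a broker response (column names and values invented):

    "resultTable": {
      "columns": ["dimension", "sum(metric)"],
      "rows": [["a", 100.0], ["b", 42.5]]
    }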
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index 964a376..f1bc644 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -171,9 +171,12 @@ public class CommonConstants {
       public static final String SQL = "sql";
       public static final String TRACE = "trace";
       public static final String DEBUG_OPTIONS = "debugOptions";
+      public static final String QUERY_OPTIONS = "queryOptions";
 
       public static class QueryOptionKey {
         public static final String PRESERVE_TYPE = "preserveType";
+        public static final String RESPONSE_FORMAT = "responseFormat";
+        public static final String GROUP_BY_MODE = "groupByMode";
       }
     }
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/order/OrderByUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/data/order/OrderByUtils.java
index ad098b1..36bdc5b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/order/OrderByUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/order/OrderByUtils.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.data.order;
 
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
@@ -108,11 +109,36 @@ public final class OrderByUtils {
   }
 
   /**
+   * Gets the indices from Record which have aggregations that are present in the order by
+   * @param orderBy order by information
+   * @param aggregationInfos aggregation information
+   * @return indices of aggregations in the record
+   */
+  public static int[] getAggregationIndexes(List<SelectionSort> orderBy, List<AggregationInfo> aggregationInfos) {
+    Map<String, Integer> aggregationColumnToIndex = new HashMap<>();
+    for (int i = 0; i < aggregationInfos.size(); i++) {
+      AggregationInfo aggregationInfo = aggregationInfos.get(i);
+      String aggregationColumn = AggregationFunctionUtils.getAggregationColumnName(aggregationInfo);
+      aggregationColumnToIndex.put(aggregationColumn, i);
+    }
+
+    List<Integer> indexes = new ArrayList<>();
+    for (SelectionSort orderByInfo : orderBy) {
+      String column = orderByInfo.getColumn();
+
+      if (aggregationColumnToIndex.containsKey(column)) {
+        indexes.add(aggregationColumnToIndex.get(column));
+      }
+    }
+    return indexes.stream().mapToInt(i -> i).toArray();
+  }
+
+  /**
    * Constructs the comparator for ordering by a combination of keys from {@link Record::_keys}
    * and aggregation values from {@link Record::values}
    */
   public static Comparator<Record> getKeysAndValuesComparator(DataSchema dataSchema, List<SelectionSort> orderBy,
-      List<AggregationInfo> aggregationInfos) {
+      List<AggregationInfo> aggregationInfos, boolean extractFinalResults) {
 
     int numKeys = dataSchema.size() - aggregationInfos.size();
     Map<String, Integer> keyIndexMap = new HashMap<>();
@@ -126,9 +152,7 @@ public final class OrderByUtils {
     Map<String, AggregationInfo> aggregationColumnToInfo = new HashMap<>(aggregationInfos.size());
     for (int i = 0; i < aggregationInfos.size(); i++) {
       AggregationInfo aggregationInfo = aggregationInfos.get(i);
-      String aggregationColumn =
-          aggregationInfo.getAggregationType().toLowerCase() + "(" + AggregationFunctionUtils.getColumn(aggregationInfo)
-              + ")";
+      String aggregationColumn = AggregationFunctionUtils.getAggregationColumnName(aggregationInfo);
       aggregationColumnToIndex.put(aggregationColumn, i);
       aggregationColumnToInfo.put(aggregationColumn, aggregationInfo);
     }
@@ -141,16 +165,18 @@ public final class OrderByUtils {
       String column = orderByInfo.getColumn();
       boolean ascending = orderByInfo.isIsAsc();
 
+      // TODO: avoid the index computation and index lookup in the comparison.
+      // we can achieve this by making order by operate on Object[], which contains only order by fields
       if (keyIndexMap.containsKey(column)) {
         int index = keyIndexMap.get(column);
         ColumnDataType columnDataType = keyColumnDataTypeMap.get(column);
-        comparator = OrderByUtils.getKeysComparator(ascending, index, columnDataType);
+        comparator = getKeysComparator(ascending, index, columnDataType);
       } else if (aggregationColumnToIndex.containsKey(column)) {
         int index = aggregationColumnToIndex.get(column);
         AggregationFunction aggregationFunction =
             AggregationFunctionUtils.getAggregationFunctionContext(aggregationColumnToInfo.get(column))
                 .getAggregationFunction();
-        comparator = getAggregationComparator(ascending, index, aggregationFunction);
+        comparator = getAggregationComparator(ascending, index, aggregationFunction, extractFinalResults);
       } else {
         throw new UnsupportedOperationException("Could not find column " + column + " in data schema");
       }
@@ -275,18 +301,27 @@ public final class OrderByUtils {
   }
 
   private static Comparator<Record> getAggregationComparator(boolean ascending, int index,
-      AggregationFunction aggregationFunction) {
+      AggregationFunction aggregationFunction, boolean extractFinalResults) {
 
     Comparator<Record> comparator;
-    // TODO: extract the final results and cache them, so that we do it n times and avoid doing it nlogn times
-    if (ascending) {
-      comparator = (v1, v2) -> ComparableComparator.getInstance()
-          .compare(aggregationFunction.extractFinalResult(v1.getValues()[index]),
-              aggregationFunction.extractFinalResult(v2.getValues()[index]));
+    if (extractFinalResults) {
+      if (ascending) {
+        comparator = (v1, v2) -> ComparableComparator.getInstance()
+            .compare(aggregationFunction.extractFinalResult(v1.getValues()[index]),
+                aggregationFunction.extractFinalResult(v2.getValues()[index]));
+      } else {
+        comparator = (v1, v2) -> ComparableComparator.getInstance()
+            .compare(aggregationFunction.extractFinalResult(v2.getValues()[index]),
+                aggregationFunction.extractFinalResult(v1.getValues()[index]));
+      }
     } else {
-      comparator = (v1, v2) -> ComparableComparator.getInstance()
-          .compare(aggregationFunction.extractFinalResult(v2.getValues()[index]),
-              aggregationFunction.extractFinalResult(v1.getValues()[index]));
+      if (ascending) {
+        comparator =
+            (v1, v2) -> ComparableComparator.getInstance().compare(v1.getValues()[index], v2.getValues()[index]);
+      } else {
+        comparator =
+            (v1, v2) -> ComparableComparator.getInstance().compare(v2.getValues()[index], v1.getValues()[index]);
+      }
     }
     return comparator;
   }
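
The per-column comparators built above are chained across all order-by columns. A minimal standalone sketch of the same idea, using plain Object[] rows and java.util.Comparator instead of the Pinot classes (data is invented):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    public class OrderByComparatorSketch {
      public static void main(String[] args) {
        // Each Object[] stands in for a Record's order-by fields: {key, aggregation}
        List<Object[]> rows = new ArrayList<>();
        rows.add(new Object[]{"b", 10.0});
        rows.add(new Object[]{"a", 10.0});
        rows.add(new Object[]{"a", 5.0});

        // ORDER BY key ASC, aggregation DESC: one comparator per column, then chained
        Comparator<Object[]> byKey = Comparator.comparing(r -> (String) r[0]);
        Comparator<Object[]> byAgg = Comparator.comparing((Object[] r) -> (Double) r[1]).reversed();
        rows.sort(byKey.thenComparing(byAgg));

        for (Object[] row : rows) {
          System.out.println(row[0] + " " + row[1]); // a 10.0, a 5.0, b 10.0
        }
      }
    }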
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
index cf3c233..d1ea28f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
@@ -19,18 +19,25 @@
 package org.apache.pinot.core.data.table;
 
 import com.google.common.base.Preconditions;
+import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.annotation.Nonnull;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.pinot.common.request.AggregationInfo;
 import org.apache.pinot.common.request.SelectionSort;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.data.order.OrderByUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -38,18 +45,45 @@ import org.apache.pinot.core.data.order.OrderByUtils;
  */
 public class ConcurrentIndexedTable extends IndexedTable {
 
+  private static final Logger LOGGER = LoggerFactory.getLogger(ConcurrentIndexedTable.class);
+
   private ConcurrentMap<Key, Record> _lookupMap;
-  private Comparator<Record> _minHeapComparator;
   private ReentrantReadWriteLock _readWriteLock;
 
+  private boolean _isOrderBy;
+  private Comparator<Record> _resizeOrderByComparator;
+  private Comparator<Record> _finishOrderByComparator;
+  private int[] _aggregationIndexes;
+  private Iterator<Record> _iterator;
+
+  private AtomicBoolean _noMoreNewRecords = new AtomicBoolean();
+  private final AtomicInteger _numResizes = new AtomicInteger();
+  private final AtomicLong _resizeTime = new AtomicLong();
+
+  /**
+   * Initializes the data structures and comparators needed for this Table
+   * @param dataSchema data schema of the record's keys and values
+   * @param aggregationInfos aggregation infos for the aggregations in record's values
+   * @param orderBy list of {@link SelectionSort} defining the order by
+   * @param capacity the max number of records to hold
+   */
   @Override
   public void init(@Nonnull DataSchema dataSchema, List<AggregationInfo> aggregationInfos, List<SelectionSort> orderBy,
-      int maxCapacity) {
-    super.init(dataSchema, aggregationInfos, orderBy, maxCapacity);
+      int capacity) {
+    super.init(dataSchema, aggregationInfos, orderBy, capacity);
 
-    _minHeapComparator = OrderByUtils.getKeysAndValuesComparator(dataSchema, orderBy, aggregationInfos).reversed();
     _lookupMap = new ConcurrentHashMap<>();
     _readWriteLock = new ReentrantReadWriteLock();
+    _isOrderBy = CollectionUtils.isNotEmpty(orderBy);
+    if (_isOrderBy) {
+      // get indices of aggregations to extract final results upfront
+      // FIXME: at instance level, extract final results only if intermediate result is non-comparable, instead of for all aggregations
+      _aggregationIndexes = OrderByUtils.getAggregationIndexes(orderBy, aggregationInfos);
+      // resize comparator doesn't need to extract final results, because it is done beforehand when adding Records to the PQ.
+      _resizeOrderByComparator = OrderByUtils.getKeysAndValuesComparator(dataSchema, orderBy, aggregationInfos, false);
+      // finish comparator needs to extract final results, as it cannot be done beforehand: the _lookupMap would get modified if it were
+      _finishOrderByComparator = OrderByUtils.getKeysAndValuesComparator(dataSchema, orderBy, aggregationInfos, true);
+    }
   }
 
   /**
@@ -61,25 +95,47 @@ public class ConcurrentIndexedTable extends IndexedTable {
     Key key = newRecord.getKey();
     Preconditions.checkNotNull(key, "Cannot upsert record with null keys");
 
-    Record existingRecord = _lookupMap.putIfAbsent(key, newRecord);
-    if (existingRecord != null) {
-      _lookupMap.compute(key, (k, v) -> {
-        for (int i = 0; i < _aggregationFunctions.size(); i++) {
+    if (_noMoreNewRecords.get()) { // allow only existing record updates
+      _lookupMap.computeIfPresent(key, (k, v) -> {
+        for (int i = 0; i < _numAggregations; i++) {
           v.getValues()[i] = _aggregationFunctions.get(i).merge(v.getValues()[i], newRecord.getValues()[i]);
         }
         return v;
       });
-    }
+    } else { // allow all records
 
-    // resize if exceeds capacity
-    if (_lookupMap.size() >= _bufferedCapacity) {
-      _readWriteLock.writeLock().lock();
+      _readWriteLock.readLock().lock();
       try {
-        if (_lookupMap.size() >= _bufferedCapacity) {
-          resize(_evictCapacity);
-        }
+        _lookupMap.compute(key, (k, v) -> {
+          if (v == null) {
+            return newRecord;
+          } else {
+            for (int i = 0; i < _numAggregations; i++) {
+              v.getValues()[i] = _aggregationFunctions.get(i).merge(v.getValues()[i], newRecord.getValues()[i]);
+            }
+            return v;
+          }
+        });
       } finally {
-        _readWriteLock.writeLock().unlock();
+        _readWriteLock.readLock().unlock();
+      }
+
+      // resize if exceeds capacity
+      if (_lookupMap.size() >= _bufferedCapacity) {
+        if (_isOrderBy) {
+          // reached capacity, resize
+          _readWriteLock.writeLock().lock();
+          try {
+            if (_lookupMap.size() >= _bufferedCapacity) {
+              resize(_maxCapacity);
+            }
+          } finally {
+            _readWriteLock.writeLock().unlock();
+          }
+        } else {
+          // reached capacity and no order by. No more new records will be accepted
+          _noMoreNewRecords.set(true);
+        }
       }
     }
     return true;
@@ -101,37 +157,88 @@ public class ConcurrentIndexedTable extends IndexedTable {
 
   @Override
   public Iterator<Record> iterator() {
-    return _lookupMap.values().iterator();
+    return _iterator;
   }
 
   private void resize(int trimToSize) {
 
     if (_lookupMap.size() > trimToSize) {
+      long startTime = System.currentTimeMillis();
 
-      // make min heap of elements to evict
-      int heapSize = _lookupMap.size() - trimToSize;
-      PriorityQueue<Record> minHeap = new PriorityQueue<>(heapSize, _minHeapComparator);
+      if (_isOrderBy) {
+        // drop bottom
 
-      for (Record record : _lookupMap.values()) {
-        if (minHeap.size() < heapSize) {
-          minHeap.offer(record);
-        } else {
-          Record peek = minHeap.peek();
-          if (minHeap.comparator().compare(record, peek) < 0) {
-            minHeap.poll();
+        // make min heap of elements to evict
+        int heapSize = _lookupMap.size() - trimToSize;
+        PriorityQueue<Record> minHeap = new PriorityQueue<>(heapSize, _resizeOrderByComparator);
+
+        for (Record record : _lookupMap.values()) {
+
+          // extract final results before hand for comparisons on aggregations
+          // FIXME: at instance level, extract final results only if intermediate result is non-comparable, instead of for all aggregations
+          if (_aggregationIndexes.length > 0) {
+            Object[] values = record.getValues();
+            for (int index : _aggregationIndexes) {
+              values[index] = _aggregationFunctions.get(index).extractFinalResult(values[index]);
+            }
+          }
+          if (minHeap.size() < heapSize) {
             minHeap.offer(record);
+          } else {
+            Record peek = minHeap.peek();
+            if (minHeap.comparator().compare(peek, record) < 0) {
+              minHeap.poll();
+              minHeap.offer(record);
+            }
           }
         }
-      }
 
-      for (Record evictRecord : minHeap) {
-        _lookupMap.remove(evictRecord.getKey());
+        for (Record evictRecord : minHeap) {
+          _lookupMap.remove(evictRecord.getKey());
+        }
+      } else {
+        // drop randomly
+
+        int numRecordsToDrop = _lookupMap.size() - trimToSize;
+        for (Key evictKey : _lookupMap.keySet()) {
+          _lookupMap.remove(evictKey);
+          numRecordsToDrop--;
+          if (numRecordsToDrop == 0) {
+            break;
+          }
+        }
       }
+      long endTime = System.currentTimeMillis();
+      long timeElapsed = endTime - startTime;
+
+      _numResizes.incrementAndGet();
+      _resizeTime.addAndGet(timeElapsed);
     }
   }
 
   @Override
-  public void finish() {
+  public void finish(boolean sort) {
     resize(_maxCapacity);
+    int numResizes = _numResizes.get();
+    long resizeTime = _resizeTime.get();
+    LOGGER.debug("Num resizes : {}, Total time spent in resizing : {}, Avg resize time : {}", numResizes, resizeTime,
+        numResizes == 0 ? 0 : resizeTime / numResizes);
+
+    _iterator = _lookupMap.values().iterator();
+    if (sort && _isOrderBy) {
+      // TODO: in this final sort, we can optimize again by extracting final results beforehand.
+      // This could be done by adding another parameter to finish(sort, extractFinalResults).
+      // The caller then does not have to extract final results again, if extractFinalResults=true was passed to finish.
+      // Typically, at instance level, we neither need to sort nor want final results - finish(false, false).
+      // At broker level we need to sort, and we also want final results - finish(true, true)
+      List<Record> sortedList = new ArrayList<>(_lookupMap.values());
+      sortedList.sort(_finishOrderByComparator);
+      _iterator = sortedList.iterator();
+    }
+  }
+
+  @Override
+  public DataSchema getDataSchema() {
+    return _dataSchema;
   }
 }
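
The resize above drops the bottom of the table by building a heap of eviction candidates rather than fully sorting the map. The same drop-bottom idea in a self-contained form, with plain integers standing in for Records ordered by the order-by comparator (values assumed distinct for simplicity):

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;
    import java.util.PriorityQueue;

    public class DropBottomSketch {
      // Keep the 'keep' largest values by heaping the (size - keep) eviction candidates
      static List<Integer> keepTop(List<Integer> values, int keep) {
        int evictCount = values.size() - keep;
        // Heap of eviction candidates; peek is the best (largest) candidate
        PriorityQueue<Integer> evict = new PriorityQueue<>(evictCount, Comparator.reverseOrder());
        for (int v : values) {
          if (evict.size() < evictCount) {
            evict.offer(v);
          } else if (evict.peek() > v) {
            // v is worse than the best candidate, so v becomes a candidate instead
            evict.poll();
            evict.offer(v);
          }
        }
        List<Integer> kept = new ArrayList<>(values);
        kept.removeAll(evict); // the real table removes by key: _lookupMap.remove(record.getKey())
        return kept;
      }

      public static void main(String[] args) {
        System.out.println(keepTop(new ArrayList<>(List.of(5, 1, 9, 3, 7)), 3)); // [5, 9, 7]
      }
    }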
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
index a2b4b25..ebf0711 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
@@ -32,38 +32,44 @@ import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils
  * Base abstract implementation of Table for indexed lookup
  */
 public abstract class IndexedTable implements Table {
-  // When table reaches max capacity, we will allow 20% more records to get inserted (bufferedCapacity)
-  // If records beyond bufferedCapacity are received, the table will undergo sort and evict upto evictCapacity (10% more than capacity)
-  // This is to ensure that for a small number beyond capacity, a fair chance is given to all records which have the potential to climb up the order
-  /** Factor used to add buffer to maxCapacity of the Collection used **/
-  private static final double BUFFER_FACTOR = 1.2;
-  /** Factor used to decide eviction threshold **/
-  private static final double EVICTION_FACTOR = 1.1;
 
   List<AggregationFunction> _aggregationFunctions;
+  int _numAggregations;
   DataSchema _dataSchema;
-  List<AggregationInfo> _aggregationInfos;
-  List<SelectionSort> _orderBy;
 
   int _maxCapacity;
-  int _evictCapacity;
   int _bufferedCapacity;
 
   @Override
   public void init(@Nonnull DataSchema dataSchema, List<AggregationInfo> aggregationInfos, List<SelectionSort> orderBy,
-      int maxCapacity) {
+      int capacity) {
     _dataSchema = dataSchema;
-    _aggregationInfos = aggregationInfos;
-    _orderBy = orderBy;
 
-    _aggregationFunctions = new ArrayList<>(aggregationInfos.size());
+    _numAggregations = aggregationInfos.size();
+    _aggregationFunctions = new ArrayList<>(_numAggregations);
     for (AggregationInfo aggregationInfo : aggregationInfos) {
       _aggregationFunctions.add(
           AggregationFunctionUtils.getAggregationFunctionContext(aggregationInfo).getAggregationFunction());
     }
 
-    _maxCapacity = maxCapacity;
-    _bufferedCapacity = (int) (maxCapacity * BUFFER_FACTOR);
-    _evictCapacity = (int) (maxCapacity * EVICTION_FACTOR);
+    /* Factor used to add buffer to the maxCapacity of the table */
+    double bufferFactor;
+
+    /* The true capacity of the table is {@link IndexedTable::_bufferedCapacity},
+     * which is bufferFactor times the {@link IndexedTable::_maxCapacity}.
+     *
+     * If records beyond {@link IndexedTable::_bufferedCapacity} are received,
+     * the table resizes and evicts bottom records, trimming back to {@link IndexedTable::_maxCapacity}.
+     * The assumption here is that {@link IndexedTable::_maxCapacity} already has a buffer added by the caller (typically, we do max(top * 5, 5000))
+     */
+    if (capacity > 50000) {
+      // if max capacity is large, buffer capacity is kept smaller, so that we do not accumulate too many records for sorting/resizing
+      bufferFactor = 1.2;
+    } else {
+      // if max capacity is small, buffer capacity is kept larger, so that we avoid frequent resizing
+      bufferFactor = 2.0;
+    }
+    _maxCapacity = capacity;
+    _bufferedCapacity = (int) (capacity * bufferFactor);
   }
 }
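
A quick worked example of the capacity math above: for a query with TOP 100, a caller following the max(top * 5, 5000) convention passes capacity = 5000; since that is not above 50000, bufferFactor is 2.0 and the table buffers up to 10000 records before trimming back down to 5000. For capacity = 100000, bufferFactor drops to 1.2 and the resize threshold becomes 120000.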
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/Key.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/Key.java
index 15d51d8..515659a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/Key.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/Key.java
@@ -48,14 +48,7 @@ public class Key {
 
   @Override
   public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append("[ ");
-    for (Object s : _columns) {
-      sb.append(s);
-      sb.append(", ");
-    }
-    sb.append("]");
-    return sb.toString();
+    return Arrays.deepToString(_columns);
   }
 
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/Record.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/Record.java
index 7637f7e..417b2d8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/Record.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/Record.java
@@ -18,6 +18,9 @@
  */
 package org.apache.pinot.core.data.table;
 
+import java.util.Arrays;
+
+
 /**
  * Defines a single record in Pinot comprising of keys and values
  */
@@ -43,4 +46,9 @@ public class Record {
   public Object[] getValues() {
     return _values;
   }
+
+  @Override
+  public String toString() {
+    return _key.toString() + " " + Arrays.deepToString(_values);
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
index 9fb00a8..a7f4894 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
@@ -32,6 +32,8 @@ import org.apache.pinot.common.request.AggregationInfo;
 import org.apache.pinot.common.request.SelectionSort;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.data.order.OrderByUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 
 /**
@@ -39,17 +41,39 @@ import org.apache.pinot.core.data.order.OrderByUtils;
  */
 @NotThreadSafe
 public class SimpleIndexedTable extends IndexedTable {
+  private static final Logger LOGGER = LoggerFactory.getLogger(SimpleIndexedTable.class);
 
   private List<Record> _records;
   private Map<Key, Integer> _lookupTable;
 
+  private boolean _isOrderBy;
+  private Comparator<Record> _orderByComparator;
+  private Iterator<Record> _iterator;
+
+  private boolean _noMoreNewRecords = false;
+  private int _numResizes = 0;
+  private long _resizeTime = 0;
+
+  /**
+   * Initializes the data structures and comparators needed for this Table
+   * @param dataSchema data schema of the record's keys and values
+   * @param aggregationInfos aggregation infos for the aggregations in record's values
+   * @param orderBy list of {@link SelectionSort} defining the order by
+   * @param capacity the max number of records to hold
+   */
   @Override
   public void init(@Nonnull DataSchema dataSchema, List<AggregationInfo> aggregationInfos, List<SelectionSort> orderBy,
-      int maxCapacity) {
-    super.init(dataSchema, aggregationInfos, orderBy, maxCapacity);
+      int capacity) {
+    super.init(dataSchema, aggregationInfos, orderBy, capacity);
 
-    _records = new ArrayList<>(maxCapacity);
-    _lookupTable = new HashMap<>(maxCapacity);
+    _records = new ArrayList<>(capacity);
+    _lookupTable = new HashMap<>(capacity);
+
+    _isOrderBy = CollectionUtils.isNotEmpty(orderBy);
+    if (_isOrderBy) {
+      // final results are not extracted upfront, so the comparator extracts them during comparison
+      _orderByComparator = OrderByUtils.getKeysAndValuesComparator(dataSchema, orderBy, aggregationInfos, true);
+    }
   }
 
   /**
@@ -61,33 +85,45 @@ public class SimpleIndexedTable extends IndexedTable {
     Preconditions.checkNotNull(keys, "Cannot upsert record with null keys");
 
     Integer index = _lookupTable.get(keys);
-    if (index == null) {
-      index = size();
-      _lookupTable.put(keys, index);
-      _records.add(index, newRecord);
-    } else {
-      Record existingRecord = _records.get(index);
-      for (int i = 0; i < _aggregationFunctions.size(); i++) {
-        existingRecord.getValues()[i] =
-            _aggregationFunctions.get(i).merge(existingRecord.getValues()[i], newRecord.getValues()[i]);
+    if (_noMoreNewRecords) { // only update existing records
+      if (index != null) {
+        Record existingRecord = _records.get(index);
+        for (int i = 0; i < _numAggregations; i++) {
+          existingRecord.getValues()[i] = _aggregationFunctions.get(i).merge(existingRecord.getValues()[i], newRecord.getValues()[i]);
+        }
+      }
+    } else { // allow all records
+      if (index == null) {
+        index = size();
+        _lookupTable.put(keys, index);
+        _records.add(index, newRecord);
+      } else {
+        Record existingRecord = _records.get(index);
+        for (int i = 0; i < _numAggregations; i++) {
+          existingRecord.getValues()[i] = _aggregationFunctions.get(i).merge(existingRecord.getValues()[i], newRecord.getValues()[i]);
+        }
       }
-    }
 
-    if (size() >= _bufferedCapacity) {
-      resize(_evictCapacity);
+      if (size() >= _bufferedCapacity) {
+        if (_isOrderBy) { // capacity reached, order and resize
+          sortAndResize(_maxCapacity);
+        } else { // capacity reached, but no order by. Allow no more records
+          _noMoreNewRecords = true;
+        }
+      }
     }
     return true;
   }
 
-  private void resize(int trimToSize) {
+  private void sortAndResize(int trimToSize) {
+    long startTime = System.currentTimeMillis();
+
     // sort
-    if (CollectionUtils.isNotEmpty(_orderBy)) {
-      Comparator<Record> comparator;
-      comparator = OrderByUtils.getKeysAndValuesComparator(_dataSchema, _orderBy, _aggregationInfos);
-      _records.sort(comparator);
+    if (_isOrderBy) {
+      _records.sort(_orderByComparator);
     }
 
-    // evict lowest
+    // evict lowest (or whatever's at the bottom if the sort didn't happen)
     if (_records.size() > trimToSize) {
       _records = new ArrayList<>(_records.subList(0, trimToSize));
     }
@@ -97,6 +133,12 @@ public class SimpleIndexedTable extends IndexedTable {
     for (int i = 0; i < _records.size(); i++) {
       _lookupTable.put(_records.get(i).getKey(), i);
     }
+
+    long endTime = System.currentTimeMillis();
+    long timeElapsed = endTime - startTime;
+
+    _numResizes++;
+    _resizeTime += timeElapsed;
   }
 
 
@@ -116,11 +158,21 @@ public class SimpleIndexedTable extends IndexedTable {
 
   @Override
   public Iterator<Record> iterator() {
-    return _records.iterator();
+    return _iterator;
+  }
+
+  @Override
+  public void finish(boolean sort) {
+    // TODO: support resize without sort
+    sortAndResize(_maxCapacity);
+    LOGGER.debug("Num resizes : {}, Total time spent in resizing : {}, Avg resize time : {}", _numResizes, _resizeTime,
+        _numResizes == 0 ? 0 : _resizeTime / _numResizes);
+
+    _iterator = _records.iterator();
   }
 
   @Override
-  public void finish() {
-    resize(_maxCapacity);
+  public DataSchema getDataSchema() {
+    return _dataSchema;
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/Table.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/Table.java
index dd1e5f0..e515bdb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/Table.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/Table.java
@@ -38,7 +38,7 @@ public interface Table {
    * @param orderBy the order by information if applicable
    * @param maxCapacity the max capacity the table should have
    */
-  void init(@Nonnull DataSchema dataSchema, List<AggregationInfo> aggregationInfos, List<SelectionSort> orderBy,
+  void init(DataSchema dataSchema, List<AggregationInfo> aggregationInfos, List<SelectionSort> orderBy,
       int maxCapacity);
 
   /**
@@ -63,6 +63,12 @@ public interface Table {
 
   /**
    * Finish any pre exit processing
+   * @param sort sort the final results if true
    */
-  void finish();
+  void finish(boolean sort);
+
+  /**
+   * Returns the data schema of the table
+   */
+  DataSchema getDataSchema();
 }
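
Taken together, the interface above implies a simple lifecycle for callers; a rough sketch (dataSchema, aggregationInfos, orderBy, capacity and segmentResults are stand-ins, not values from this patch):

    // Sketch of the Table lifecycle defined above
    Table table = new SimpleIndexedTable();
    table.init(dataSchema, aggregationInfos, orderBy, capacity);
    for (Record record : segmentResults) {
      table.upsert(record);          // merges aggregation values on key collision
    }
    table.finish(true);              // sort (and trim) the final results
    Iterator<Record> it = table.iterator(); // valid only after finish()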
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOrderByOperator.java
new file mode 100644
index 0000000..43f5076
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOrderByOperator.java
@@ -0,0 +1,257 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.Function;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.response.ProcessingException;
+import org.apache.pinot.common.utils.BytesUtils;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
+import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
+import org.apache.pinot.core.util.GroupByUtils;
+import org.apache.pinot.core.util.trace.TraceRunnable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The <code>CombineGroupByOrderByOperator</code> class is the operator to combine aggregation results with group-by and order-by.
+ */
+// TODO: this class has a lot of duplication with {@link CombineGroupByOperator}.
+// These 2 classes can be combined into one
+// For the first iteration of Order By support, these will be separate
+public class CombineGroupByOrderByOperator extends BaseOperator<IntermediateResultsBlock> {
+  private static final Logger LOGGER = LoggerFactory.getLogger(CombineGroupByOrderByOperator.class);
+  private static final String OPERATOR_NAME = "CombineGroupByOrderByOperator";
+
+  private final List<Operator> _operators;
+  private final BrokerRequest _brokerRequest;
+  private final ExecutorService _executorService;
+  private final long _timeOutMs;
+  private final int _indexedTableCapacity;
+  private Lock _initLock;
+  private DataSchema _dataSchema;
+  private ConcurrentIndexedTable _indexedTable;
+
+  public CombineGroupByOrderByOperator(List<Operator> operators, BrokerRequest brokerRequest,
+      ExecutorService executorService, long timeOutMs) {
+    Preconditions.checkArgument(brokerRequest.isSetAggregationsInfo() && brokerRequest.isSetGroupBy());
+
+    _operators = operators;
+    _brokerRequest = brokerRequest;
+    _executorService = executorService;
+    _timeOutMs = timeOutMs;
+    _initLock = new ReentrantLock();
+    _indexedTable = new ConcurrentIndexedTable();
+    _indexedTableCapacity = 1_000_000;
+    // FIXME: indexedTableCapacity should be derived from TOP. Hardcoding this value to a higher number until we can tune the resize
+    //_indexedTableCapacity = GroupByUtils.getTableCapacity((int) brokerRequest.getGroupBy().getTopN());
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>Combines the group-by result blocks from underlying operators and returns a merged and trimmed group-by
+   * result block.
+   * <ul>
+   *   <li>
+   *     Concurrently merge group-by results from multiple result blocks into {@link org.apache.pinot.core.data.table.IndexedTable}
+   *   </li>
+   *   <li>
+   *     Set all exceptions encountered during execution into the merged result block
+   *   </li>
+   * </ul>
+   */
+  @Override
+  protected IntermediateResultsBlock getNextBlock() {
+    int numOperators = _operators.size();
+    CountDownLatch operatorLatch = new CountDownLatch(numOperators);
+
+    int numAggregationFunctions = _brokerRequest.getAggregationsInfoSize();
+    int numGroupBy = _brokerRequest.getGroupBy().getExpressionsSize();
+    ConcurrentLinkedQueue<ProcessingException> mergedProcessingExceptions = new ConcurrentLinkedQueue<>();
+
+    Future[] futures = new Future[numOperators];
+    for (int i = 0; i < numOperators; i++) {
+      int index = i;
+      futures[i] = _executorService.submit(new TraceRunnable() {
+        @SuppressWarnings("unchecked")
+        @Override
+        public void runJob() {
+          AggregationGroupByResult aggregationGroupByResult;
+
+          try {
+            IntermediateResultsBlock intermediateResultsBlock =
+                (IntermediateResultsBlock) _operators.get(index).nextBlock();
+
+            _initLock.lock();
+            try {
+              if (_dataSchema == null) {
+                _dataSchema = intermediateResultsBlock.getDataSchema();
+                _indexedTable.init(_dataSchema, _brokerRequest.getAggregationsInfo(), _brokerRequest.getOrderBy(),
+                    _indexedTableCapacity);
+              }
+            } finally {
+              _initLock.unlock();
+            }
+
+            // Merge processing exceptions.
+            List<ProcessingException> processingExceptionsToMerge = intermediateResultsBlock.getProcessingExceptions();
+            if (processingExceptionsToMerge != null) {
+              mergedProcessingExceptions.addAll(processingExceptionsToMerge);
+            }
+
+            // Merge aggregation group-by result.
+            aggregationGroupByResult = intermediateResultsBlock.getAggregationGroupByResult();
+            if (aggregationGroupByResult != null) {
+              // Get converter functions
+              Function[] converterFunctions = new Function[numGroupBy];
+              for (int i = 0; i < numGroupBy; i++) {
+                converterFunctions[i] = getConverterFunction(_dataSchema.getColumnDataType(i));
+              }
+
+              // Iterate over the group-by keys, for each key, update the group-by result in the indexedTable.
+              Iterator<GroupKeyGenerator.GroupKey> groupKeyIterator = aggregationGroupByResult.getGroupKeyIterator();
+              while (groupKeyIterator.hasNext()) {
+                GroupKeyGenerator.GroupKey groupKey = groupKeyIterator.next();
+                String[] stringKey = groupKey._stringKey.split(GroupKeyGenerator.DELIMITER);
+                Object[] objectKey = new Object[numGroupBy];
+                for (int i = 0; i < stringKey.length; i++) {
+                  objectKey[i] = converterFunctions[i].apply(stringKey[i]);
+                }
+                Object[] values = new Object[numAggregationFunctions];
+                for (int i = 0; i < numAggregationFunctions; i++) {
+                  values[i] = aggregationGroupByResult.getResultForKey(groupKey, i);
+                }
+
+                Record record = new Record(new Key(objectKey), values);
+                _indexedTable.upsert(record);
+              }
+            }
+          } catch (Exception e) {
+            LOGGER.error("Exception processing CombineGroupByOrderBy for index {}, operator {}", index,
+                _operators.get(index).getClass().getName(), e);
+            mergedProcessingExceptions.add(QueryException.getException(QueryException.QUERY_EXECUTION_ERROR, e));
+          }
+
+          operatorLatch.countDown();
+        }
+      });
+    }
+
+    try {
+      boolean opCompleted = operatorLatch.await(_timeOutMs, TimeUnit.MILLISECONDS);
+      if (!opCompleted) {
+        // If this happens, the broker side should have already timed out, just log the error and return
+        String errorMessage = "Timed out while combining group-by results after " + _timeOutMs + "ms";
+        LOGGER.error(errorMessage);
+        return new IntermediateResultsBlock(new TimeoutException(errorMessage));
+      }
+
+      _indexedTable.finish(false);
+      IntermediateResultsBlock mergedBlock = new IntermediateResultsBlock(_indexedTable);
+
+      // Set the processing exceptions.
+      if (!mergedProcessingExceptions.isEmpty()) {
+        mergedBlock.setProcessingExceptions(new ArrayList<>(mergedProcessingExceptions));
+      }
+
+      // Set the execution statistics.
+      ExecutionStatistics executionStatistics = new ExecutionStatistics();
+      for (Operator operator : _operators) {
+        ExecutionStatistics executionStatisticsToMerge = operator.getExecutionStatistics();
+        if (executionStatisticsToMerge != null) {
+          executionStatistics.merge(executionStatisticsToMerge);
+        }
+      }
+      mergedBlock.setNumDocsScanned(executionStatistics.getNumDocsScanned());
+      mergedBlock.setNumEntriesScannedInFilter(executionStatistics.getNumEntriesScannedInFilter());
+      mergedBlock.setNumEntriesScannedPostFilter(executionStatistics.getNumEntriesScannedPostFilter());
+      mergedBlock.setNumSegmentsProcessed(executionStatistics.getNumSegmentsProcessed());
+      mergedBlock.setNumSegmentsMatched(executionStatistics.getNumSegmentsMatched());
+      mergedBlock.setNumTotalRawDocs(executionStatistics.getNumTotalRawDocs());
+
+      if (_indexedTable.size() >= _indexedTableCapacity) {
+        mergedBlock.setNumGroupsLimitReached(true);
+      }
+
+      return mergedBlock;
+    } catch (Exception e) {
+      return new IntermediateResultsBlock(e);
+    } finally {
+      // Cancel all ongoing jobs
+      for (Future future : futures) {
+        if (!future.isDone()) {
+          future.cancel(true);
+        }
+      }
+    }
+  }
+
+  private Function<String, Object> getConverterFunction(DataSchema.ColumnDataType columnDataType) {
+    Function<String, Object> function;
+    switch (columnDataType) {
+
+      case INT:
+        function = Integer::valueOf;
+        break;
+      case LONG:
+        function = Long::valueOf;
+        break;
+      case FLOAT:
+        function = Float::valueOf;
+        break;
+      case DOUBLE:
+        function = Double::valueOf;
+        break;
+      case BYTES:
+        function = BytesUtils::toBytes;
+        break;
+      case STRING:
+      default:
+        function = s -> s;
+        break;
+    }
+    return function;
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+}
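
To make the key-conversion loop above concrete: group keys arrive from the segment as delimited strings, and each component must be converted back to its typed form before it can live in a Record key. A hedged mini-example (the delimiter, schema and values are invented; the real code splits on GroupKeyGenerator.DELIMITER):

    // Hypothetical group-by columns: (STRING, INT), key string "us|42"
    Function<String, Object> stringConverter = s -> s;         // STRING case above
    Function<String, Object> intConverter = Integer::valueOf;  // INT case above
    String[] stringKey = "us|42".split("\\|");                 // stand-in delimiter
    Object[] objectKey = {stringConverter.apply(stringKey[0]), intConverter.apply(stringKey[1])};
    // objectKey is now {String "us", Integer 42}, ready for new Key(objectKey)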
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
index df920a3..b982006 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
@@ -18,9 +18,11 @@
  */
 package org.apache.pinot.core.operator.blocks;
 
+import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nonnull;
@@ -28,6 +30,7 @@ import javax.annotation.Nullable;
 import org.apache.pinot.common.exception.QueryException;
 import org.apache.pinot.common.response.ProcessingException;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.Block;
 import org.apache.pinot.core.common.BlockDocIdSet;
@@ -36,6 +39,8 @@ import org.apache.pinot.core.common.BlockMetadata;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.common.datatable.DataTableBuilder;
 import org.apache.pinot.core.common.datatable.DataTableImplV2;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.data.table.Table;
 import org.apache.pinot.core.query.aggregation.AggregationFunctionContext;
 import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
@@ -45,7 +50,7 @@ import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
  * The <code>IntermediateResultsBlock</code> class is the holder of the server side inter-segment results.
  */
 public class IntermediateResultsBlock implements Block {
-  private DataSchema _selectionDataSchema;
+  private DataSchema _dataSchema;
   private Collection<Serializable[]> _selectionResult;
   private AggregationFunctionContext[] _aggregationFunctionContexts;
   private List<Object> _aggregationResult;
@@ -60,12 +65,14 @@ public class IntermediateResultsBlock implements Block {
   private long _numSegmentsMatched;
   private boolean _numGroupsLimitReached;
 
+  private Table _table;
+
   /**
    * Constructor for selection result.
    */
-  public IntermediateResultsBlock(@Nonnull DataSchema selectionDataSchema,
+  public IntermediateResultsBlock(@Nonnull DataSchema dataSchema,
       @Nonnull Collection<Serializable[]> selectionResult) {
-    _selectionDataSchema = selectionDataSchema;
+    _dataSchema = dataSchema;
     _selectionResult = selectionResult;
   }
 
@@ -95,6 +102,22 @@ public class IntermediateResultsBlock implements Block {
   }
 
   /**
+   * Constructor for aggregation group-by order-by result with {@link AggregationGroupByResult}.
+   */
+  public IntermediateResultsBlock(@Nonnull AggregationFunctionContext[] aggregationFunctionContexts,
+      @Nullable AggregationGroupByResult aggregationGroupByResults, DataSchema dataSchema) {
+    _aggregationFunctionContexts = aggregationFunctionContexts;
+    _aggregationGroupByResult = aggregationGroupByResults;
+    _dataSchema = dataSchema;
+  }
+
+
+  public IntermediateResultsBlock(@Nonnull Table table) {
+    _table = table;
+    _dataSchema = table.getDataSchema();
+  }
+
+  /**
    * Constructor for exception block.
    */
   public IntermediateResultsBlock(@Nonnull ProcessingException processingException, @Nonnull Exception e) {
@@ -110,12 +133,12 @@ public class IntermediateResultsBlock implements Block {
   }
 
   @Nullable
-  public DataSchema getSelectionDataSchema() {
-    return _selectionDataSchema;
+  public DataSchema getDataSchema() {
+    return _dataSchema;
   }
 
-  public void setSelectionDataSchema(@Nullable DataSchema dataSchema) {
-    _selectionDataSchema = dataSchema;
+  public void setDataSchema(@Nullable DataSchema dataSchema) {
+    _dataSchema = dataSchema;
   }
 
   @Nullable
@@ -205,6 +228,12 @@ public class IntermediateResultsBlock implements Block {
   @Nonnull
   public DataTable getDataTable()
       throws Exception {
+
+    if (_table != null) {
+      return getResultDataTable();
+    }
+
+    // TODO: remove all these ifs once every operator starts using {@link Table}
     if (_selectionResult != null) {
       return getSelectionResultDataTable();
     }
@@ -225,10 +254,61 @@ public class IntermediateResultsBlock implements Block {
   }
 
   @Nonnull
+  private DataTable getResultDataTable() throws IOException {
+
+    DataTableBuilder dataTableBuilder = new DataTableBuilder(_dataSchema);
+
+    Iterator<Record> iterator = _table.iterator();
+    while (iterator.hasNext()) {
+      Record record = iterator.next();
+      dataTableBuilder.startRow();
+      int columnIndex = 0;
+      for (Object keyColumn : record.getKey().getColumns()) {
+        ColumnDataType columnDataType = _dataSchema.getColumnDataType(columnIndex);
+        setDataTableColumn(columnDataType, dataTableBuilder, columnIndex, keyColumn);
+        columnIndex++;
+      }
+      for (Object valueColumn : record.getValues()) {
+        ColumnDataType columnDataType = _dataSchema.getColumnDataType(columnIndex);
+        setDataTableColumn(columnDataType, dataTableBuilder, columnIndex, valueColumn);
+        columnIndex++;
+      }
+      dataTableBuilder.finishRow();
+    }
+    DataTable dataTable = dataTableBuilder.build();
+    return attachMetadataToDataTable(dataTable);
+  }
+
+  private void setDataTableColumn(ColumnDataType columnDataType, DataTableBuilder dataTableBuilder, int columnIndex, Object value)
+      throws IOException {
+    switch (columnDataType) {
+
+      case INT:
+        dataTableBuilder.setColumn(columnIndex, ((Number) value).intValue());
+        break;
+      case LONG:
+        dataTableBuilder.setColumn(columnIndex, ((Number) value).longValue());
+        break;
+      case FLOAT:
+        dataTableBuilder.setColumn(columnIndex, ((Number) value).floatValue());
+        break;
+      case DOUBLE:
+        dataTableBuilder.setColumn(columnIndex, ((Number) value).doubleValue());
+        break;
+      case STRING:
+        dataTableBuilder.setColumn(columnIndex, (String) value);
+        break;
+      default:
+        dataTableBuilder.setColumn(columnIndex, value);
+        break;
+    }
+  }
+
+  @Nonnull
   private DataTable getSelectionResultDataTable()
       throws Exception {
     return attachMetadataToDataTable(
-        SelectionOperatorUtils.getDataTableFromRows(_selectionResult, _selectionDataSchema));
+        SelectionOperatorUtils.getDataTableFromRows(_selectionResult, _dataSchema));
   }
 
   @Nonnull
@@ -237,7 +317,7 @@ public class IntermediateResultsBlock implements Block {
     // Extract each aggregation column name and type from aggregation function context.
     int numAggregationFunctions = _aggregationFunctionContexts.length;
     String[] columnNames = new String[numAggregationFunctions];
-    DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numAggregationFunctions];
+    ColumnDataType[] columnDataTypes = new ColumnDataType[numAggregationFunctions];
     for (int i = 0; i < numAggregationFunctions; i++) {
       AggregationFunctionContext aggregationFunctionContext = _aggregationFunctionContexts[i];
       columnNames[i] = aggregationFunctionContext.getAggregationColumnName();
@@ -273,8 +353,8 @@ public class IntermediateResultsBlock implements Block {
   private DataTable getAggregationGroupByResultDataTable()
       throws Exception {
     String[] columnNames = new String[]{"functionName", "GroupByResultMap"};
-    DataSchema.ColumnDataType[] columnDataTypes =
-        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.OBJECT};
+    ColumnDataType[] columnDataTypes =
+        new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.OBJECT};
 
     // Build the data table.
     DataTableBuilder dataTableBuilder = new DataTableBuilder(new DataSchema(columnNames, columnDataTypes));
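
The getResultDataTable loop above writes each Record as one row, key columns first and then aggregation values. A simplified sketch of that flattening with a stand-in Record type (the real DataTableBuilder API is not reproduced here):

    import java.util.Arrays;

    public class RowFlattenSketch {
      // Stand-in for Pinot's Record: group-by key columns plus aggregation values.
      static class Record {
        final Object[] keyColumns;
        final Object[] valueColumns;
        Record(Object[] keys, Object[] values) { keyColumns = keys; valueColumns = values; }
      }

      // Flatten into a single row with the same column ordering the data schema uses:
      // group-by columns first, then one column per aggregation.
      static Object[] toRow(Record record) {
        Object[] row = new Object[record.keyColumns.length + record.valueColumns.length];
        int columnIndex = 0;
        for (Object key : record.keyColumns) row[columnIndex++] = key;
        for (Object value : record.valueColumns) row[columnIndex++] = value;
        return row;
      }

      public static void main(String[] args) {
        Record record = new Record(new Object[]{"us", 2019}, new Object[]{12.5d, 7L});
        System.out.println(Arrays.toString(toRow(record)));  // [us, 2019, 12.5, 7]
      }
    }
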
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
new file mode 100644
index 0000000..6296318
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
@@ -0,0 +1,133 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import javax.annotation.Nonnull;
+import org.apache.pinot.common.request.GroupBy;
+import org.apache.pinot.common.request.transform.TransformExpressionTree;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.query.aggregation.AggregationFunctionContext;
+import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
+import org.apache.pinot.core.query.aggregation.groupby.DefaultGroupByExecutor;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByExecutor;
+import org.apache.pinot.core.startree.executor.StarTreeGroupByExecutor;
+
+
+/**
+ * The <code>AggregationGroupByOrderByOperator</code> class provides the operator for aggregation group-by order-by queries on a single segment.
+ */
+public class AggregationGroupByOrderByOperator extends BaseOperator<IntermediateResultsBlock> {
+  private static final String OPERATOR_NAME = "AggregationGroupByOrderByOperator";
+
+  private final DataSchema _dataSchema;
+
+  private final AggregationFunctionContext[] _functionContexts;
+  private final GroupBy _groupBy;
+  private final int _maxInitialResultHolderCapacity;
+  private final int _numGroupsLimit;
+  private final TransformOperator _transformOperator;
+  private final long _numTotalRawDocs;
+  private final boolean _useStarTree;
+
+  private ExecutionStatistics _executionStatistics;
+
+  public AggregationGroupByOrderByOperator(@Nonnull AggregationFunctionContext[] functionContexts, @Nonnull GroupBy groupBy,
+      int maxInitialResultHolderCapacity, int numGroupsLimit, @Nonnull TransformOperator transformOperator,
+      long numTotalRawDocs, boolean useStarTree) {
+    _functionContexts = functionContexts;
+    _groupBy = groupBy;
+    _maxInitialResultHolderCapacity = maxInitialResultHolderCapacity;
+    _numGroupsLimit = numGroupsLimit;
+    _transformOperator = transformOperator;
+    _numTotalRawDocs = numTotalRawDocs;
+    _useStarTree = useStarTree;
+
+    int numColumns = groupBy.getExpressionsSize() + _functionContexts.length;
+    String[] columnNames = new String[numColumns];
+    DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numColumns];
+
+    // extract column names and data types for group by keys
+    int index = 0;
+    for (String groupByColumn : groupBy.getExpressions()) {
+      columnNames[index] = groupByColumn;
+      TransformExpressionTree expression = TransformExpressionTree.compileToExpressionTree(groupByColumn);
+      columnDataTypes[index] =
+          DataSchema.ColumnDataType.fromDataType(_transformOperator.getResultMetadata(expression).getDataType(), true);
+      index++;
+    }
+
+    // extract column names and data types for aggregations
+    for (AggregationFunctionContext functionContext : functionContexts) {
+      columnNames[index] = functionContext.getAggregationFunction().getType().toString().toLowerCase() + "("
+          + functionContext.getColumn() + ")";
+      columnDataTypes[index] = functionContext.getAggregationFunction().getIntermediateResultColumnType();
+      index++;
+    }
+
+    _dataSchema = new DataSchema(columnNames, columnDataTypes);
+  }
+
+  @Override
+  protected IntermediateResultsBlock getNextBlock() {
+    int numDocsScanned = 0;
+
+    // Perform aggregation group-by on all the blocks
+    GroupByExecutor groupByExecutor;
+    if (_useStarTree) {
+      groupByExecutor =
+          new StarTreeGroupByExecutor(_functionContexts, _groupBy, _maxInitialResultHolderCapacity, _numGroupsLimit,
+              _transformOperator);
+    } else {
+      groupByExecutor =
+          new DefaultGroupByExecutor(_functionContexts, _groupBy, _maxInitialResultHolderCapacity, _numGroupsLimit,
+              _transformOperator);
+    }
+    TransformBlock transformBlock;
+    while ((transformBlock = _transformOperator.nextBlock()) != null) {
+      numDocsScanned += transformBlock.getNumDocs();
+      groupByExecutor.process(transformBlock);
+    }
+    AggregationGroupByResult groupByResult = groupByExecutor.getResult();
+
+    // Gather execution statistics
+    long numEntriesScannedInFilter = _transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
+    long numEntriesScannedPostFilter = numDocsScanned * _transformOperator.getNumColumnsProjected();
+    _executionStatistics =
+        new ExecutionStatistics(numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter,
+            _numTotalRawDocs);
+
+    // Build intermediate result block based on aggregation group-by result from the executor
+    return new IntermediateResultsBlock(_functionContexts, groupByResult, _dataSchema);
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+
+  @Override
+  public ExecutionStatistics getExecutionStatistics() {
+    return _executionStatistics;
+  }
+}
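
The operator's constructor above derives the output schema by listing the group-by expressions first and then one lower-cased "function(column)" name per aggregation. A small sketch of that naming scheme (the expressions and aggregations below are made up for illustration):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class SchemaNamingSketch {
      public static void main(String[] args) {
        List<String> groupByExpressions = Arrays.asList("country", "deviceType");
        String[][] aggregations = {{"SUM", "clicks"}, {"MAX", "latencyMs"}};

        // Group-by columns first, then "function(column)" per aggregation.
        List<String> columnNames = new ArrayList<>(groupByExpressions);
        for (String[] aggregation : aggregations) {
          columnNames.add(aggregation[0].toLowerCase() + "(" + aggregation[1] + ")");
        }
        System.out.println(columnNames);  // [country, deviceType, sum(clicks), max(latencyMs)]
      }
    }
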
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
new file mode 100644
index 0000000..b8e534f
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/AggregationGroupByOrderByPlanNode.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.plan;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import javax.annotation.Nonnull;
+import org.apache.pinot.common.request.AggregationInfo;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.GroupBy;
+import org.apache.pinot.common.request.transform.TransformExpressionTree;
+import org.apache.pinot.common.utils.request.FilterQueryTree;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.query.AggregationGroupByOperator;
+import org.apache.pinot.core.operator.query.AggregationGroupByOrderByOperator;
+import org.apache.pinot.core.query.aggregation.AggregationFunctionContext;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
+import org.apache.pinot.core.startree.StarTreeUtils;
+import org.apache.pinot.core.startree.plan.StarTreeTransformPlanNode;
+import org.apache.pinot.core.startree.v2.AggregationFunctionColumnPair;
+import org.apache.pinot.core.startree.v2.StarTreeV2;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The <code>AggregationGroupByOrderByPlanNode</code> class provides the execution plan for an aggregation group-by order-by query on a
+ * single segment.
+ */
+public class AggregationGroupByOrderByPlanNode implements PlanNode {
+  private static final Logger LOGGER = LoggerFactory.getLogger(AggregationGroupByOrderByPlanNode.class);
+
+  private final IndexSegment _indexSegment;
+  private final int _maxInitialResultHolderCapacity;
+  private final int _numGroupsLimit;
+  private final List<AggregationInfo> _aggregationInfos;
+  private final AggregationFunctionContext[] _functionContexts;
+  private final GroupBy _groupBy;
+  private final TransformPlanNode _transformPlanNode;
+  private final StarTreeTransformPlanNode _starTreeTransformPlanNode;
+
+  public AggregationGroupByOrderByPlanNode(@Nonnull IndexSegment indexSegment, @Nonnull BrokerRequest brokerRequest,
+      int maxInitialResultHolderCapacity, int numGroupsLimit) {
+    _indexSegment = indexSegment;
+    _maxInitialResultHolderCapacity = maxInitialResultHolderCapacity;
+    _numGroupsLimit = numGroupsLimit;
+    _aggregationInfos = brokerRequest.getAggregationsInfo();
+    _functionContexts =
+        AggregationFunctionUtils.getAggregationFunctionContexts(brokerRequest, indexSegment.getSegmentMetadata());
+    _groupBy = brokerRequest.getGroupBy();
+
+    List<StarTreeV2> starTrees = indexSegment.getStarTrees();
+    if (starTrees != null) {
+      if (!StarTreeUtils.isStarTreeDisabled(brokerRequest)) {
+        Set<AggregationFunctionColumnPair> aggregationFunctionColumnPairs = new HashSet<>();
+        for (AggregationInfo aggregationInfo : _aggregationInfos) {
+          aggregationFunctionColumnPairs.add(AggregationFunctionUtils.getFunctionColumnPair(aggregationInfo));
+        }
+        Set<TransformExpressionTree> groupByExpressions = new HashSet<>();
+        for (String expression : _groupBy.getExpressions()) {
+          groupByExpressions.add(TransformExpressionTree.compileToExpressionTree(expression));
+        }
+        FilterQueryTree rootFilterNode = RequestUtils.generateFilterQueryTree(brokerRequest);
+        for (StarTreeV2 starTreeV2 : starTrees) {
+          if (StarTreeUtils
+              .isFitForStarTree(starTreeV2.getMetadata(), aggregationFunctionColumnPairs, groupByExpressions,
+                  rootFilterNode)) {
+            _transformPlanNode = null;
+            _starTreeTransformPlanNode =
+                new StarTreeTransformPlanNode(starTreeV2, aggregationFunctionColumnPairs, groupByExpressions,
+                    rootFilterNode, brokerRequest.getDebugOptions());
+            return;
+          }
+        }
+      }
+    }
+
+    _transformPlanNode = new TransformPlanNode(_indexSegment, brokerRequest);
+    _starTreeTransformPlanNode = null;
+  }
+
+  @Override
+  public AggregationGroupByOrderByOperator run() {
+    int numTotalRawDocs = _indexSegment.getSegmentMetadata().getTotalRawDocs();
+    if (_transformPlanNode != null) {
+      // Do not use star-tree
+      return new AggregationGroupByOrderByOperator(_functionContexts, _groupBy, _maxInitialResultHolderCapacity,
+          _numGroupsLimit, _transformPlanNode.run(), numTotalRawDocs, false);
+    } else {
+      // Use star-tree
+      return new AggregationGroupByOrderByOperator(_functionContexts, _groupBy, _maxInitialResultHolderCapacity,
+          _numGroupsLimit, _starTreeTransformPlanNode.run(), numTotalRawDocs, true);
+    }
+  }
+
+  @Override
+  public void showTree(String prefix) {
+    LOGGER.debug(prefix + "Aggregation Group-by Order-by Plan Node:");
+    LOGGER.debug(prefix + "Operator: AggregationGroupByOrderByOperator");
+    LOGGER.debug(prefix + "Argument 0: IndexSegment - " + _indexSegment.getSegmentName());
+    LOGGER.debug(prefix + "Argument 1: Aggregations - " + _aggregationInfos);
+    LOGGER.debug(prefix + "Argument 2: GroupBy - " + _groupBy);
+    if (_transformPlanNode != null) {
+      LOGGER.debug(prefix + "Argument 3: TransformPlanNode -");
+      _transformPlanNode.showTree(prefix + "    ");
+    } else {
+      LOGGER.debug(prefix + "Argument 3: StarTreeTransformPlanNode -");
+      _starTreeTransformPlanNode.showTree(prefix + "    ");
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
index b73d2da..3bdffc1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
@@ -20,14 +20,18 @@ package org.apache.pinot.core.plan;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.utils.CommonConstants.Broker.Request;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.CombineGroupByOperator;
+import org.apache.pinot.core.operator.CombineGroupByOrderByOperator;
 import org.apache.pinot.core.operator.CombineOperator;
 import org.apache.pinot.core.query.exception.BadQueryRequestException;
+import org.apache.pinot.core.util.GroupByUtils;
 import org.apache.pinot.core.util.trace.TraceCallable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -141,6 +145,11 @@ public class CombinePlanNode implements PlanNode {
     // TODO: use the same combine operator for both aggregation and selection query.
     if (_brokerRequest.isSetAggregationsInfo() && _brokerRequest.getGroupBy() != null) {
       // Aggregation group-by query
+      Map<String, String> queryOptions = _brokerRequest.getQueryOptions();
+      // Use the order-by aware combine operator only when GROUP_BY_MODE is explicitly set to SQL
+      if (GroupByUtils.isGroupByMode(Request.SQL, queryOptions)) {
+        return new CombineGroupByOrderByOperator(operators, _brokerRequest, _executorService, _timeOutMs);
+      }
       return new CombineGroupByOperator(operators, _brokerRequest, _executorService, _timeOutMs, _numGroupsLimit);
     } else {
       // Selection or aggregation only query
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index 9efc99f..15800d5 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -22,13 +22,16 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.common.request.AggregationInfo;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.transform.TransformExpressionTree;
+import org.apache.pinot.common.utils.CommonConstants.Broker.Request;
 import org.apache.pinot.core.data.manager.SegmentDataManager;
 import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.plan.AggregationGroupByOrderByPlanNode;
 import org.apache.pinot.core.plan.AggregationGroupByPlanNode;
 import org.apache.pinot.core.plan.AggregationPlanNode;
 import org.apache.pinot.core.plan.CombinePlanNode;
@@ -42,10 +45,10 @@ import org.apache.pinot.core.plan.SelectionPlanNode;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.apache.pinot.core.query.config.QueryExecutorConfig;
 import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.core.util.GroupByUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * The <code>InstancePlanMakerImplV2</code> class is the default implementation of {@link PlanMaker}.
  */
@@ -97,6 +100,12 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
   public PlanNode makeInnerSegmentPlan(IndexSegment indexSegment, BrokerRequest brokerRequest) {
     if (brokerRequest.isSetAggregationsInfo()) {
       if (brokerRequest.isSetGroupBy()) {
+        Map<String, String> queryOptions = brokerRequest.getQueryOptions();
+        // Use the order-by aware plan node only when GROUP_BY_MODE is explicitly set to SQL
+        if (GroupByUtils.isGroupByMode(Request.SQL, queryOptions)) {
+          return new AggregationGroupByOrderByPlanNode(indexSegment, brokerRequest, _maxInitialResultHolderCapacity,
+              _numGroupsLimit);
+        }
         return new AggregationGroupByPlanNode(indexSegment, brokerRequest, _maxInitialResultHolderCapacity,
             _numGroupsLimit);
       } else {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
index 3b44b99..1e70180 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
@@ -49,6 +49,11 @@ public class AggregationFunctionUtils {
     return aggregationInfo.getAggregationParams().get(COLUMN_KEY);
   }
 
+  public static String getAggregationColumnName(AggregationInfo aggregationInfo) {
+    return aggregationInfo.getAggregationType().toLowerCase() + "(" + AggregationFunctionUtils.getColumn(aggregationInfo)
+        + ")";
+  }
+
   /**
    * Creates an {@link AggregationFunctionColumnPair} from the {@link AggregationInfo}.
    */
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/AggregationGroupByTrimmingService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/AggregationGroupByTrimmingService.java
index 9d91c58..746df74 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/AggregationGroupByTrimmingService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/AggregationGroupByTrimmingService.java
@@ -42,7 +42,6 @@ import org.apache.pinot.core.query.aggregation.function.MinAggregationFunction;
  * The <code>AggregationGroupByTrimmingService</code> class provides trimming service for aggregation group-by queries.
  */
 public class AggregationGroupByTrimmingService {
-  public static final String GROUP_KEY_DELIMITER = "\t";
 
   private final AggregationFunction[] _aggregationFunctions;
   private final int _groupByTopN;
@@ -229,7 +228,7 @@ public class AggregationGroupByTrimmingService {
       GroupKeyResultPair groupKeyResultPair;
       while ((groupKeyResultPair = _heap.poll()) != null) {
         // Set limit to -1 to prevent removing trailing empty strings
-        String[] groupKeys = groupKeyResultPair._groupKey.split(GROUP_KEY_DELIMITER, -1);
+        String[] groupKeys = groupKeyResultPair._groupKey.split(GroupKeyGenerator.DELIMITER, -1);
 
         GroupByResult groupByResult = new GroupByResult();
         groupByResult.setGroup(Arrays.asList(groupKeys));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
index 78bcf08..fc0e55b 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/DictionaryBasedGroupKeyGenerator.java
@@ -427,7 +427,7 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
       StringBuilder groupKeyBuilder = new StringBuilder(_dictionaries[0].getStringValue(rawKey % cardinality));
       rawKey /= cardinality;
       for (int i = 1; i < _numGroupByExpressions; i++) {
-        groupKeyBuilder.append(AggregationGroupByTrimmingService.GROUP_KEY_DELIMITER);
+        groupKeyBuilder.append(GroupKeyGenerator.DELIMITER);
         cardinality = _cardinalities[i];
         groupKeyBuilder.append(_dictionaries[i].getStringValue(rawKey % cardinality));
         rawKey /= cardinality;
@@ -598,7 +598,7 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
     StringBuilder groupKeyBuilder = new StringBuilder(_dictionaries[0].get((int) (rawKey % cardinality)).toString());
     rawKey /= cardinality;
     for (int i = 1; i < _numGroupByExpressions; i++) {
-      groupKeyBuilder.append(AggregationGroupByTrimmingService.GROUP_KEY_DELIMITER);
+      groupKeyBuilder.append(GroupKeyGenerator.DELIMITER);
       cardinality = _cardinalities[i];
       groupKeyBuilder.append(_dictionaries[i].get((int) (rawKey % cardinality)));
       rawKey /= cardinality;
@@ -770,7 +770,7 @@ public class DictionaryBasedGroupKeyGenerator implements GroupKeyGenerator {
   private String getGroupKey(IntArray rawKey) {
     StringBuilder groupKeyBuilder = new StringBuilder(_dictionaries[0].get(rawKey._elements[0]).toString());
     for (int i = 1; i < _numGroupByExpressions; i++) {
-      groupKeyBuilder.append(AggregationGroupByTrimmingService.GROUP_KEY_DELIMITER);
+      groupKeyBuilder.append(GroupKeyGenerator.DELIMITER);
       groupKeyBuilder.append(_dictionaries[i].get(rawKey._elements[i]));
     }
     return groupKeyBuilder.toString();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupKeyGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupKeyGenerator.java
index a9e81b7..a2a1524 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupKeyGenerator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/GroupKeyGenerator.java
@@ -27,6 +27,7 @@ import org.apache.pinot.core.operator.blocks.TransformBlock;
  * Interface for generating group keys.
  */
 public interface GroupKeyGenerator {
+  String DELIMITER = "\t";
   int INVALID_ID = -1;
 
   /**
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionaryMultiColumnGroupKeyGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionaryMultiColumnGroupKeyGenerator.java
index 3eb34b1..58e48a3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionaryMultiColumnGroupKeyGenerator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/NoDictionaryMultiColumnGroupKeyGenerator.java
@@ -212,7 +212,7 @@ public class NoDictionaryMultiColumnGroupKeyGenerator implements GroupKeyGenerat
       }
 
       if (i > 0) {
-        builder.append(AggregationGroupByTrimmingService.GROUP_KEY_DELIMITER);
+        builder.append(GroupKeyGenerator.DELIMITER);
       }
       builder.append(key);
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index d442c04..4bd5f14 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -23,7 +23,6 @@ import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -32,6 +31,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.config.TableNameBuilder;
@@ -41,6 +41,7 @@ import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.metrics.BrokerTimer;
 import org.apache.pinot.common.query.ReduceService;
+import org.apache.pinot.common.request.AggregationInfo;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.request.GroupBy;
 import org.apache.pinot.common.request.HavingFilterQuery;
@@ -54,21 +55,27 @@ import org.apache.pinot.common.response.broker.AggregationResult;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.GroupByResult;
 import org.apache.pinot.common.response.broker.QueryProcessingException;
+import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.response.broker.SelectionResults;
-import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.CommonConstants.Broker.Request;
+import org.apache.pinot.common.utils.CommonConstants.Broker.Request.QueryOptionKey;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
+import org.apache.pinot.core.data.table.IndexedTable;
 import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.core.query.aggregation.DistinctTable;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByTrimmingService;
 import org.apache.pinot.core.query.selection.SelectionOperatorService;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.core.util.GroupByUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
 /**
  * The <code>BrokerReduceService</code> class provides service to reduce data tables gathered from multiple servers
  * to {@link BrokerResponseNative}.
@@ -214,17 +221,25 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
     }
 
     // Parse the option from request whether to preserve the type
-    String preserveTypeString = (brokerRequest.getQueryOptions() == null) ? "false" : brokerRequest.getQueryOptions()
-        .getOrDefault(CommonConstants.Broker.Request.QueryOptionKey.PRESERVE_TYPE, "false");
+    Map<String, String> queryOptions = brokerRequest.getQueryOptions();
+    String preserveTypeString =
+        (queryOptions == null) ? "false" : queryOptions.getOrDefault(QueryOptionKey.PRESERVE_TYPE, "false");
     boolean preserveType = Boolean.valueOf(preserveTypeString);
 
     Selection selection = brokerRequest.getSelections();
     if (dataTableMap.isEmpty()) {
       // For empty data table map, construct empty result using the cached data schema for selection query if exists
       if (cachedDataSchema != null) {
-        List<String> selectionColumns =
-            SelectionOperatorUtils.getSelectionColumns(selection.getSelectionColumns(), cachedDataSchema);
-        brokerResponseNative.setSelectionResults(new SelectionResults(selectionColumns, Collections.emptyList()));
+        if (brokerRequest.isSetSelections()) {
+          List<String> selectionColumns =
+              SelectionOperatorUtils.getSelectionColumns(brokerRequest.getSelections().getSelectionColumns(),
+                  cachedDataSchema);
+          brokerResponseNative.setSelectionResults(new SelectionResults(selectionColumns, new ArrayList<>(0)));
+        } else if (brokerRequest.isSetGroupBy() && GroupByUtils.isGroupByMode(Request.SQL, queryOptions)
+            && GroupByUtils.isResponseFormat(Request.SQL, queryOptions)) {
+          setSQLGroupByOrderByResults(brokerResponseNative, cachedDataSchema, brokerRequest.getAggregationsInfo(),
+              brokerRequest.getGroupBy(), brokerRequest.getOrderBy(), dataTableMap, preserveType);
+        }
       }
     } else {
       // Reduce server responses data and set query results into the broker response
@@ -252,8 +267,8 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
             if (brokerMetrics != null) {
               brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.RESPONSE_MERGE_EXCEPTIONS, 1L);
             }
-            brokerResponseNative
-                .addToExceptions(new QueryProcessingException(QueryException.MERGE_RESPONSE_ERROR_CODE, errorMessage));
+            brokerResponseNative.addToExceptions(
+                new QueryProcessingException(QueryException.MERGE_RESPONSE_ERROR_CODE, errorMessage));
           }
         }
 
@@ -268,16 +283,41 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
               preserveType);
         } else {
           // Aggregation group-by query.
-          boolean[] aggregationFunctionSelectStatus =
-              AggregationFunctionUtils.getAggregationFunctionsSelectStatus(brokerRequest.getAggregationsInfo());
-          setGroupByHavingResults(brokerResponseNative, aggregationFunctions, aggregationFunctionSelectStatus,
-              brokerRequest.getGroupBy(), dataTableMap, brokerRequest.getHavingFilterQuery(),
-              brokerRequest.getHavingFilterSubQueryMap(), preserveType);
-          if (brokerMetrics != null && (!brokerResponseNative.getAggregationResults().isEmpty())) {
-            // We emit the group by size when the result isn't empty. All the sizes among group-by results should be the same.
-            // Thus, we can just emit the one from the 1st result.
-            brokerMetrics.addMeteredQueryValue(brokerRequest, BrokerMeter.GROUP_BY_SIZE,
-                brokerResponseNative.getAggregationResults().get(0).getGroupByResult().size());
+          // Read results as records if GROUP_BY_MODE is explicitly set to SQL
+
+          if (GroupByUtils.isGroupByMode(Request.SQL, queryOptions)) {
+            // sql + order by
+
+            int resultSize = 0;
+
+            // if RESPONSE_FORMAT is SQL, return results in {@link ResultTable}
+            if (GroupByUtils.isResponseFormat(Request.SQL, queryOptions)) {
+              setSQLGroupByOrderByResults(brokerResponseNative, cachedDataSchema, brokerRequest.getAggregationsInfo(),
+                  brokerRequest.getGroupBy(), brokerRequest.getOrderBy(), dataTableMap, preserveType);
+              resultSize = brokerResponseNative.getResultTable().getRows().size();
+            } else {
+              setPQLGroupByOrderByResults(brokerResponseNative, cachedDataSchema, brokerRequest.getAggregationsInfo(),
+                  brokerRequest.getGroupBy(), brokerRequest.getOrderBy(), dataTableMap, preserveType);
+              if (!brokerResponseNative.getAggregationResults().isEmpty()) {
+                resultSize = brokerResponseNative.getAggregationResults().get(0).getGroupByResult().size();
+              }
+            }
+            if (brokerMetrics != null && resultSize > 0) {
+              brokerMetrics.addMeteredQueryValue(brokerRequest, BrokerMeter.GROUP_BY_SIZE, resultSize);
+            }
+          } else {
+
+            boolean[] aggregationFunctionSelectStatus =
+                AggregationFunctionUtils.getAggregationFunctionsSelectStatus(brokerRequest.getAggregationsInfo());
+            setGroupByHavingResults(brokerResponseNative, aggregationFunctions, aggregationFunctionSelectStatus,
+                brokerRequest.getGroupBy(), dataTableMap, brokerRequest.getHavingFilterQuery(),
+                brokerRequest.getHavingFilterSubQueryMap(), preserveType);
+            if (brokerMetrics != null && (!brokerResponseNative.getAggregationResults().isEmpty())) {
+              // We emit the group by size when the result isn't empty. All the sizes among group-by results should be the same.
+              // Thus, we can just emit the one from the 1st result.
+              brokerMetrics.addMeteredQueryValue(brokerRequest, BrokerMeter.GROUP_BY_SIZE,
+                  brokerResponseNative.getAggregationResults().get(0).getGroupByResult().size());
+            }
           }
         }
       }
@@ -359,7 +399,7 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
     for (DataTable dataTable : dataTableMap.values()) {
       for (int i = 0; i < numAggregationFunctions; i++) {
         Object intermediateResultToMerge;
-        DataSchema.ColumnDataType columnDataType = dataSchema.getColumnDataType(i);
+        ColumnDataType columnDataType = dataSchema.getColumnDataType(i);
         switch (columnDataType) {
           case LONG:
             intermediateResultToMerge = dataTable.getLong(0, i);
@@ -429,6 +469,208 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
   }
 
   /**
+   * Extracts the group-by order-by results and sets them into a {@link ResultTable}
+   * @param brokerResponseNative broker response
+   * @param dataSchema data schema
+   * @param aggregationInfos aggregations info
+   * @param groupBy group by info
+   * @param orderBy order by info
+   * @param dataTableMap map from server to data table
+   */
+  private void setSQLGroupByOrderByResults(BrokerResponseNative brokerResponseNative, DataSchema dataSchema,
+      List<AggregationInfo> aggregationInfos, GroupBy groupBy, List<SelectionSort> orderBy,
+      Map<ServerInstance, DataTable> dataTableMap, boolean preserveType) {
+
+    List<String> columns = new ArrayList<>(dataSchema.size());
+    for (int i = 0; i < dataSchema.size(); i++) {
+      columns.add(dataSchema.getColumnName(i));
+    }
+
+    int numGroupBy = groupBy.getExpressionsSize();
+    int numAggregations = aggregationInfos.size();
+
+    IndexedTable indexedTable =
+        getIndexedTable(numGroupBy, numAggregations, groupBy, aggregationInfos, orderBy, dataSchema, dataTableMap);
+
+    List<AggregationFunction> aggregationFunctions = new ArrayList<>(aggregationInfos.size());
+    for (AggregationInfo aggregationInfo : aggregationInfos) {
+      aggregationFunctions
+          .add(AggregationFunctionUtils.getAggregationFunctionContext(aggregationInfo).getAggregationFunction());
+    }
+
+    List<Serializable[]> rows = new ArrayList<>();
+    int numColumns = columns.size();
+    Iterator<Record> sortedIterator = indexedTable.iterator();
+    int numRows = 0;
+    while (numRows < groupBy.getTopN() && sortedIterator.hasNext()) {
+
+      Record nextRecord = sortedIterator.next();
+      Serializable[] row = new Serializable[numColumns];
+      int index = 0;
+      for (Object keyColumn : nextRecord.getKey().getColumns()) {
+        row[index++] = getSerializableValue(keyColumn);
+      }
+      int aggNum = 0;
+      for (Object valueColumn : nextRecord.getValues()) {
+        row[index] = getSerializableValue(aggregationFunctions.get(aggNum).extractFinalResult(valueColumn));
+        if (preserveType) {
+          row[index] = AggregationFunctionUtils.formatValue(row[index]);
+        }
+        index++;
+      }
+      rows.add(row);
+      numRows++;
+    }
+
+    brokerResponseNative.setResultTable(new ResultTable(columns, rows));
+  }
+
+  private IndexedTable getIndexedTable(int numGroupBy, int numAggregations, GroupBy groupBy,
+      List<AggregationInfo> aggregationInfos, List<SelectionSort> orderBy, DataSchema dataSchema,
+      Map<ServerInstance, DataTable> dataTableMap) {
+
+    IndexedTable indexedTable = new ConcurrentIndexedTable();
+    int indexedTableCapacity = 1_000_000;
+    // FIXME: indexedTableCapacity should be derived from the TOP N. Hardcoding a large value until the resize can be tuned
+    // int capacity = GroupByUtils.getTableCapacity((int) groupBy.getTopN());
+    indexedTable.init(dataSchema, aggregationInfos, orderBy, indexedTableCapacity);
+
+    for (DataTable dataTable : dataTableMap.values()) {
+      BiFunction[] functions = new BiFunction[dataSchema.size()];
+      for (int i = 0; i < dataSchema.size(); i++) {
+        ColumnDataType columnDataType = dataSchema.getColumnDataType(i);
+        BiFunction<Integer, Integer, Object> function;
+        switch (columnDataType) {
+
+          case INT:
+            function = dataTable::getInt;
+            break;
+          case LONG:
+            function = dataTable::getLong;
+            break;
+          case FLOAT:
+            function = dataTable::getFloat;
+            break;
+          case DOUBLE:
+            function = dataTable::getDouble;
+            break;
+          case STRING:
+            function = dataTable::getString;
+            break;
+          default:
+            function = dataTable::getObject;
+        }
+        functions[i] = function;
+      }
+
+      for (int row = 0; row < dataTable.getNumberOfRows(); row++) {
+        Object[] key = new Object[numGroupBy];
+        int col = 0;
+        for (int j = 0; j < numGroupBy; j++) {
+          key[j] = functions[col].apply(row, col);
+          col++;
+        }
+        Object[] value = new Object[numAggregations];
+        for (int j = 0; j < numAggregations; j++) {
+          value[j] = functions[col].apply(row, col);
+          col++;
+        }
+        Record record = new Record(new Key(key), value);
+        indexedTable.upsert(record);
+      }
+    }
+    indexedTable.finish(true);
+    return indexedTable;
+  }
+
+  /**
+   * Extracts the group-by order-by results into a List of {@link AggregationResult}.
+   * There will be one aggregation result per aggregation; the group-by keys are the same across all aggregations.
+   * @param brokerResponseNative broker response
+   * @param dataSchema data schema
+   * @param aggregationInfos aggregations info
+   * @param groupBy group by info
+   * @param orderBy order by info
+   * @param dataTableMap map from server to data table
+   */
+  private void setPQLGroupByOrderByResults(BrokerResponseNative brokerResponseNative, DataSchema dataSchema,
+      List<AggregationInfo> aggregationInfos, GroupBy groupBy, List<SelectionSort> orderBy,
+      Map<ServerInstance, DataTable> dataTableMap, boolean preserveType) {
+
+    int numGroupBy = groupBy.getExpressionsSize();
+    int numAggregations = aggregationInfos.size();
+
+    List<String> groupByColumns = new ArrayList<>(numGroupBy);
+    for (int i = 0; i < numGroupBy; i++) {
+      groupByColumns.add(dataSchema.getColumnName(i));
+    }
+
+    List<String> aggregationColumns = new ArrayList<>(numAggregations);
+    for (int i = numGroupBy; i < dataSchema.size(); i++) {
+      aggregationColumns.add(dataSchema.getColumnName(i));
+    }
+
+    List<AggregationFunction> aggregationFunctions = new ArrayList<>(aggregationInfos.size());
+    for (AggregationInfo aggregationInfo : aggregationInfos) {
+      aggregationFunctions
+          .add(AggregationFunctionUtils.getAggregationFunctionContext(aggregationInfo).getAggregationFunction());
+    }
+
+    List<List<GroupByResult>> groupByResults = new ArrayList<>(numAggregations);
+    for (int i = 0; i < numAggregations; i++) {
+      groupByResults.add(new ArrayList<>());
+    }
+
+    if (!dataTableMap.isEmpty()) {
+      IndexedTable indexedTable =
+          getIndexedTable(numGroupBy, numAggregations, groupBy, aggregationInfos, orderBy, dataSchema, dataTableMap);
+
+      Iterator<Record> sortedIterator = indexedTable.iterator();
+      int numRows = 0;
+      while (numRows < groupBy.getTopN() && sortedIterator.hasNext()) {
+
+        Record nextRecord = sortedIterator.next();
+
+        List<String> group = new ArrayList<>(numGroupBy);
+        for (Object keyColumn : nextRecord.getKey().getColumns()) {
+          group.add(keyColumn.toString());
+        }
+
+        Object[] values = nextRecord.getValues();
+        for (int i = 0; i < numAggregations; i++) {
+          Serializable serializableValue =
+              getSerializableValue(aggregationFunctions.get(i).extractFinalResult(values[i]));
+          if (preserveType) {
+            serializableValue = AggregationFunctionUtils.formatValue(serializableValue);
+          }
+          GroupByResult groupByResult = new GroupByResult();
+          groupByResult.setGroup(group);
+          groupByResult.setValue(serializableValue);
+
+          groupByResults.get(i).add(groupByResult);
+        }
+        numRows++;
+      }
+    }
+
+    List<AggregationResult> aggregationResults = new ArrayList<>(numAggregations);
+    for (int i = 0; i < numAggregations; i++) {
+      AggregationResult aggregationResult =
+          new AggregationResult(groupByResults.get(i), groupByColumns, aggregationColumns.get(i));
+      aggregationResults.add(aggregationResult);
+    }
+    brokerResponseNative.setAggregationResults(aggregationResults);
+  }
+
+  private Serializable getSerializableValue(Object value) {
+    if (value instanceof Number) {
+      return (Number) value;
+    } else {
+      return value.toString();
+    }
+  }
+
+  /**
    * Reduce group-by results from multiple servers and set them into BrokerResponseNative passed in.
    *
    * @param brokerResponseNative broker response.
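
getIndexedTable above selects one typed reader per column up front and then applies it to every row, instead of re-switching on the column type inside the row loop. A self-contained sketch of that pattern with a stand-in table interface (the real DataTable getters share the same (row, column) shape, but this interface is illustrative):

    import java.util.Arrays;
    import java.util.List;
    import java.util.function.BiFunction;

    public class ColumnReaderSketch {
      // Stand-in for a typed, row-oriented data table.
      interface SimpleDataTable {
        int getInt(int row, int col);
        String getString(int row, int col);
      }

      public static void main(String[] args) {
        SimpleDataTable table = new SimpleDataTable() {
          public int getInt(int row, int col) { return row * 10 + col; }
          public String getString(int row, int col) { return "r" + row + "c" + col; }
        };

        // One reader per column, chosen once by column type.
        List<BiFunction<Integer, Integer, Object>> readers = Arrays.asList(
            (row, col) -> table.getString(row, col),  // column 0 is a STRING
            (row, col) -> table.getInt(row, col));    // column 1 is an INT

        for (int row = 0; row < 2; row++) {
          System.out.println(readers.get(0).apply(row, 0) + " | " + readers.get(1).apply(row, 1));
        }
      }
    }
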
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/CombineService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/CombineService.java
index 352f9b4..9caa3f4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/CombineService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/CombineService.java
@@ -93,8 +93,8 @@ public class CombineService {
 
       // Data schema will be null if exceptions caught during query processing.
       // Result set size will be zero if no row matches the predicate.
-      DataSchema mergedBlockSchema = mergedBlock.getSelectionDataSchema();
-      DataSchema blockToMergeSchema = blockToMerge.getSelectionDataSchema();
+      DataSchema mergedBlockSchema = mergedBlock.getDataSchema();
+      DataSchema blockToMergeSchema = blockToMerge.getDataSchema();
       Collection<Serializable[]> mergedBlockResultSet = mergedBlock.getSelectionResult();
       Collection<Serializable[]> blockToMergeResultSet = blockToMerge.getSelectionResult();
 
@@ -103,7 +103,7 @@ public class CombineService {
 
         // If block to merge schema is not null, set its data schema and result to the merged block.
         if (blockToMergeSchema != null) {
-          mergedBlock.setSelectionDataSchema(blockToMergeSchema);
+          mergedBlock.setDataSchema(blockToMergeSchema);
           mergedBlock.setSelectionResult(blockToMergeResultSet);
         }
       } else {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java
new file mode 100644
index 0000000..6fbb03a
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.util;
+
+import java.util.Map;
+
+import static org.apache.pinot.common.utils.CommonConstants.Broker.Request.*;
+
+
+public final class GroupByUtils {
+
+  public static final int NUM_RESULTS_LOWER_LIMIT = 5000;
+
+  private GroupByUtils() {
+
+  }
+
+  /**
+   * Returns the higher of topN * 5 or 5000, to ensure better precision in the results.
+   */
+  public static int getTableCapacity(int topN) {
+    return Math.max(topN * 5, NUM_RESULTS_LOWER_LIMIT);
+  }
+
+  public static boolean isGroupByMode(String groupByMode, Map<String, String> queryOptions) {
+    if (queryOptions != null) {
+      String groupByModeValue = queryOptions.get(QueryOptionKey.GROUP_BY_MODE);
+      return groupByModeValue != null && groupByModeValue.equalsIgnoreCase(groupByMode);
+    }
+    return false;
+  }
+
+  public static boolean isResponseFormat(String responseFormat, Map<String, String> queryOptions) {
+    if (queryOptions != null) {
+      String responseFormatValue = queryOptions.get(QueryOptionKey.RESPONSE_FORMAT);
+      return responseFormatValue != null && responseFormatValue.equalsIgnoreCase(responseFormat);
+    }
+    return false;
+  }
+}
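
A quick sketch of how these helpers behave, using a hand-built query options map (the key strings mirror the groupByMode / responseFormat options introduced by this change; matching is case-insensitive and an absent key falls back to PQL behavior):

    import java.util.HashMap;
    import java.util.Map;

    public class QueryOptionsSketch {
      public static void main(String[] args) {
        Map<String, String> queryOptions = new HashMap<>();
        queryOptions.put("groupByMode", "sql");
        // responseFormat is left unset, so results default to the PQL shape.

        boolean sqlGroupBy = "sql".equalsIgnoreCase(queryOptions.get("groupByMode"));
        boolean sqlResponse = "sql".equalsIgnoreCase(queryOptions.get("responseFormat"));
        System.out.println(sqlGroupBy + " " + sqlResponse);  // true false

        // getTableCapacity keeps max(topN * 5, 5000) entries:
        // TOP 100 -> 5000, TOP 2000 -> 10000.
        System.out.println(Math.max(100 * 5, 5000) + " " + Math.max(2000 * 5, 5000));
      }
    }
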
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/order/OrderByUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/order/OrderByUtilsTest.java
index a648908..3fd774a 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/order/OrderByUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/order/OrderByUtilsTest.java
@@ -236,7 +236,7 @@ public class OrderByUtilsTest {
     s4.setColumn("sum(metric0)");
     orderBy = Lists.newArrayList(s0, s4);
     Comparator<Record> keysAndValuesComparator =
-        OrderByUtils.getKeysAndValuesComparator(dataSchema, orderBy, aggregationInfos);
+        OrderByUtils.getKeysAndValuesComparator(dataSchema, orderBy, aggregationInfos, true);
     records.sort(keysAndValuesComparator);
     expected0 = Lists.newArrayList("abc", "abc", "abc", "bcd", "ghi", "mno", "mno");
     actual0 = records.stream().map(k -> k.getKey().getColumns()[0]).collect(Collectors.toList());
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java
index 307b3a3..5a81445 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java
@@ -71,12 +71,11 @@ public class IndexedTableTest {
     sel.setIsAsc(true);
     List<SelectionSort> orderBy = Lists.newArrayList(sel);
 
-    // max capacity 10, evict at 12, evict until 11
-    indexedTable.init(dataSchema, aggregationInfos, orderBy, 10);
+    // max capacity 5, buffered capacity 10
+    indexedTable.init(dataSchema, aggregationInfos, orderBy, 5);
 
     // 3 threads upsert together
     // a inserted 6 times (60), b inserted 5 times (50), d inserted 2 times (20)
-    // buffered capacity 12.
     // inserting 14 unique records
     // c (10000) and f (20000) should be trimmed out no matter what
     // a (60) and i (500) trimmed out after size()
@@ -124,9 +123,9 @@ public class IndexedTableTest {
         future.get(10, TimeUnit.SECONDS);
       }
 
-      indexedTable.finish();
-      Assert.assertEquals(indexedTable.size(), 10);
-      checkSurvivors(indexedTable, "c", "f");
+      indexedTable.finish(false);
+      Assert.assertEquals(indexedTable.size(), 5);
+      checkEvicted(indexedTable, "c", "f");
 
     } finally {
       executorService.shutdown();
@@ -159,16 +158,16 @@ public class IndexedTableTest {
     List<SelectionSort> orderBy = Lists.newArrayList(sel);
 
     IndexedTable simpleIndexedTable = new SimpleIndexedTable();
-    // max capacity 10, evict at 12, evict until 11
-    simpleIndexedTable.init(dataSchema, aggregationInfos, orderBy, 10);
+    // max capacity 5, buffered capacity 10
+    simpleIndexedTable.init(dataSchema, aggregationInfos, orderBy, 5);
     // merge table
     IndexedTable mergeTable = new SimpleIndexedTable();
     mergeTable.init(dataSchema, aggregationInfos, orderBy, 10);
     testNonConcurrent(simpleIndexedTable, mergeTable);
 
     IndexedTable concurrentIndexedTable = new ConcurrentIndexedTable();
-    // max capacity 10, evict at 12, evict until 11
-    concurrentIndexedTable.init(dataSchema, aggregationInfos, orderBy, 10);
+    // max capacity 5, buffered capacity 10
+    concurrentIndexedTable.init(dataSchema, aggregationInfos, orderBy, 5);
     mergeTable = new SimpleIndexedTable();
     mergeTable.init(dataSchema, aggregationInfos, orderBy, 10);
     testNonConcurrent(concurrentIndexedTable, mergeTable);
@@ -189,76 +188,67 @@ public class IndexedTableTest {
     Assert.assertEquals(indexedTable.size(), 2);
 
     indexedTable.upsert(getRecord(new Object[]{"c", 3, 30d}, new Object[]{10d, 300d}));
+    indexedTable.upsert(getRecord(new Object[]{"c", 3, 30d}, new Object[]{10d, 300d}));
+    indexedTable.upsert(getRecord(new Object[]{"d", 4, 40d}, new Object[]{10d, 400d}));
     indexedTable.upsert(getRecord(new Object[]{"d", 4, 40d}, new Object[]{10d, 400d}));
     indexedTable.upsert(getRecord(new Object[]{"e", 5, 50d}, new Object[]{10d, 500d}));
+    indexedTable.upsert(getRecord(new Object[]{"e", 5, 50d}, new Object[]{10d, 500d}));
     indexedTable.upsert(getRecord(new Object[]{"f", 6, 60d}, new Object[]{10d, 600d}));
     indexedTable.upsert(getRecord(new Object[]{"g", 7, 70d}, new Object[]{10d, 700d}));
     indexedTable.upsert(getRecord(new Object[]{"h", 8, 80d}, new Object[]{10d, 800d}));
     indexedTable.upsert(getRecord(new Object[]{"i", 9, 90d}, new Object[]{10d, 900d}));
-    indexedTable.upsert(getRecord(new Object[]{"j", 10, 100d}, new Object[]{10d, 1000d}));
 
     // reached max capacity
-    Assert.assertEquals(indexedTable.size(), 10);
+    Assert.assertEquals(indexedTable.size(), 9);
 
     // repeat row b
     indexedTable.upsert(getRecord(new Object[]{"b", 2, 20d}, new Object[]{10d, 200d}));
-    Assert.assertEquals(indexedTable.size(), 10);
+    Assert.assertEquals(indexedTable.size(), 9);
+
+    // insert 1 more row to reach the buffer limit
+    indexedTable.upsert(getRecord(new Object[]{"j", 10, 100d}, new Object[]{10d, 1000d}));
+
+    // resized to 5
+    Assert.assertEquals(indexedTable.size(), 5);
 
-    // insert 2 more rows to reach buffer limit
+    // filling up again
     indexedTable.upsert(getRecord(new Object[]{"k", 11, 110d}, new Object[]{10d, 1100d}));
     indexedTable.upsert(getRecord(new Object[]{"l", 12, 120d}, new Object[]{10d, 1200d}));
-    // resized to evict capacity (evict a)
-    Assert.assertEquals(indexedTable.size(), 11);
-    checkSurvivors(indexedTable, "a");
-    checkAggregations(indexedTable, 30d);
-
-    // repeat row b
+    indexedTable.upsert(getRecord(new Object[]{"m", 13, 130d}, new Object[]{10d, 1300d}));
     indexedTable.upsert(getRecord(new Object[]{"b", 2, 20d}, new Object[]{10d, 200d}));
-    Assert.assertEquals(indexedTable.size(), 11);
+    // repeat f
+    indexedTable.upsert(getRecord(new Object[]{"f", 6, 60d}, new Object[]{10d, 600d}));
+    // repeat g
+    indexedTable.upsert(getRecord(new Object[]{"g", 7, 70d}, new Object[]{10d, 700d}));
+    Assert.assertEquals(indexedTable.size(), 9);
 
-    // new row, reorder and evict lowest 1 record (b)
-    indexedTable.upsert(getRecord(new Object[]{"m", 13, 130d}, new Object[]{10d, 1300d}));
-    Assert.assertEquals(indexedTable.size(), 11);
-    checkSurvivors(indexedTable, "b");
-    checkAggregations(indexedTable, 30d);
 
     // repeat record j
     mergeTable.upsert(getRecord(new Object[]{"j", 10, 100d}, new Object[]{10d, 1000d}));
-    // repeat record c
-    mergeTable.upsert(getRecord(new Object[]{"c", 3, 30d}, new Object[]{10d, 300d}));
-    // insert evicted record b
+    // repeat record k
+    mergeTable.upsert(getRecord(new Object[]{"k", 11, 110d}, new Object[]{10d, 1100d}));
+    // repeat record b
     mergeTable.upsert(getRecord(new Object[]{"b", 2, 20d}, new Object[]{10d, 200d}));
-    // insert new record
+    // insert new record n
     mergeTable.upsert(getRecord(new Object[]{"n", 14, 140d}, new Object[]{10d, 1400d}));
     Assert.assertEquals(mergeTable.size(), 4);
+    mergeTable.finish(false);
 
     // merge with table
     indexedTable.merge(mergeTable);
-    Assert.assertEquals(indexedTable.size(), 11);
-
-    // check survivors, a and j should be evicted
-    checkSurvivors(indexedTable, "c", "j");
+    Assert.assertEquals(indexedTable.size(), 5);
 
-    // check aggregations
-    checkAggregations(indexedTable, 20d);
+    indexedTable.upsert(getRecord(new Object[]{"h", 8, 80d}, new Object[]{100d, 800d}));
+    indexedTable.upsert(getRecord(new Object[]{"i", 9, 90d}, new Object[]{50d, 900d}));
+    indexedTable.upsert(getRecord(new Object[]{"n", 14, 140d}, new Object[]{600d, 1400d}));
 
     // finish
-    indexedTable.finish();
-    Assert.assertEquals(indexedTable.size(), 10);
+    indexedTable.finish(false);
+    checkEvicted(indexedTable, "a", "c", "d", "e", "b", "j", "k", "f", "g");
+    Assert.assertEquals(indexedTable.size(), 5);
   }
 
-  private void checkAggregations(Table indexedTable, double... evicted) {
-    Iterator<Record> iterator = indexedTable.iterator();
-    Set<Double> actualAgg = new HashSet<>();
-    while (iterator.hasNext()) {
-      actualAgg.add((double) iterator.next().getValues()[0]);
-    }
-    for (double d : evicted) {
-      Assert.assertFalse(actualAgg.contains(d));
-    }
-  }
-
-  private void checkSurvivors(Table indexedTable, String... evicted) {
+  private void checkEvicted(Table indexedTable, String... evicted) {
     Iterator<Record> iterator = indexedTable.iterator();
     List<String> d1 = new ArrayList<>();
     while (iterator.hasNext()) {
@@ -272,4 +262,64 @@ public class IndexedTableTest {
   private Record getRecord(Object[] keys, Object[] values) {
     return new Record(new Key(keys), values);
   }
+
+  @Test
+  public void testNoMoreNewRecords() {
+    DataSchema dataSchema = new DataSchema(new String[]{"d1", "d2", "d3", "sum(m1)", "max(m2)"},
+        new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE,
+            ColumnDataType.DOUBLE});
+
+    AggregationInfo agg1 = new AggregationInfo();
+    Map<String, String> params1 = new HashMap<>();
+    params1.put("column", "m1");
+    agg1.setAggregationParams(params1);
+    agg1.setAggregationType("sum");
+    AggregationInfo agg2 = new AggregationInfo();
+    Map<String, String> params2 = new HashMap<>();
+    params2.put("column", "m2");
+    agg2.setAggregationParams(params2);
+    agg2.setAggregationType("max");
+    List<AggregationInfo> aggregationInfos = Lists.newArrayList(agg1, agg2);
+
+    IndexedTable indexedTable = new SimpleIndexedTable();
+    indexedTable.init(dataSchema, aggregationInfos, null, 5);
+    testNoMoreNewRecordsInTable(indexedTable);
+
+    indexedTable = new ConcurrentIndexedTable();
+    indexedTable.init(dataSchema, aggregationInfos, null, 5);
+    testNoMoreNewRecordsInTable(indexedTable);
+  }
+
+  private void testNoMoreNewRecordsInTable(IndexedTable indexedTable) {
+    // Insert 14 records. Check that the last 2 never make it in.
+    indexedTable.upsert(getRecord(new Object[]{"a", 1, 10d}, new Object[]{10d, 100d}));
+    indexedTable.upsert(getRecord(new Object[]{"b", 2, 20d}, new Object[]{10d, 200d}));
+    indexedTable.upsert(getRecord(new Object[]{"a", 1, 10d}, new Object[]{10d, 100d}));
+    indexedTable.upsert(getRecord(new Object[]{"a", 1, 10d}, new Object[]{10d, 100d}));
+    Assert.assertEquals(indexedTable.size(), 2);
+
+    indexedTable.upsert(getRecord(new Object[]{"c", 3, 30d}, new Object[]{10d, 300d}));
+    indexedTable.upsert(getRecord(new Object[]{"d", 4, 40d}, new Object[]{10d, 400d}));
+    indexedTable.upsert(getRecord(new Object[]{"e", 5, 50d}, new Object[]{10d, 500d}));
+    indexedTable.upsert(getRecord(new Object[]{"f", 6, 60d}, new Object[]{10d, 600d}));
+    indexedTable.upsert(getRecord(new Object[]{"g", 7, 70d}, new Object[]{10d, 700d}));
+    indexedTable.upsert(getRecord(new Object[]{"h", 8, 80d}, new Object[]{10d, 800d}));
+    indexedTable.upsert(getRecord(new Object[]{"i", 9, 90d}, new Object[]{10d, 900d}));
+    Assert.assertEquals(indexedTable.size(), 9);
+
+    indexedTable.upsert(getRecord(new Object[]{"j", 10, 100d}, new Object[]{10d, 1000d}));
+    // no resize; no more new records allowed
+    indexedTable.upsert(getRecord(new Object[]{"k", 11, 110d}, new Object[]{10d, 1100d}));
+    indexedTable.upsert(getRecord(new Object[]{"l", 12, 120d}, new Object[]{10d, 1200d}));
+    Assert.assertEquals(indexedTable.size(), 10);
+
+    // existing row allowed
+    indexedTable.upsert(getRecord(new Object[]{"b", 2, 20d}, new Object[]{10d, 200d}));
+    Assert.assertEquals(indexedTable.size(), 10);
+
+    indexedTable.finish(false);
+
+    checkEvicted(indexedTable, "k", "l");
+  }
 }
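
For reference, the IndexedTable lifecycle that these tests exercise can be sketched as follows. This is a minimal illustration assembled from the calls visible above (init, upsert, finish, iterator), reusing the test's dataSchema/aggregationInfos/orderBy fixtures; the trimming threshold (a buffer at twice the max capacity) is inferred from the test comments, not from the Table implementation itself.

    // a minimal sketch, assuming the buffered capacity is 2x the max capacity
    IndexedTable table = new SimpleIndexedTable();
    table.init(dataSchema, aggregationInfos, orderBy, 5); // max capacity 5, buffers up to 10

    // upsert merges aggregation values into an existing Key, and inserts otherwise;
    // once the buffer fills, the table re-sorts on the order-by and trims back down
    table.upsert(new Record(new Key(new Object[]{"a", 1, 10d}), new Object[]{10d, 100d}));

    table.finish(false); // finalize and trim; the boolean flag (always false in these tests) is assumed to request sorted results
    Iterator<Record> it = table.iterator();
    while (it.hasNext()) {
      Record record = it.next(); // one of the surviving top records
    }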
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
index e94a913..59551b9 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/BaseQueriesTest.java
@@ -84,7 +84,22 @@ public abstract class BaseQueriesTest {
    * @return broker response.
    */
   protected BrokerResponseNative getBrokerResponseForQuery(String query, PlanMaker planMaker) {
+    return getBrokerResponseForQuery(query, planMaker, null);
+  }
+
+  /**
+   * Run query on multiple index segments with custom plan maker and queryOptions.
+   * <p>Use this to test the whole flow from server to broker.
+   * <p>The result should be equivalent to querying 4 identical index segments.
+   *
+   * @param query PQL query.
+   * @param planMaker Plan maker.
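+   * @param queryOptions Query options to set on the broker request; may be null.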
+   * @return broker response.
+   */
+  private BrokerResponseNative getBrokerResponseForQuery(String query, PlanMaker planMaker,
+      Map<String, String> queryOptions) {
     BrokerRequest brokerRequest = COMPILER.compileToBrokerRequest(query);
+    brokerRequest.setQueryOptions(queryOptions);
 
     // Server side.
     Plan plan = planMaker.makeInterSegmentPlan(getSegmentDataManagers(), brokerRequest, EXECUTOR_SERVICE, 10_000);
@@ -111,6 +126,18 @@ public abstract class BaseQueriesTest {
   }
 
   /**
+   * Run query on multiple index segments.
+   * <p>Use this to test the whole flow from server to broker.
+   * <p>The result should be equivalent to querying 4 identical index segments.
+   *
+   * @param query PQL query.
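+   * @param queryOptions Query options to set on the broker request.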
+   * @return broker response.
+   */
+  protected BrokerResponseNative getBrokerResponseForQuery(String query, Map<String, String> queryOptions) {
+    return getBrokerResponseForQuery(query, PLAN_MAKER, queryOptions);
+  }
+
+  /**
    * Run query with hard-coded filter on multiple index segments.
    * <p>Use this to test the whole flow from server to broker.
    * <p>The result should be equivalent to querying 4 identical index segments.
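
To make the new hook concrete, here is a caller-side sketch mirroring the order-by tests added later in this patch (the query string and table name are illustrative):

    // run a group-by in SQL mode and request the ResultTable response format
    Map<String, String> queryOptions = new HashMap<>(2);
    queryOptions.put(QueryOptionKey.GROUP_BY_MODE, SQL);   // execute the ORDER BY
    queryOptions.put(QueryOptionKey.RESPONSE_FORMAT, SQL); // respond with a ResultTable
    BrokerResponseNative brokerResponse = getBrokerResponseForQuery(
        "SELECT SUM(column1) FROM testTable GROUP BY column11 ORDER BY column11", queryOptions);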
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
index 307e992..df5c559 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionMultiValueQueriesTest.java
@@ -49,7 +49,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
     Assert.assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 0L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 100000L);
-    DataSchema selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    DataSchema selectionDataSchema = resultsBlock.getDataSchema();
     Map<String, Integer> columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
 
     Assert.assertEquals(selectionDataSchema.size(), 10);
@@ -69,7 +69,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
     Assert.assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 0L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 100000L);
-    selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    selectionDataSchema = resultsBlock.getDataSchema();
     columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
     Assert.assertEquals(selectionDataSchema.size(), 10);
     Assert.assertTrue(columnIndexMap.containsKey("column1"));
@@ -93,7 +93,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
     Assert.assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 100L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 100000L);
-    DataSchema selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    DataSchema selectionDataSchema = resultsBlock.getDataSchema();
     Map<String, Integer> columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
 
     Assert.assertEquals(selectionDataSchema.size(), 10);
@@ -118,7 +118,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
     Assert.assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 79L);
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 100L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 100000L);
-    selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    selectionDataSchema = resultsBlock.getDataSchema();
     columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
 
     Assert.assertEquals(selectionDataSchema.size(), 10);
@@ -148,7 +148,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
     Assert.assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 30L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 100000L);
-    DataSchema selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    DataSchema selectionDataSchema = resultsBlock.getDataSchema();
     Map<String, Integer> columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
 
     Assert.assertEquals(selectionDataSchema.size(), 3);
@@ -173,7 +173,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
     Assert.assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 79L);
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 30L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 100000L);
-    selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    selectionDataSchema = resultsBlock.getDataSchema();
     columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
 
     Assert.assertEquals(selectionDataSchema.size(), 3);
@@ -203,7 +203,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
     Assert.assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 400000L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 100000L);
-    DataSchema selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    DataSchema selectionDataSchema = resultsBlock.getDataSchema();
     Map<String, Integer> columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
 
     Assert.assertEquals(selectionDataSchema.size(), 4);
@@ -228,7 +228,7 @@ public class InnerSegmentSelectionMultiValueQueriesTest extends BaseMultiValueQu
     Assert.assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 272276L);
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 62480L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 100000L);
-    selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    selectionDataSchema = resultsBlock.getDataSchema();
     columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
 
     Assert.assertEquals(selectionDataSchema.size(), 4);
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
index 6201b3d..5683589 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentSelectionSingleValueQueriesTest.java
@@ -49,7 +49,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     Assert.assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 0L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 30000L);
-    DataSchema selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    DataSchema selectionDataSchema = resultsBlock.getDataSchema();
     Map<String, Integer> columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
     Assert.assertEquals(selectionDataSchema.size(), 11);
     Assert.assertTrue(columnIndexMap.containsKey("column1"));
@@ -68,7 +68,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     Assert.assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 0L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 30000L);
-    selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    selectionDataSchema = resultsBlock.getDataSchema();
     columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
 
     Assert.assertEquals(selectionDataSchema.size(), 11);
@@ -93,7 +93,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     Assert.assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 110L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 30000L);
-    DataSchema selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    DataSchema selectionDataSchema = resultsBlock.getDataSchema();
     Map<String, Integer> columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
     Assert.assertEquals(selectionDataSchema.size(), 11);
     Assert.assertEquals(getVirtualColumns(selectionDataSchema), 0);
@@ -118,7 +118,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     Assert.assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 48241L);
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 110L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 30000L);
-    selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    selectionDataSchema = resultsBlock.getDataSchema();
     columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
     Assert.assertEquals(selectionDataSchema.size(), 11);
     Assert.assertEquals(getVirtualColumns(selectionDataSchema), 0);
@@ -148,7 +148,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     Assert.assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 30L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 30000L);
-    DataSchema selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    DataSchema selectionDataSchema = resultsBlock.getDataSchema();
     Map<String, Integer> columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
 
     Assert.assertEquals(selectionDataSchema.size(), 3);
@@ -173,7 +173,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     Assert.assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 48241L);
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 30L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 30000L);
-    selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    selectionDataSchema = resultsBlock.getDataSchema();
     columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
     Assert.assertEquals(selectionDataSchema.size(), 3);
     Assert.assertTrue(columnIndexMap.containsKey("column1"));
@@ -202,7 +202,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     Assert.assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 120000L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 30000L);
-    DataSchema selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    DataSchema selectionDataSchema = resultsBlock.getDataSchema();
     Map<String, Integer> columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
 
     Assert.assertEquals(selectionDataSchema.size(), 4);
@@ -227,7 +227,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     Assert.assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 84134L);
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 24516L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 30000L);
-    selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    selectionDataSchema = resultsBlock.getDataSchema();
     columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
 
     Assert.assertEquals(selectionDataSchema.size(), 4);
@@ -257,7 +257,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     Assert.assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 0L);
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 330000L);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 30000L);
-    DataSchema selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    DataSchema selectionDataSchema = resultsBlock.getDataSchema();
     Map<String, Integer> columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
 
     Assert.assertEquals(getVirtualColumns(selectionDataSchema), 0);
@@ -283,7 +283,7 @@ public class InnerSegmentSelectionSingleValueQueriesTest extends BaseSingleValue
     Assert.assertEquals(executionStatistics.getNumEntriesScannedInFilter(), 84134L);
     Assert.assertEquals(executionStatistics.getNumEntriesScannedPostFilter(), 67419);
     Assert.assertEquals(executionStatistics.getNumTotalRawDocs(), 30000L);
-    selectionDataSchema = resultsBlock.getSelectionDataSchema();
+    selectionDataSchema = resultsBlock.getDataSchema();
     columnIndexMap = computeColumnNameToIndexMap(selectionDataSchema);
 
     Assert.assertEquals(getVirtualColumns(selectionDataSchema), 0);
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderByMultiValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderByMultiValueQueriesTest.java
new file mode 100644
index 0000000..f974c55
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderByMultiValueQueriesTest.java
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import com.google.common.collect.Lists;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.common.utils.CommonConstants.Broker.Request.*;
+
+
+/**
+ * Tests order by queries
+ */
+public class InterSegmentOrderByMultiValueQueriesTest extends BaseMultiValueQueriesTest {
+
+  @Test(dataProvider = "orderByDataProvider")
+  public void testAggregationOrderedGroupByResults(String query, List<Serializable[]> expectedResults,
+      long expectedNumEntriesScannedPostFilter) {
+    Map<String, String> queryOptions = new HashMap<>(2);
+    queryOptions.put(QueryOptionKey.GROUP_BY_MODE, SQL);
+    queryOptions.put(QueryOptionKey.RESPONSE_FORMAT, SQL);
+    BrokerResponseNative brokerResponse = getBrokerResponseForQuery(query, queryOptions);
+    QueriesTestUtils.testInterSegmentGroupByOrderByResult(brokerResponse, 400000L, 0,
+        expectedNumEntriesScannedPostFilter, 400000L, expectedResults);
+  }
+
+  /**
+   * Provides various ORDER BY combinations.
+   * Expected results were obtained by running the plain group by and then ordering its results accordingly.
+   */
+  @DataProvider(name = "orderByDataProvider")
+  public Object[][] orderByDataProvider() {
+
+    List<Object[]> data = new ArrayList<>();
+    String query;
+    List<Serializable[]> results;
+    long numEntriesScannedPostFilter;
+
+    query = "SELECT SUMMV(column7) FROM testTable GROUP BY column3 ORDER BY column3";
+    results = Lists.newArrayList(new Serializable[]{"", 63917703269308.0}, new Serializable[]{"L", 33260235267900.0},
+        new Serializable[]{"P", 212961658305696.0}, new Serializable[]{"PbQd", 2001454759004.0},
+        new Serializable[]{"w", 116831822080776.0});
+    numEntriesScannedPostFilter = 800000;
+    data.add(new Object[]{query, results, numEntriesScannedPostFilter});
+
+    query = "SELECT SUMMV(column7) FROM testTable GROUP BY column5 ORDER BY column5 DESC TOP 4";
+    results = Lists.newArrayList(new Serializable[]{"yQkJTLOQoOqqhkAClgC", 61100215182228.00000},
+        new Serializable[]{"mhoVvrJm", 5806796153884.00000},
+        new Serializable[]{"kCMyNVGCASKYDdQbftOPaqVMWc", 51891832239248.00000},
+        new Serializable[]{"PbQd", 36532997335388.00000});
+    numEntriesScannedPostFilter = 800000;
+    data.add(new Object[]{query, results, numEntriesScannedPostFilter});
+
+    query = "SELECT SUMMV(column7) FROM testTable GROUP BY column5 ORDER BY SUMMV(column7) TOP 5";
+    results = Lists.newArrayList(new Serializable[]{"NCoFku", 489626381288.00000},
+        new Serializable[]{"mhoVvrJm", 5806796153884.00000},
+        new Serializable[]{"JXRmGakTYafZFPm", 18408231081808.00000}, new Serializable[]{"PbQd", 36532997335388.00000},
+        new Serializable[]{"OKyOqU", 51067166589176.00000});
+    numEntriesScannedPostFilter = 800000;
+    data.add(new Object[]{query, results, numEntriesScannedPostFilter});
+
+    // object type aggregations
+    query = "SELECT MINMAXRANGEMV(column7) FROM testTable GROUP BY column5 ORDER BY column5";
+    results = Lists.newArrayList(new Serializable[]{"AKXcXcIqsqOJFsdwxZ", 2147483446.00000},
+        new Serializable[]{"EOFxevm", 2147483446.00000}, new Serializable[]{"JXRmGakTYafZFPm", 2147483443.00000},
+        new Serializable[]{"NCoFku", 2147483436.00000}, new Serializable[]{"OKyOqU", 2147483443.00000},
+        new Serializable[]{"PbQd", 2147483443.00000},
+        new Serializable[]{"kCMyNVGCASKYDdQbftOPaqVMWc", 2147483446.00000},
+        new Serializable[]{"mhoVvrJm", 2147483438.00000}, new Serializable[]{"yQkJTLOQoOqqhkAClgC", 2147483446.00000});
+    numEntriesScannedPostFilter = 800000;
+    data.add(new Object[]{query, results, numEntriesScannedPostFilter});
+
+    // object type aggregations
+    query =
+        "SELECT MINMAXRANGEMV(column7) FROM testTable GROUP BY column5 ORDER BY MINMAXRANGEMV(column7), column5 desc";
+    results = Lists.newArrayList(new Serializable[]{"NCoFku", 2147483436.00000},
+        new Serializable[]{"mhoVvrJm", 2147483438.00000}, new Serializable[]{"PbQd", 2147483443.00000},
+        new Serializable[]{"OKyOqU", 2147483443.00000}, new Serializable[]{"JXRmGakTYafZFPm", 2147483443.00000},
+        new Serializable[]{"yQkJTLOQoOqqhkAClgC", 2147483446.00000},
+        new Serializable[]{"kCMyNVGCASKYDdQbftOPaqVMWc", 2147483446.00000},
+        new Serializable[]{"EOFxevm", 2147483446.00000}, new Serializable[]{"AKXcXcIqsqOJFsdwxZ", 2147483446.00000});
+    numEntriesScannedPostFilter = 800000;
+    data.add(new Object[]{query, results, numEntriesScannedPostFilter});
+
+    // object type aggregations - non-comparable intermediate results
+    query = "SELECT DISTINCTCOUNTMV(column7) FROM testTable GROUP BY column5 ORDER BY DISTINCTCOUNTMV(column7) top 5";
+    results = Lists.newArrayList(new Serializable[]{"NCoFku", 26}, new Serializable[]{"mhoVvrJm", 65},
+        new Serializable[]{"JXRmGakTYafZFPm", 126}, new Serializable[]{"PbQd", 211}, new Serializable[]{"OKyOqU", 216});
+    numEntriesScannedPostFilter = 800000;
+    data.add(new Object[]{query, results, numEntriesScannedPostFilter});
+
+    return data.toArray(new Object[data.size()][]);
+  }
+}
\ No newline at end of file
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderBySingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderBySingleValueQueriesTest.java
new file mode 100644
index 0000000..3d2cc72
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InterSegmentOrderBySingleValueQueriesTest.java
@@ -0,0 +1,346 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import com.google.common.collect.Lists;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.response.broker.AggregationResult;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.GroupByResult;
+import org.apache.pinot.common.utils.CommonConstants.Broker.Request.QueryOptionKey;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.apache.pinot.common.utils.CommonConstants.Broker.Request.*;
+
+
+/**
+ * Tests order by queries
+ */
+public class InterSegmentOrderBySingleValueQueriesTest extends BaseSingleValueQueriesTest {
+
+  @Test(dataProvider = "orderByResultTableProvider")
+  public void testGroupByOrderBy(String query, List<Serializable[]> expectedResults, long expectedNumDocsScanned,
+      long expectedNumEntriesScannedInFilter, long expectedNumEntriesScannedPostFilter, long expectedNumTotalDocs) {
+    Map<String, String> queryOptions = new HashMap<>(2);
+    queryOptions.put(QueryOptionKey.GROUP_BY_MODE, SQL);
+    queryOptions.put(QueryOptionKey.RESPONSE_FORMAT, SQL);
+    BrokerResponseNative brokerResponse = getBrokerResponseForQuery(query, queryOptions);
+    QueriesTestUtils.testInterSegmentGroupByOrderByResult(brokerResponse, expectedNumDocsScanned,
+        expectedNumEntriesScannedInFilter, expectedNumEntriesScannedPostFilter, expectedNumTotalDocs, expectedResults);
+  }
+
+  /**
+   * Tests the query options for groupByMode, responseFormat.
+   * pql, pql - does not execute order by, returns aggregationResults
+   * pql, sql - does not execute order by, returns aggregationResults
+   * sql, pql - executes order by, but returns aggregationResults. Keys across all aggregations will be the same
+   * sql, sql - executes order by, returns resultsTable
+   */
+  @Test
+  public void testQueryOptions() {
+    String query = "SELECT SUM(column1), MIN(column6) FROM testTable GROUP BY column11 ORDER BY column11";
+    Map<String, String> queryOptions = null;
+
+    // default PQL, PQL
+    BrokerResponseNative brokerResponse = getBrokerResponseForQuery(query, queryOptions);
+    Assert.assertNotNull(brokerResponse.getAggregationResults());
+    Assert.assertNull(brokerResponse.getResultTable());
+
+    // PQL, PQL - don't execute order by, return aggregationResults
+    queryOptions = new HashMap<>(2);
+    queryOptions.put(QueryOptionKey.GROUP_BY_MODE, PQL);
+    queryOptions.put(QueryOptionKey.RESPONSE_FORMAT, PQL);
+    brokerResponse = getBrokerResponseForQuery(query, queryOptions);
+    Assert.assertNotNull(brokerResponse.getAggregationResults());
+    Assert.assertNull(brokerResponse.getResultTable());
+
+    // PQL, SQL - don't execute order by, return aggregationResults.
+    queryOptions.put(QueryOptionKey.GROUP_BY_MODE, PQL);
+    queryOptions.put(QueryOptionKey.RESPONSE_FORMAT, SQL);
+    brokerResponse = getBrokerResponseForQuery(query, queryOptions);
+    Assert.assertNotNull(brokerResponse.getAggregationResults());
+    Assert.assertNull(brokerResponse.getResultTable());
+
+    // SQL, PQL - execute the order by, but return aggregationResults. Keys should be the same across aggregation functions.
+    queryOptions.put(QueryOptionKey.GROUP_BY_MODE, SQL);
+    queryOptions.put(QueryOptionKey.RESPONSE_FORMAT, PQL);
+    brokerResponse = getBrokerResponseForQuery(query, queryOptions);
+    Assert.assertNotNull(brokerResponse.getAggregationResults());
+    Assert.assertNull(brokerResponse.getResultTable());
+    List<AggregationResult> aggregationResults = brokerResponse.getAggregationResults();
+    Assert.assertEquals(aggregationResults.size(), 2);
+    Iterator<GroupByResult> it1 = aggregationResults.get(0).getGroupByResult().iterator();
+    Iterator<GroupByResult> it2 = aggregationResults.get(1).getGroupByResult().iterator();
+    while (it1.hasNext() && it2.hasNext()) {
+      GroupByResult groupByResult1 = it1.next();
+      GroupByResult groupByResult2 = it2.next();
+      Assert.assertEquals(groupByResult1.getGroup(), groupByResult2.getGroup());
+    }
+
+    // SQL, SQL - execute order by, return resultsTable
+    queryOptions.put(QueryOptionKey.GROUP_BY_MODE, SQL);
+    queryOptions.put(QueryOptionKey.RESPONSE_FORMAT, SQL);
+    brokerResponse = getBrokerResponseForQuery(query, queryOptions);
+    Assert.assertNull(brokerResponse.getAggregationResults());
+    Assert.assertNotNull(brokerResponse.getResultTable());
+  }
+
+  /**
+   * Provides various ORDER BY combinations, verified against the ResultTable response.
+   * Expected results were obtained by running the plain group by and then ordering its results accordingly.
+   */
+  @DataProvider(name = "orderByResultTableProvider")
+  public Object[][] orderByResultTableProvider() {
+
+    List<Object[]> data = new ArrayList<>();
+    String query;
+    List<Serializable[]> results;
+    long numDocsScanned = 120000;
+    long numEntriesScannedInFilter = 0;
+    long numEntriesScannedPostFilter;
+    long numTotalDocs = 120000;
+
+    // order by one of the group by columns
+    query = "SELECT SUM(column1) FROM testTable GROUP BY column11 ORDER BY column11";
+    results = Lists.newArrayList(new Serializable[]{"", 5935285005452.0}, new Serializable[]{"P", 88832999206836.0},
+        new Serializable[]{"gFuH", 63202785888.0}, new Serializable[]{"o", 18105331533948.0},
+        new Serializable[]{"t", 16331923219264.0});
+    numEntriesScannedPostFilter = 240000;
+    data.add(new Object[]{query, results, numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter,
+        numTotalDocs});
+
+    // order by one of the group by columns DESC
+    query = "SELECT SUM(column1) FROM testTable GROUP BY column11 ORDER BY column11 DESC";
+    results = Lists.newArrayList(results);
+    Collections.reverse(results);
+    numEntriesScannedPostFilter = 240000;
+    data.add(new Object[]{query, results, numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter,
+        numTotalDocs});
+
+    // order by one of the group by columns, TOP less than default
+    query = "SELECT SUM(column1) FROM testTable GROUP BY column11 ORDER BY column11 TOP 3";
+    results = Lists.newArrayList(results);
+    Collections.reverse(results);
+    results = results.subList(0, 3);
+    numEntriesScannedPostFilter = 240000;
+    data.add(new Object[]{query, results, numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter,
+        numTotalDocs});
+
+    // group by 2 dimensions, order by both, tie breaker
+    query = "SELECT SUM(column1) FROM testTable GROUP BY column11, column12 ORDER BY column11, column12";
+    results = Lists.newArrayList(new Serializable[]{"", "HEuxNvH", 3789390396216.0},
+        new Serializable[]{"", "KrNxpdycSiwoRohEiTIlLqDHnx", 733802350944.00000},
+        new Serializable[]{"", "MaztCmmxxgguBUxPti", 1333941430664.00000},
+        new Serializable[]{"", "dJWwFk", 55470665124.0000},
+        new Serializable[]{"", "oZgnrlDEtjjVpUoFLol", 22680162504.00000},
+        new Serializable[]{"P", "HEuxNvH", 21998672845052.00000},
+        new Serializable[]{"P", "KrNxpdycSiwoRohEiTIlLqDHnx", 18069909216728.00000},
+        new Serializable[]{"P", "MaztCmmxxgguBUxPti", 27177029040008.00000},
+        new Serializable[]{"P", "TTltMtFiRqUjvOG", 4462670055540.00000},
+        new Serializable[]{"P", "XcBNHe", 120021767504.00000});
+    numEntriesScannedPostFilter = 360000;
+    data.add(new Object[]{query, results, numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter,
+        numTotalDocs});
+
+    // group by 2 columns, order by both, TOP more than default
+    query = "SELECT SUM(column1) FROM testTable GROUP BY column11, column12 ORDER BY column11, column12 TOP 15";
+    results = Lists.newArrayList(results);
+    results.add(new Serializable[]{"P", "dJWwFk", 6224665921376.00000});
+    results.add(new Serializable[]{"P", "fykKFqiw", 1574451324140.00000});
+    results.add(new Serializable[]{"P", "gFuH", 860077643636.00000});
+    results.add(new Serializable[]{"P", "oZgnrlDEtjjVpUoFLol", 8345501392852.00000});
+    results.add(new Serializable[]{"gFuH", "HEuxNvH", 29872400856.00000});
+    numEntriesScannedPostFilter = 360000;
+    data.add(new Object[]{query, results, numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter,
+        numTotalDocs});
+
+    // group by 2 columns, order by both, one of them DESC
+    query = "SELECT SUM(column1) FROM testTable GROUP BY column11, column12 ORDER BY column11, column12 DESC";
+    results = Lists.newArrayList(new Serializable[]{"", "oZgnrlDEtjjVpUoFLol", 22680162504.00000},
+        new Serializable[]{"", "dJWwFk", 55470665124.0000},
+        new Serializable[]{"", "MaztCmmxxgguBUxPti", 1333941430664.00000},
+        new Serializable[]{"", "KrNxpdycSiwoRohEiTIlLqDHnx", 733802350944.00000},
+        new Serializable[]{"", "HEuxNvH", 3789390396216.00000},
+        new Serializable[]{"P", "oZgnrlDEtjjVpUoFLol", 8345501392852.00000},
+        new Serializable[]{"P", "gFuH", 860077643636.00000}, new Serializable[]{"P", "fykKFqiw", 1574451324140.00000},
+        new Serializable[]{"P", "dJWwFk", 6224665921376.00000}, new Serializable[]{"P", "XcBNHe", 120021767504.00000});
+    numEntriesScannedPostFilter = 360000;
+    data.add(new Object[]{query, results, numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter,
+        numTotalDocs});
+
+    // order by group by column and an aggregation
+    query = "SELECT SUM(column1) FROM testTable GROUP BY column11, column12 ORDER BY column11, SUM(column1)";
+    results = Lists.newArrayList(new Serializable[]{"", "oZgnrlDEtjjVpUoFLol", 22680162504.00000},
+        new Serializable[]{"", "dJWwFk", 55470665124.0000},
+        new Serializable[]{"", "KrNxpdycSiwoRohEiTIlLqDHnx", 733802350944.00000},
+        new Serializable[]{"", "MaztCmmxxgguBUxPti", 1333941430664.00000},
+        new Serializable[]{"", "HEuxNvH", 3789390396216.00000}, new Serializable[]{"P", "XcBNHe", 120021767504.00000},
+        new Serializable[]{"P", "gFuH", 860077643636.00000}, new Serializable[]{"P", "fykKFqiw", 1574451324140.00000},
+        new Serializable[]{"P", "TTltMtFiRqUjvOG", 4462670055540.00000},
+        new Serializable[]{"P", "dJWwFk", 6224665921376.00000});
+    numEntriesScannedPostFilter = 360000;
+    data.add(new Object[]{query, results, numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter,
+        numTotalDocs});
+
+    // order by only aggregation, DESC, TOP
+    query = "SELECT SUM(column1) FROM testTable GROUP BY column11, column12 ORDER BY SUM(column1) DESC TOP 50";
+    results = Lists.newArrayList(new Serializable[]{"P", "MaztCmmxxgguBUxPti", 27177029040008.00000},
+        new Serializable[]{"P", "HEuxNvH", 21998672845052.00000},
+        new Serializable[]{"P", "KrNxpdycSiwoRohEiTIlLqDHnx", 18069909216728.00000},
+        new Serializable[]{"P", "oZgnrlDEtjjVpUoFLol", 8345501392852.00000},
+        new Serializable[]{"o", "MaztCmmxxgguBUxPti", 6905624581072.00000},
+        new Serializable[]{"P", "dJWwFk", 6224665921376.00000}, new Serializable[]{"o", "HEuxNvH", 5026384681784.00000},
+        new Serializable[]{"t", "MaztCmmxxgguBUxPti", 4492405624940.00000},
+        new Serializable[]{"P", "TTltMtFiRqUjvOG", 4462670055540.00000},
+        new Serializable[]{"t", "HEuxNvH", 4424489490364.00000},
+        new Serializable[]{"o", "KrNxpdycSiwoRohEiTIlLqDHnx", 4051812250524.00000},
+        new Serializable[]{"", "HEuxNvH", 3789390396216.00000},
+        new Serializable[]{"t", "KrNxpdycSiwoRohEiTIlLqDHnx", 3529048341192.00000},
+        new Serializable[]{"P", "fykKFqiw", 1574451324140.00000},
+        new Serializable[]{"t", "dJWwFk", 1349058948804.00000},
+        new Serializable[]{"", "MaztCmmxxgguBUxPti", 1333941430664.00000},
+        new Serializable[]{"o", "dJWwFk", 1152689463360.00000},
+        new Serializable[]{"t", "oZgnrlDEtjjVpUoFLol", 1039101333316.00000},
+        new Serializable[]{"P", "gFuH", 860077643636.00000},
+        new Serializable[]{"", "KrNxpdycSiwoRohEiTIlLqDHnx", 733802350944.00000},
+        new Serializable[]{"o", "oZgnrlDEtjjVpUoFLol", 699381633640.00000},
+        new Serializable[]{"t", "TTltMtFiRqUjvOG", 675238030848.00000},
+        new Serializable[]{"t", "fykKFqiw", 480973878052.00000}, new Serializable[]{"t", "gFuH", 330331507792.00000},
+        new Serializable[]{"o", "TTltMtFiRqUjvOG", 203835153352.00000},
+        new Serializable[]{"P", "XcBNHe", 120021767504.00000}, new Serializable[]{"o", "fykKFqiw", 62975165296.00000},
+        new Serializable[]{"", "dJWwFk", 55470665124.0000}, new Serializable[]{"gFuH", "HEuxNvH", 29872400856.00000},
+        new Serializable[]{"gFuH", "MaztCmmxxgguBUxPti", 29170832184.00000},
+        new Serializable[]{"", "oZgnrlDEtjjVpUoFLol", 22680162504.00000},
+        new Serializable[]{"t", "XcBNHe", 11276063956.00000},
+        new Serializable[]{"gFuH", "KrNxpdycSiwoRohEiTIlLqDHnx", 4159552848.00000},
+        new Serializable[]{"o", "gFuH", 2628604920.00000});
+    numEntriesScannedPostFilter = 360000;
+    data.add(new Object[]{query, results, numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter,
+        numTotalDocs});
+
+    // order by aggregation with space/tab in order by
+    query = "SELECT SUM(column1) FROM testTable GROUP BY column11, column12 ORDER BY SUM  ( column1) DESC TOP 3";
+    results = Lists.newArrayList(new Serializable[]{"P", "MaztCmmxxgguBUxPti", 27177029040008.00000},
+        new Serializable[]{"P", "HEuxNvH", 21998672845052.00000},
+        new Serializable[]{"P", "KrNxpdycSiwoRohEiTIlLqDHnx", 18069909216728.00000});
+    numEntriesScannedPostFilter = 360000;
+    data.add(new Object[]{query, results, numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter,
+        numTotalDocs});
+
+    // order by an aggregation DESC, and group by column
+    query = "SELECT MIN(column6) FROM testTable GROUP BY column12 ORDER BY MIN(column6) DESC, column12";
+    results = Lists.newArrayList(new Serializable[]{"XcBNHe", 329467557.00000},
+        new Serializable[]{"fykKFqiw", 296467636.00000}, new Serializable[]{"gFuH", 296467636.00000},
+        new Serializable[]{"HEuxNvH", 6043515.00000}, new Serializable[]{"MaztCmmxxgguBUxPti", 6043515.00000},
+        new Serializable[]{"dJWwFk", 6043515.00000}, new Serializable[]{"KrNxpdycSiwoRohEiTIlLqDHnx", 1980174.00000},
+        new Serializable[]{"TTltMtFiRqUjvOG", 1980174.00000}, new Serializable[]{"oZgnrlDEtjjVpUoFLol", 1689277.00000});
+    numEntriesScannedPostFilter = 240000;
+    data.add(new Object[]{query, results, numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter,
+        numTotalDocs});
+
+    // numeric dimension should follow numeric ordering
+    query = "select count(*) from testTable group by column17 order by column17 top 15";
+    results = Lists.newArrayList(new Serializable[]{83386499, 2924L}, new Serializable[]{217787432, 3892L},
+        new Serializable[]{227908817, 6564L}, new Serializable[]{402773817, 7304L},
+        new Serializable[]{423049234, 6556L}, new Serializable[]{561673250, 7420L},
+        new Serializable[]{635942547, 3308L}, new Serializable[]{638936844, 3816L},
+        new Serializable[]{939479517, 3116L}, new Serializable[]{984091268, 3824L},
+        new Serializable[]{1230252339, 5620L}, new Serializable[]{1284373442, 7428L},
+        new Serializable[]{1555255521, 2900L}, new Serializable[]{1618904660, 2744L},
+        new Serializable[]{1670085862, 3388L});
+    numEntriesScannedPostFilter = 120000;
+    data.add(new Object[]{query, results, numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter,
+        numTotalDocs});
+
+    // group by UDF order by UDF
+    query = "SELECT COUNT(*) FROM testTable GROUP BY sub(column1, 100000) TOP 3 ORDER BY sub(column1, 100000)";
+    results = Lists.newArrayList(new Serializable[]{140528.0, 28L}, new Serializable[]{194355.0, 12L},
+        new Serializable[]{532157.0, 12L});
+    numEntriesScannedPostFilter = 120000;
+    data.add(new Object[]{query, results, numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter,
+        numTotalDocs});
+
+    // spaces in UDF
+    query = "SELECT COUNT(*) FROM testTable GROUP BY sub(column1, 100000) TOP 3 ORDER BY SUB(   column1, 100000 )";
+    results = Lists.newArrayList(new Serializable[]{140528.0, 28L}, new Serializable[]{194355.0, 12L},
+        new Serializable[]{532157.0, 12L});
+    numEntriesScannedPostFilter = 120000;
+    data.add(new Object[]{query, results, numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter,
+        numTotalDocs});
+
+    // Object type aggregation - comparable intermediate results (AVG, MINMAXRANGE)
+    query = "SELECT AVG(column6) FROM testTable GROUP BY column11  ORDER BY column11";
+    results = Lists.newArrayList(new Serializable[]{"", 296467636.0}, new Serializable[]{"P", 909380310.3521485},
+        new Serializable[]{"gFuH", 296467636.0}, new Serializable[]{"o", 296467636.0},
+        new Serializable[]{"t", 526245333.3900426});
+    numEntriesScannedPostFilter = 240000;
+    data.add(new Object[]{query, results, numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter,
+        numTotalDocs});
+
+    query = "SELECT AVG(column6) FROM testTable GROUP BY column11 ORDER BY AVG(column6), column11 DESC";
+    results = Lists.newArrayList(new Serializable[]{"o", 296467636.0}, new Serializable[]{"gFuH", 296467636.0},
+        new Serializable[]{"", 296467636.0}, new Serializable[]{"t", 526245333.3900426},
+        new Serializable[]{"P", 909380310.3521485});
+    numEntriesScannedPostFilter = 240000;
+    data.add(new Object[]{query, results, numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter,
+        numTotalDocs});
+
+    // Object type aggregation - non-comparable intermediate results (DISTINCTCOUNT)
+    query = "SELECT DISTINCTCOUNT(column11) FROM testTable GROUP BY column12 ORDER BY column12";
+    results = Lists.newArrayList(new Serializable[]{"HEuxNvH", 5}, new Serializable[]{"KrNxpdycSiwoRohEiTIlLqDHnx", 5},
+        new Serializable[]{"MaztCmmxxgguBUxPti", 5}, new Serializable[]{"TTltMtFiRqUjvOG", 3},
+        new Serializable[]{"XcBNHe", 2}, new Serializable[]{"dJWwFk", 4}, new Serializable[]{"fykKFqiw", 3},
+        new Serializable[]{"gFuH", 3}, new Serializable[]{"oZgnrlDEtjjVpUoFLol", 4});
+    numEntriesScannedPostFilter = 240000;
+    data.add(new Object[]{query, results, numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter,
+        numTotalDocs});
+
+    query =
+        "SELECT DISTINCTCOUNT(column11) FROM testTable GROUP BY column12 ORDER BY DISTINCTCOUNT(column11), column12 DESC";
+    results = Lists.newArrayList(new Serializable[]{"XcBNHe", 2}, new Serializable[]{"gFuH", 3},
+        new Serializable[]{"fykKFqiw", 3}, new Serializable[]{"TTltMtFiRqUjvOG", 3},
+        new Serializable[]{"oZgnrlDEtjjVpUoFLol", 4}, new Serializable[]{"dJWwFk", 4},
+        new Serializable[]{"MaztCmmxxgguBUxPti", 5}, new Serializable[]{"KrNxpdycSiwoRohEiTIlLqDHnx", 5},
+        new Serializable[]{"HEuxNvH", 5});
+    numEntriesScannedPostFilter = 240000;
+    data.add(new Object[]{query, results, numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter,
+        numTotalDocs});
+
+    // empty results
+    query =
+        "SELECT MIN(column6) FROM testTable where column12='non-existent-value' GROUP BY column11 order by column11";
+    results = new ArrayList<>(0);
+    numDocsScanned = 0;
+    numEntriesScannedPostFilter = 0;
+    data.add(new Object[]{query, results, numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter,
+        numTotalDocs});
+
+    return data.toArray(new Object[data.size()][]);
+  }
+}
\ No newline at end of file
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/QueriesTestUtils.java b/pinot-core/src/test/java/org/apache/pinot/queries/QueriesTestUtils.java
index 24cb0db..fe0e656 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/QueriesTestUtils.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/QueriesTestUtils.java
@@ -19,11 +19,14 @@
 package org.apache.pinot.queries;
 
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.function.Function;
 import org.apache.pinot.common.response.broker.AggregationResult;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.core.operator.ExecutionStatistics;
 import org.apache.pinot.core.query.aggregation.function.customobject.AvgPair;
 import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByResult;
@@ -116,4 +119,22 @@ public class QueriesTestUtils {
       }
     }
   }
+
+  public static void testInterSegmentGroupByOrderByResult(BrokerResponseNative brokerResponse, long expectedNumDocsScanned,
+      long expectedNumEntriesScannedInFilter, long expectedNumEntriesScannedPostFilter, long expectedNumTotalDocs,
+      List<Serializable[]> expectedResults) {
+    Assert.assertEquals(brokerResponse.getNumDocsScanned(), expectedNumDocsScanned);
+    Assert.assertEquals(brokerResponse.getNumEntriesScannedInFilter(), expectedNumEntriesScannedInFilter);
+    Assert.assertEquals(brokerResponse.getNumEntriesScannedPostFilter(), expectedNumEntriesScannedPostFilter);
+    Assert.assertEquals(brokerResponse.getTotalDocs(), expectedNumTotalDocs);
+
+    ResultTable resultTable = brokerResponse.getResultTable();
+    List<Serializable[]> actualResults = resultTable.getRows();
+
+    Assert.assertEquals(actualResults.size(), expectedResults.size());
+
+    for (int i = 0; i < actualResults.size(); i++) {
+      Assert.assertEquals(Arrays.asList(actualResults.get(i)), Arrays.asList(expectedResults.get(i)));
+    }
+  }
 }
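
For context, consuming the ResultTable that this helper asserts on looks roughly like the sketch below. getResultTable() returns null unless responseFormat=sql; the row layout (group-by key columns followed by aggregation values) is inferred from the expected results in the tests above.

    ResultTable resultTable = brokerResponse.getResultTable(); // null unless responseFormat=sql
    if (resultTable != null) {
      for (Serializable[] row : resultTable.getRows()) {
        // rows arrive pre-ordered per the query's ORDER BY; each row is assumed
        // to hold the group-by key columns followed by the aggregation values
        System.out.println(Arrays.toString(row));
      }
    }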
diff --git a/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/AggregationGroupByTrimmingServiceTest.java b/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/AggregationGroupByTrimmingServiceTest.java
index 459a799..ce4a1b7 100644
--- a/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/AggregationGroupByTrimmingServiceTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/AggregationGroupByTrimmingServiceTest.java
@@ -33,6 +33,7 @@ import org.apache.pinot.common.response.broker.GroupByResult;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunctionFactory;
 import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByTrimmingService;
+import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -64,8 +65,7 @@ public class AggregationGroupByTrimmingServiceTest {
       List<String> group = new ArrayList<>(NUM_GROUP_KEYS);
       for (int i = 0; i < NUM_GROUP_KEYS; i++) {
         // Randomly generate group key without GROUP_KEY_DELIMITER
-        group.add(RandomStringUtils.random(RANDOM.nextInt(10))
-            .replace(AggregationGroupByTrimmingService.GROUP_KEY_DELIMITER, ""));
+        group.add(RandomStringUtils.random(RANDOM.nextInt(10)).replace(GroupKeyGenerator.DELIMITER, ""));
       }
       groupSet.add(buildGroupString(group));
     }
@@ -74,7 +74,7 @@ public class AggregationGroupByTrimmingServiceTest {
     // Explicitly set an empty group
     StringBuilder emptyGroupBuilder = new StringBuilder();
     for (int i = 1; i < NUM_GROUP_KEYS; i++) {
-      emptyGroupBuilder.append(AggregationGroupByTrimmingService.GROUP_KEY_DELIMITER);
+      emptyGroupBuilder.append(GroupKeyGenerator.DELIMITER);
     }
     _groups.set(NUM_GROUPS - 1, emptyGroupBuilder.toString());
 
@@ -136,7 +136,7 @@ public class AggregationGroupByTrimmingServiceTest {
     StringBuilder groupStringBuilder = new StringBuilder();
     for (int i = 0; i < NUM_GROUP_KEYS; i++) {
       if (i != 0) {
-        groupStringBuilder.append(AggregationGroupByTrimmingService.GROUP_KEY_DELIMITER);
+        groupStringBuilder.append(GroupKeyGenerator.DELIMITER);
       }
       groupStringBuilder.append(group.get(i));
     }
diff --git a/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/NoDictionaryGroupKeyGeneratorTest.java b/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/NoDictionaryGroupKeyGeneratorTest.java
index 5449694..205edb2 100644
--- a/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/NoDictionaryGroupKeyGeneratorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/query/aggregation/groupby/NoDictionaryGroupKeyGeneratorTest.java
@@ -181,7 +181,7 @@ public class NoDictionaryGroupKeyGeneratorTest {
       for (int i = 0; i < groupByColumns.length; i++) {
         stringBuilder.append(row.getValue(groupByColumns[i]));
         if (i < groupByColumns.length - 1) {
-          stringBuilder.append(AggregationGroupByTrimmingService.GROUP_KEY_DELIMITER);
+          stringBuilder.append(GroupKeyGenerator.DELIMITER);
         }
       }
       groupKeys.add(stringBuilder.toString());
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java
similarity index 56%
copy from pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java
copy to pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java
index 37d8767..2338212 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java
@@ -18,19 +18,25 @@
  */
 package org.apache.pinot.perf;
 
+import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.pinot.common.request.AggregationInfo;
 import org.apache.pinot.common.request.SelectionSort;
@@ -39,10 +45,15 @@ import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
 import org.apache.pinot.core.data.table.IndexedTable;
 import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.core.data.table.Record;
-import org.apache.pinot.core.data.table.SimpleIndexedTable;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionFactory;
+import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByTrimmingService;
+import org.apache.pinot.core.query.aggregation.groupby.GroupKeyGenerator;
+import org.apache.pinot.core.query.utils.Pair;
+import org.apache.pinot.core.util.GroupByUtils;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Fork;
 import org.openjdk.jmh.annotations.Mode;
 import org.openjdk.jmh.annotations.OutputTimeUnit;
 import org.openjdk.jmh.annotations.Scope;
@@ -56,40 +67,43 @@ import org.openjdk.jmh.runner.options.TimeValue;
 
 
 @State(Scope.Benchmark)
-public class BenchmarkIndexedTable {
-
-  private int CAPACITY = 800;
-  private int NUM_RECORDS = 1000;
+@Fork(value = 1, jvmArgs = {"-server", "-Xmx8G", "-XX:MaxDirectMemorySize=16G"})
+public class BenchmarkCombineGroupBy {
+
+  private static final int TOP_N = 500;
+  private static final int NUM_SEGMENTS = 4;
+  private static final int NUM_RECORDS_PER_SEGMENT = 100_000;
+  private static final int CARDINALITY_D1 = 500;
+  private static final int CARDINALITY_D2 = 500;
   private Random _random = new Random();
 
   private DataSchema _dataSchema;
   private List<AggregationInfo> _aggregationInfos;
+  private AggregationFunction[] _aggregationFunctions;
   private List<SelectionSort> _orderBy;
+  private int _numAggregationFunctions;
 
   private List<String> _d1;
   private List<Integer> _d2;
 
   private ExecutorService _executorService;
 
-  @Setup(Level.Invocation)
-  public void setUpInvocation() {
+  @Setup
+  public void setup() {
 
     // create data
-    int cardinalityD1 = 100;
-    _d1 = new ArrayList<>(cardinalityD1);
-    for (int i = 0; i < cardinalityD1; i++) {
-      _d1.add(RandomStringUtils.randomAlphabetic(3));
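+    // Build d1 through a Set first: randomAlphabetic(3) can collide, so the Set guarantees
+    // exactly CARDINALITY_D1 distinct group keys.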
+    Set<String> d1 = new HashSet<>(CARDINALITY_D1);
+    while (d1.size() < CARDINALITY_D1) {
+      d1.add(RandomStringUtils.randomAlphabetic(3));
     }
+    _d1 = new ArrayList<>(CARDINALITY_D1);
+    _d1.addAll(d1);
 
-    int cardinalityD2 = 100;
-    _d2 = new ArrayList<>(cardinalityD2);
-    for (int i = 0; i < cardinalityD2; i++) {
+    _d2 = new ArrayList<>(CARDINALITY_D2);
+    for (int i = 0; i < CARDINALITY_D2; i++) {
       _d2.add(i);
     }
-  }
 
-  @Setup
-  public void setup() {
     _dataSchema = new DataSchema(new String[]{"d1", "d2", "sum(m1)", "max(m2)"},
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
             DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE});
@@ -106,6 +120,12 @@ public class BenchmarkIndexedTable {
     agg2.setAggregationType("max");
     _aggregationInfos = Lists.newArrayList(agg1, agg2);
 
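+    // Instantiate the actual AggregationFunctions so the baseline combine path can merge intermediate results.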
+    _numAggregationFunctions = 2;
+    _aggregationFunctions = new AggregationFunction[_numAggregationFunctions];
+    for (int i = 0; i < _numAggregationFunctions; i++) {
+      _aggregationFunctions[i] = AggregationFunctionFactory.getAggregationFunction(_aggregationInfos.get(i), null);
+    }
+
     SelectionSort orderBy = new SelectionSort();
     orderBy.setColumn("sum(m1)");
     orderBy.setIsAsc(true);
@@ -119,32 +139,40 @@ public class BenchmarkIndexedTable {
     _executorService.shutdown();
   }
 
-  private Record getNewRecord() {
+  private Record getRecord() {
     Object[] keys = new Object[]{_d1.get(_random.nextInt(_d1.size())), _d2.get(_random.nextInt(_d2.size()))};
     Object[] values = new Object[]{(double) _random.nextInt(1000), (double) _random.nextInt(1000)};
     return new Record(new Key(keys), values);
   }
 
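+  // Baseline record shape used before IndexedTable: the group key is a single delimiter-joined
+  // string and the values are the raw per-aggregation results.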
+  private Pair<String, Object[]> getOriginalRecord() {
+    String stringKey = Joiner.on(GroupKeyGenerator.DELIMITER)
+        .join(_d1.get(_random.nextInt(_d1.size())), _d2.get(_random.nextInt(_d2.size())));
+    Object[] values = new Object[]{(double) _random.nextInt(1000), (double) _random.nextInt(1000)};
+    return new Pair<>(stringKey, values);
+  }
+
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
-  public void concurrentIndexedTable() throws InterruptedException, ExecutionException, TimeoutException {
+  public void concurrentIndexedTableForCombineGroupBy() throws InterruptedException, ExecutionException, TimeoutException {
 
-    int numSegments = 10;
+    int capacity = 200_000; // fixed for this benchmark; GroupByUtils.getTableCapacity(TOP_N) would derive it from TOP_N
 
     // make 1 concurrent table
     IndexedTable concurrentIndexedTable = new ConcurrentIndexedTable();
-    concurrentIndexedTable.init(_dataSchema, _aggregationInfos, _orderBy, CAPACITY);
+    concurrentIndexedTable.init(_dataSchema, _aggregationInfos, _orderBy, capacity);
 
-    List<Callable<Void>> innerSegmentCallables = new ArrayList<>(numSegments);
+    List<Callable<Void>> innerSegmentCallables = new ArrayList<>(NUM_SEGMENTS);
 
-    // 10 parallel threads putting 10k records into the table
+    // NUM_SEGMENTS parallel threads, each putting NUM_RECORDS_PER_SEGMENT records into the table
 
-    for (int i = 0; i < numSegments; i++) {
+    for (int i = 0; i < NUM_SEGMENTS; i++) {
 
       Callable<Void> callable = () -> {
-        for (int r = 0; r < NUM_RECORDS; r++) {
-          concurrentIndexedTable.upsert(getNewRecord());
+        for (int r = 0; r < NUM_RECORDS_PER_SEGMENT; r++) {
+          concurrentIndexedTable.upsert(getRecord());
         }
         return null;
       };
@@ -153,36 +181,47 @@ public class BenchmarkIndexedTable {
 
     List<Future<Void>> futures = _executorService.invokeAll(innerSegmentCallables);
     for (Future<Void> future : futures) {
-      future.get(10, TimeUnit.SECONDS);
+      future.get(30, TimeUnit.SECONDS);
     }
 
-    concurrentIndexedTable.finish();
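+    // finish(false): finalize the table; the new flag (presumably "sort") is left off because
+    // this benchmark measures upsert/merge cost, not final ordering.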
+    concurrentIndexedTable.finish(false);
   }
 
 
   @Benchmark
   @BenchmarkMode(Mode.AverageTime)
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
-  public void simpleIndexedTable() throws InterruptedException, TimeoutException, ExecutionException {
+  public void originalCombineGroupBy() throws InterruptedException, TimeoutException, ExecutionException {
 
-    int numSegments = 10;
+    AtomicInteger numGroups = new AtomicInteger();
+    int interSegmentNumGroupsLimit = 200_000;
 
-    List<IndexedTable> simpleIndexedTables = new ArrayList<>(numSegments);
-    List<Callable<Void>> innerSegmentCallables = new ArrayList<>(numSegments);
-
-    for (int i = 0; i < numSegments; i++) {
-
-      // make 10 indexed tables
-      IndexedTable simpleIndexedTable = new SimpleIndexedTable();
-      simpleIndexedTable.init(_dataSchema, _aggregationInfos, _orderBy, CAPACITY);
-      simpleIndexedTables.add(simpleIndexedTable);
-
-      // put 10k records in each indexed table, in parallel
+    ConcurrentMap<String, Object[]> resultsMap = new ConcurrentHashMap<>();
+    List<Callable<Void>> innerSegmentCallables = new ArrayList<>(NUM_SEGMENTS);
+    for (int i = 0; i < NUM_SEGMENTS; i++) {
       Callable<Void> callable = () -> {
-        for (int r = 0; r < NUM_RECORDS; r++) {
-          simpleIndexedTable.upsert(getNewRecord());
+        for (int r = 0; r < NUM_RECORDS_PER_SEGMENT; r++) {
+
+          Pair<String, Object[]> newRecordOriginal = getOriginalRecord();
+          String stringKey = newRecordOriginal.getFirst();
+          final Object[] value = newRecordOriginal.getSecond();
+
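+          // compute() gives an atomic create-or-merge per group key; new groups arriving after
+          // the limit is hit are dropped (v stays null).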
+          resultsMap.compute(stringKey, (k, v) -> {
+            if (v == null) {
+              if (numGroups.getAndIncrement() < interSegmentNumGroupsLimit) {
+                v = new Object[_numAggregationFunctions];
+                for (int j = 0; j < _numAggregationFunctions; j++) {
+                  v[j] = value[j];
+                }
+              }
+            } else {
+              for (int j = 0; j < _numAggregationFunctions; j++) {
+                v[j] = _aggregationFunctions[j].merge(v[j], value[j]);
+              }
+            }
+            return v;
+          });
         }
-        simpleIndexedTable.finish();
         return null;
       };
       innerSegmentCallables.add(callable);
@@ -190,23 +229,16 @@ public class BenchmarkIndexedTable {
 
     List<Future<Void>> futures = _executorService.invokeAll(innerSegmentCallables);
     for (Future<Void> future : futures) {
-      future.get(10, TimeUnit.SECONDS);
+      future.get(30, TimeUnit.SECONDS);
     }
 
-    // merge all indexed tables into 1
-    IndexedTable mergedTable = null;
-    for (IndexedTable indexedTable : simpleIndexedTables) {
-      if (mergedTable == null) {
-        mergedTable = indexedTable;
-      } else {
-        mergedTable.merge(indexedTable);
-      }
-    }
-    mergedTable.finish();
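+    // Replace the per-table merge with the pre-IndexedTable trimming path: cut the merged map down to TOP_N groups.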
+    AggregationGroupByTrimmingService aggregationGroupByTrimmingService =
+        new AggregationGroupByTrimmingService(_aggregationFunctions, TOP_N);
+    List<Map<String, Object>> trimmedResults = aggregationGroupByTrimmingService.trimIntermediateResultsMap(resultsMap);
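+    // NOTE: returning trimmedResults (or sinking it into a JMH Blackhole) would guard against dead-code elimination.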
   }
 
   public static void main(String[] args) throws Exception {
-    ChainedOptionsBuilder opt = new OptionsBuilder().include(BenchmarkIndexedTable.class.getSimpleName())
+    ChainedOptionsBuilder opt = new OptionsBuilder().include(BenchmarkCombineGroupBy.class.getSimpleName())
         .warmupTime(TimeValue.seconds(10))
         .warmupIterations(1)
         .measurementTime(TimeValue.seconds(30))
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java
index 37d8767..5212f3d 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java
@@ -21,10 +21,13 @@ package org.apache.pinot.perf;
 import com.google.common.collect.Lists;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -40,9 +43,9 @@ import org.apache.pinot.core.data.table.IndexedTable;
 import org.apache.pinot.core.data.table.Key;
 import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.core.data.table.SimpleIndexedTable;
+import org.apache.pinot.core.util.trace.TraceRunnable;
 import org.openjdk.jmh.annotations.Benchmark;
 import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.Level;
 import org.openjdk.jmh.annotations.Mode;
 import org.openjdk.jmh.annotations.OutputTimeUnit;
 import org.openjdk.jmh.annotations.Scope;
@@ -71,25 +74,24 @@ public class BenchmarkIndexedTable {
 
   private ExecutorService _executorService;
 
-  @Setup(Level.Invocation)
-  public void setUpInvocation() {
 
+  @Setup
+  public void setup() {
     // create data
     int cardinalityD1 = 100;
-    _d1 = new ArrayList<>(cardinalityD1);
-    for (int i = 0; i < cardinalityD1; i++) {
-      _d1.add(RandomStringUtils.randomAlphabetic(3));
+    Set<String> d1 = new HashSet<>(cardinalityD1);
+    while (d1.size() < cardinalityD1) {
+      d1.add(RandomStringUtils.randomAlphabetic(3));
     }
+    _d1 = new ArrayList<>(cardinalityD1);
+    _d1.addAll(d1);
 
     int cardinalityD2 = 100;
     _d2 = new ArrayList<>(cardinalityD2);
     for (int i = 0; i < cardinalityD2; i++) {
       _d2.add(i);
     }
-  }
 
-  @Setup
-  public void setup() {
     _dataSchema = new DataSchema(new String[]{"d1", "d2", "sum(m1)", "max(m2)"},
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
             DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE});
@@ -136,27 +138,39 @@ public class BenchmarkIndexedTable {
     IndexedTable concurrentIndexedTable = new ConcurrentIndexedTable();
     concurrentIndexedTable.init(_dataSchema, _aggregationInfos, _orderBy, CAPACITY);
 
-    List<Callable<Void>> innerSegmentCallables = new ArrayList<>(numSegments);
-
     // 10 parallel threads putting 10k records into the table
 
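+    // Mirror the combine-operator pattern: a latch tracks per-segment completion so the main
+    // thread can bound the wait and cancel stragglers.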
+    CountDownLatch operatorLatch = new CountDownLatch(numSegments);
+    Future[] futures = new Future[numSegments];
     for (int i = 0; i < numSegments; i++) {
-
-      Callable<Void> callable = () -> {
-        for (int r = 0; r < NUM_RECORDS; r++) {
-          concurrentIndexedTable.upsert(getNewRecord());
+      futures[i] = _executorService.submit(new TraceRunnable() {
+        @SuppressWarnings("unchecked")
+        @Override
+        public void runJob() {
+          for (int r = 0; r < NUM_RECORDS; r++) {
+            concurrentIndexedTable.upsert(getNewRecord());
+          }
+          operatorLatch.countDown();
         }
-        return null;
-      };
-      innerSegmentCallables.add(callable);
+      });
     }
 
-    List<Future<Void>> futures = _executorService.invokeAll(innerSegmentCallables);
-    for (Future<Void> future : futures) {
-      future.get(10, TimeUnit.SECONDS);
+    try {
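+      // Bounded wait: if a worker died before counting down, fall through so the finally block can cancel the rest.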
+      boolean opCompleted = operatorLatch.await(30, TimeUnit.SECONDS);
+      if (!opCompleted) {
+        System.out.println("Timed out............");
+      }
+      concurrentIndexedTable.finish(false);
+    } finally {
+      // Cancel all ongoing jobs
+      for (Future future : futures) {
+        if (!future.isDone()) {
+          future.cancel(true);
+        }
+      }
     }
-
-    concurrentIndexedTable.finish();
   }
 
 
@@ -182,7 +196,7 @@ public class BenchmarkIndexedTable {
         for (int r = 0; r < NUM_RECORDS; r++) {
           simpleIndexedTable.upsert(getNewRecord());
         }
-        simpleIndexedTable.finish();
+        simpleIndexedTable.finish(false);
         return null;
       };
       innerSegmentCallables.add(callable);
@@ -202,7 +216,7 @@ public class BenchmarkIndexedTable {
         mergedTable.merge(indexedTable);
       }
     }
-    mergedTable.finish();
+    mergedTable.finish(false);
   }
 
   public static void main(String[] args) throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org