Posted to commits@pinot.apache.org by ne...@apache.org on 2019/10/26 00:50:12 UTC

[incubator-pinot] branch master updated: Optimizations for IndexedTable resize (#4728)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 494ff8a  Optimizations for IndexedTable resize (#4728)
494ff8a is described below

commit 494ff8a38080190c07f62072058d5e1275bf98cd
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Fri Oct 25 17:50:02 2019 -0700

    Optimizations for IndexedTable resize (#4728)
    
    Two optimizations introduced:

    1. Convert each Record to an IntermediateRecord before sorting/putting it into the PQ. The IntermediateRecord contains only the order by columns, in the query's sort sequence.
    2. When converting to an IntermediateRecord, extract the final result if the intermediate result is non-comparable.

    Also made the logic for SimpleIndexedTable and ConcurrentIndexedTable nearly identical (apart from the concurrent data structures used in ConcurrentIndexedTable).

    Removed OrderByUtils in favor of IndexedTableResizer.
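
For readers skimming the patch, here is a minimal, self-contained sketch of the two ideas above. The types (SketchRecord, SketchIntermediateRecord, SketchExtractors) are illustrative stand-ins invented for this note, not the actual classes added in the diff:

    import java.util.Comparator;
    import java.util.function.Function;

    // Stand-in for Pinot's Record: full key plus all aggregation values.
    final class SketchRecord {
      final Object key;
      final Object[] values; // may hold non-comparable intermediate results
      SketchRecord(Object key, Object[] values) {
        this.key = key;
        this.values = values;
      }
    }

    // Optimization 1: carry only the order-by columns, already in the
    // query's sort sequence, so sorting never touches the full Record.
    final class SketchIntermediateRecord {
      final Object key;          // same key as the source record
      final Comparable[] values; // one entry per order-by column
      SketchIntermediateRecord(Object key, Comparable[] values) {
        this.key = key;
        this.values = values;
      }
    }

    final class SketchExtractors {
      // Optimization 2: extract the final result up front only when the
      // intermediate result is non-comparable; otherwise just cast.
      static Function<Object, Comparable> forAggregation(boolean intermediateComparable,
          Function<Object, Comparable> extractFinalResult) {
        return intermediateComparable ? (o -> (Comparable) o) : extractFinalResult;
      }

      // Comparison walks the slim order-by tuple, column by column.
      static final Comparator<SketchIntermediateRecord> COMPARATOR = (a, b) -> {
        for (int i = 0; i < a.values.length; i++) {
          @SuppressWarnings("unchecked")
          int result = a.values[i].compareTo(b.values[i]);
          if (result != 0) {
            return result;
          }
        }
        return 0;
      };
    }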
---
 .../apache/pinot/core/data/order/OrderByUtils.java | 328 ----------------
 .../core/data/table/ConcurrentIndexedTable.java    | 157 +++-----
 .../apache/pinot/core/data/table/IndexedTable.java |  73 ++--
 .../pinot/core/data/table/IndexedTableResizer.java | 303 ++++++++++++++
 .../pinot/core/data/table/SimpleIndexedTable.java  | 149 ++++---
 .../org/apache/pinot/core/data/table/Table.java    |  18 +-
 .../operator/CombineGroupByOrderByOperator.java    |  11 +-
 .../operator/blocks/IntermediateResultsBlock.java  |   7 +-
 .../query/AggregationGroupByOrderByOperator.java   |   4 +
 .../groupby/AggregationGroupByTrimmingService.java |   3 +-
 .../core/query/reduce/BrokerReduceService.java     |  12 +-
 .../org/apache/pinot/core/util/GroupByUtils.java   |  21 +-
 .../pinot/core/data/order/OrderByUtilsTest.java    | 265 -------------
 .../core/data/table/IndexedTableResizerTest.java   | 434 +++++++++++++++++++++
 .../pinot/core/data/table/IndexedTableTest.java    | 221 ++++++-----
 .../apache/pinot/perf/BenchmarkCombineGroupBy.java |  12 +-
 .../apache/pinot/perf/BenchmarkIndexedTable.java   |   7 +-
 17 files changed, 1082 insertions(+), 943 deletions(-)
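
Before the full diff: the heart of the new IndexedTableResizer is a bounded-heap trim that builds whichever priority queue is smaller, the records to evict or the records to retain. Below is a rough standalone sketch of that decision over a plain Map<String, Long>, keeping the largest values; Pinot's version compares IntermediateRecords by the order-by tuple instead, and TrimSketch/boundedHeap are names made up for this note:

    import java.util.Comparator;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.PriorityQueue;
    import java.util.Set;

    public final class TrimSketch {

      // Trim `map` down to `trimToSize` entries, keeping the largest values.
      static void trim(Map<String, Long> map, int trimToSize) {
        int numToEvict = map.size() - trimToSize;
        if (numToEvict <= 0) {
          return; // already within capacity
        }
        Comparator<Map.Entry<String, Long>> byValue = Map.Entry.comparingByValue();
        if (numToEvict < trimToSize) {
          // Fewer records to evict than to retain: heap the smallest values, drop them.
          for (Map.Entry<String, Long> e : boundedHeap(map, numToEvict, byValue.reversed())) {
            map.remove(e.getKey());
          }
        } else {
          // Fewer records to retain: heap the largest values, keep only their keys.
          Set<String> keep = new HashSet<>();
          for (Map.Entry<String, Long> e : boundedHeap(map, trimToSize, byValue)) {
            keep.add(e.getKey());
          }
          map.keySet().retainAll(keep);
        }
      }

      // Bounded heap whose head is the weakest element kept so far; after one
      // pass over the map it holds the n largest entries under `cmp`.
      private static PriorityQueue<Map.Entry<String, Long>> boundedHeap(
          Map<String, Long> map, int n, Comparator<Map.Entry<String, Long>> cmp) {
        PriorityQueue<Map.Entry<String, Long>> pq = new PriorityQueue<>(n, cmp);
        for (Map.Entry<String, Long> e : map.entrySet()) {
          if (pq.size() < n) {
            pq.offer(e);
          } else if (cmp.compare(pq.peek(), e) < 0) {
            pq.poll();
            pq.offer(e);
          }
        }
        return pq;
      }
    }

In the patch below, the same shape appears in IndexedTableResizer::resizeRecordsMap and ::convertToIntermediateRecordsPQ, with IntermediateRecord comparisons in place of the Long values.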

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/order/OrderByUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/data/order/OrderByUtils.java
deleted file mode 100644
index 36bdc5b..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/order/OrderByUtils.java
+++ /dev/null
@@ -1,328 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.data.order;
-
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.commons.collections.comparators.ComparableComparator;
-import org.apache.pinot.common.request.AggregationInfo;
-import org.apache.pinot.common.request.SelectionSort;
-import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
-import org.apache.pinot.common.utils.primitive.ByteArray;
-import org.apache.pinot.core.data.table.Record;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
-
-
-/**
- * Helper methods to perform order by of list of {@link Record}
- */
-public final class OrderByUtils {
-
-  private OrderByUtils() {
-  }
-
-  /**
-   * Constructs a comparator for ordering by the keys in the TableRecord::keys
-   */
-  public static Comparator<Record> getKeysComparator(DataSchema dataSchema, List<SelectionSort> orderBy) {
-
-    Map<String, Integer> columnIndexMap = new HashMap<>();
-    Map<String, ColumnDataType> columnDataTypeMap = new HashMap<>();
-    for (int i = 0; i < dataSchema.size(); i++) {
-      columnIndexMap.put(dataSchema.getColumnName(i), i);
-      columnDataTypeMap.put(dataSchema.getColumnName(i), dataSchema.getColumnDataType(i));
-    }
-
-    Comparator<Record> globalComparator = null;
-
-    for (SelectionSort orderByInfo : orderBy) {
-      String column = orderByInfo.getColumn();
-      boolean ascending = orderByInfo.isIsAsc();
-      Integer index = columnIndexMap.get(column);
-      if (index == null) {
-        throw new UnsupportedOperationException("Could not find column " + column + " in data schema");
-      }
-      ColumnDataType columnDataType = columnDataTypeMap.get(column);
-
-      Comparator<Record> comparator = getKeysComparator(ascending, index, columnDataType);
-      if (globalComparator == null) {
-        globalComparator = comparator;
-      } else {
-        globalComparator = globalComparator.thenComparing(comparator);
-      }
-    }
-    return globalComparator;
-  }
-
-  /**
-   * Constructs a comparator for ordering by the non-aggregation values in the Record::values
-   */
-  public static Comparator<Record> getValuesComparator(DataSchema dataSchema, List<SelectionSort> orderBy) {
-
-    Map<String, Integer> columnIndexMap = new HashMap<>();
-    Map<String, ColumnDataType> columnDataTypeMap = new HashMap<>();
-    for (int i = 0; i < dataSchema.size(); i++) {
-      columnIndexMap.put(dataSchema.getColumnName(i), i);
-      columnDataTypeMap.put(dataSchema.getColumnName(i), dataSchema.getColumnDataType(i));
-    }
-
-    Comparator<Record> globalComparator = null;
-
-    for (SelectionSort orderByInfo : orderBy) {
-      String column = orderByInfo.getColumn();
-      boolean ascending = orderByInfo.isIsAsc();
-      Integer index = columnIndexMap.get(column);
-      if (index == null) {
-        throw new UnsupportedOperationException("Could not find column " + column + " in data schema");
-      }
-      ColumnDataType columnDataType = columnDataTypeMap.get(column);
-
-      Comparator<Record> comparator = getValuesComparator(ascending, index, columnDataType);
-      if (globalComparator == null) {
-        globalComparator = comparator;
-      } else {
-        globalComparator = globalComparator.thenComparing(comparator);
-      }
-    }
-    return globalComparator;
-  }
-
-  /**
-   * Gets the indices from Record which have aggregations that are present in the order by
-   * @param orderBy order by information
-   * @param aggregationInfos aggregation information
-   * @return indices of aggregations in the record
-   */
-  public static int[] getAggregationIndexes(List<SelectionSort> orderBy, List<AggregationInfo> aggregationInfos) {
-    Map<String, Integer> aggregationColumnToIndex = new HashMap<>();
-    for (int i = 0; i < aggregationInfos.size(); i++) {
-      AggregationInfo aggregationInfo = aggregationInfos.get(i);
-      String aggregationColumn = AggregationFunctionUtils.getAggregationColumnName(aggregationInfo);
-      aggregationColumnToIndex.put(aggregationColumn, i);
-    }
-
-    List<Integer> indexes = new ArrayList<>();
-    for (SelectionSort orderByInfo : orderBy) {
-      String column = orderByInfo.getColumn();
-
-      if (aggregationColumnToIndex.containsKey(column)) {
-        indexes.add(aggregationColumnToIndex.get(column));
-      }
-    }
-    return indexes.stream().mapToInt(i->i).toArray();
-  }
-
-  /**
-   * Constructs the comparator for ordering by a combination of keys from {@link Record::_keys}
-   * and aggregation values from {@link Record::values}
-   */
-  public static Comparator<Record> getKeysAndValuesComparator(DataSchema dataSchema, List<SelectionSort> orderBy,
-      List<AggregationInfo> aggregationInfos, boolean extractFinalResults) {
-
-    int numKeys = dataSchema.size() - aggregationInfos.size();
-    Map<String, Integer> keyIndexMap = new HashMap<>();
-    Map<String, ColumnDataType> keyColumnDataTypeMap = new HashMap<>();
-    for (int i = 0; i < numKeys; i++) {
-      keyIndexMap.put(dataSchema.getColumnName(i), i);
-      keyColumnDataTypeMap.put(dataSchema.getColumnName(i), dataSchema.getColumnDataType(i));
-    }
-
-    Map<String, Integer> aggregationColumnToIndex = new HashMap<>(aggregationInfos.size());
-    Map<String, AggregationInfo> aggregationColumnToInfo = new HashMap<>(aggregationInfos.size());
-    for (int i = 0; i < aggregationInfos.size(); i++) {
-      AggregationInfo aggregationInfo = aggregationInfos.get(i);
-      String aggregationColumn = AggregationFunctionUtils.getAggregationColumnName(aggregationInfo);
-      aggregationColumnToIndex.put(aggregationColumn, i);
-      aggregationColumnToInfo.put(aggregationColumn, aggregationInfo);
-    }
-
-    Comparator<Record> globalComparator = null;
-
-    for (SelectionSort orderByInfo : orderBy) {
-      Comparator<Record> comparator;
-
-      String column = orderByInfo.getColumn();
-      boolean ascending = orderByInfo.isIsAsc();
-
-      // TODO: avoid the index computation and index lookup in the comparison.
-      // we can achieve this by making order by operate on Object[], which contains only order by fields
-      if (keyIndexMap.containsKey(column)) {
-        int index = keyIndexMap.get(column);
-        ColumnDataType columnDataType = keyColumnDataTypeMap.get(column);
-        comparator = getKeysComparator(ascending, index, columnDataType);
-      } else if (aggregationColumnToIndex.containsKey(column)) {
-        int index = aggregationColumnToIndex.get(column);
-        AggregationFunction aggregationFunction =
-            AggregationFunctionUtils.getAggregationFunctionContext(aggregationColumnToInfo.get(column))
-                .getAggregationFunction();
-        comparator = getAggregationComparator(ascending, index, aggregationFunction, extractFinalResults);
-      } else {
-        throw new UnsupportedOperationException("Could not find column " + column + " in data schema");
-      }
-
-      if (globalComparator == null) {
-        globalComparator = comparator;
-      } else {
-        globalComparator = globalComparator.thenComparing(comparator);
-      }
-    }
-    return globalComparator;
-  }
-
-  private static Comparator<Record> getKeysComparator(boolean ascending, int index, ColumnDataType columnDataType) {
-    Comparator<Record> comparator;
-    switch (columnDataType) {
-      case INT:
-        if (ascending) {
-          comparator = Comparator.comparingInt(o -> (Integer) o.getKey().getColumns()[index]);
-        } else {
-          comparator = (o1, o2) -> Integer
-              .compare((Integer) o2.getKey().getColumns()[index], (Integer) o1.getKey().getColumns()[index]);
-        }
-        break;
-      case LONG:
-        if (ascending) {
-          comparator = Comparator.comparingLong(o -> (Long) o.getKey().getColumns()[index]);
-        } else {
-          comparator =
-              (o1, o2) -> Long.compare((Long) o2.getKey().getColumns()[index], (Long) o1.getKey().getColumns()[index]);
-        }
-        break;
-      case FLOAT:
-        if (ascending) {
-          comparator = (o1, o2) -> Float
-              .compare((Float) o1.getKey().getColumns()[index], (Float) o2.getKey().getColumns()[index]);
-        } else {
-          comparator = (o1, o2) -> Float
-              .compare((Float) o2.getKey().getColumns()[index], (Float) o1.getKey().getColumns()[index]);
-        }
-        break;
-      case DOUBLE:
-        if (ascending) {
-          comparator = Comparator.comparingDouble(o -> (Double) o.getKey().getColumns()[index]);
-        } else {
-          comparator = (o1, o2) -> Double
-              .compare((Double) o2.getKey().getColumns()[index], (Double) o1.getKey().getColumns()[index]);
-        }
-        break;
-      case STRING:
-        if (ascending) {
-          comparator = Comparator.comparing(o -> (String) o.getKey().getColumns()[index]);
-        } else {
-          comparator = (o1, o2) -> ((String) o2.getKey().getColumns()[index])
-              .compareTo((String) o1.getKey().getColumns()[index]);
-        }
-        break;
-      case BYTES:
-        if (ascending) {
-          comparator = (o1, o2) -> ByteArray
-              .compare((byte[]) o1.getKey().getColumns()[index], (byte[]) o2.getKey().getColumns()[index]);
-        } else {
-          comparator = (o1, o2) -> ByteArray
-              .compare((byte[]) o2.getKey().getColumns()[index], (byte[]) o1.getKey().getColumns()[index]);
-        }
-        break;
-      default:
-        throw new IllegalStateException();
-    }
-    return comparator;
-  }
-
-  private static Comparator<Record> getValuesComparator(boolean ascending, int index, ColumnDataType columnDataType) {
-    Comparator<Record> comparator;
-    switch (columnDataType) {
-      case INT:
-        if (ascending) {
-          comparator = Comparator.comparingInt(o -> (Integer) o.getValues()[index]);
-        } else {
-          comparator = (o1, o2) -> Integer.compare((Integer) o2.getValues()[index], (Integer) o1.getValues()[index]);
-        }
-        break;
-      case LONG:
-        if (ascending) {
-          comparator = Comparator.comparingLong(o -> (Long) o.getValues()[index]);
-        } else {
-          comparator = (o1, o2) -> Long.compare((Long) o2.getValues()[index], (Long) o1.getValues()[index]);
-        }
-        break;
-      case FLOAT:
-        if (ascending) {
-          comparator = (o1, o2) -> Float.compare((Float) o1.getValues()[index], (Float) o2.getValues()[index]);
-        } else {
-          comparator = (o1, o2) -> Float.compare((Float) o2.getValues()[index], (Float) o1.getValues()[index]);
-        }
-        break;
-      case DOUBLE:
-        if (ascending) {
-          comparator = Comparator.comparingDouble(o -> (Double) o.getValues()[index]);
-        } else {
-          comparator = (o1, o2) -> Double.compare((Double) o2.getValues()[index], (Double) o1.getValues()[index]);
-        }
-        break;
-      case STRING:
-        if (ascending) {
-          comparator = Comparator.comparing(o -> (String) o.getValues()[index]);
-        } else {
-          comparator = (o1, o2) -> ((String) o2.getValues()[index]).compareTo((String) o1.getValues()[index]);
-        }
-        break;
-      case BYTES:
-        if (ascending) {
-          comparator = (o1, o2) -> ByteArray.compare((byte[]) o1.getValues()[index], (byte[]) o2.getValues()[index]);
-        } else {
-          comparator = (o1, o2) -> ByteArray.compare((byte[]) o2.getValues()[index], (byte[]) o1.getValues()[index]);
-        }
-        break;
-      default:
-        throw new IllegalStateException();
-    }
-    return comparator;
-  }
-
-  private static Comparator<Record> getAggregationComparator(boolean ascending, int index,
-      AggregationFunction aggregationFunction, boolean extractFinalResults) {
-
-    Comparator<Record> comparator;
-    if (extractFinalResults) {
-      if (ascending) {
-        comparator = (v1, v2) -> ComparableComparator.getInstance()
-            .compare(aggregationFunction.extractFinalResult(v1.getValues()[index]),
-                aggregationFunction.extractFinalResult(v2.getValues()[index]));
-      } else {
-        comparator = (v1, v2) -> ComparableComparator.getInstance()
-            .compare(aggregationFunction.extractFinalResult(v2.getValues()[index]),
-                aggregationFunction.extractFinalResult(v1.getValues()[index]));
-      }
-    } else {
-      if (ascending) {
-        comparator =
-            (v1, v2) -> ComparableComparator.getInstance().compare(v1.getValues()[index], v2.getValues()[index]);
-      } else {
-        comparator =
-            (v1, v2) -> ComparableComparator.getInstance().compare(v2.getValues()[index], v1.getValues()[index]);
-      }
-    }
-    return comparator;
-  }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
index d1ea28f..0defec2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/ConcurrentIndexedTable.java
@@ -19,23 +19,17 @@
 package org.apache.pinot.core.data.table;
 
 import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.Iterator;
 import java.util.List;
-import java.util.PriorityQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import javax.annotation.Nonnull;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.pinot.common.request.AggregationInfo;
 import org.apache.pinot.common.request.SelectionSort;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.data.order.OrderByUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -49,11 +43,6 @@ public class ConcurrentIndexedTable extends IndexedTable {
 
   private ConcurrentMap<Key, Record> _lookupMap;
   private ReentrantReadWriteLock _readWriteLock;
-
-  private boolean _isOrderBy;
-  private Comparator<Record> _resizeOrderByComparator;
-  private Comparator<Record> _finishOrderByComparator;
-  private int[] _aggregationIndexes;
   private Iterator<Record> _iterator;
 
   private AtomicBoolean _noMoreNewRecords = new AtomicBoolean();
@@ -61,44 +50,35 @@ public class ConcurrentIndexedTable extends IndexedTable {
   private final AtomicLong _resizeTime = new AtomicLong();
 
   /**
-   * Initializes the data structures and comparators needed for this Table
+   * Initializes the data structures needed for this Table
    * @param dataSchema data schema of the record's keys and values
    * @param aggregationInfos aggregation infos for the aggregations in record's values
    * @param orderBy list of {@link SelectionSort} defining the order by
-   * @param capacity the max number of records to hold
+   * @param capacity the capacity of the table
    */
-  @Override
-  public void init(@Nonnull DataSchema dataSchema, List<AggregationInfo> aggregationInfos, List<SelectionSort> orderBy,
+  public ConcurrentIndexedTable(DataSchema dataSchema, List<AggregationInfo> aggregationInfos, List<SelectionSort> orderBy,
       int capacity) {
-    super.init(dataSchema, aggregationInfos, orderBy, capacity);
+    super(dataSchema, aggregationInfos, orderBy, capacity);
 
     _lookupMap = new ConcurrentHashMap<>();
     _readWriteLock = new ReentrantReadWriteLock();
-    _isOrderBy = CollectionUtils.isNotEmpty(orderBy);
-    if (_isOrderBy) {
-      // get indices of aggregations to extract final results upfront
-      // FIXME: at instance level, extract final results only if intermediate result is non-comparable, instead of for all aggregations
-      _aggregationIndexes = OrderByUtils.getAggregationIndexes(orderBy, aggregationInfos);
-      // resize comparator doesn't need to extract final results, because it will be done before hand when adding Records to the PQ.
-      _resizeOrderByComparator = OrderByUtils.getKeysAndValuesComparator(dataSchema, orderBy, aggregationInfos, false);
-      // finish comparator needs to extract final results, as it cannot be done before hand. The _lookupMap will get modified if it is done before hand
-      _finishOrderByComparator = OrderByUtils.getKeysAndValuesComparator(dataSchema, orderBy, aggregationInfos, true);
-    }
   }
 
   /**
    * Thread safe implementation of upsert for inserting {@link Record} into {@link Table}
    */
   @Override
-  public boolean upsert(@Nonnull Record newRecord) {
+  public boolean upsert(Record newRecord) {
 
     Key key = newRecord.getKey();
     Preconditions.checkNotNull(key, "Cannot upsert record with null keys");
 
     if (_noMoreNewRecords.get()) { // allow only existing record updates
       _lookupMap.computeIfPresent(key, (k, v) -> {
+        Object[] existingValues = v.getValues();
+        Object[] newValues = newRecord.getValues();
         for (int i = 0; i < _numAggregations; i++) {
-          v.getValues()[i] = _aggregationFunctions.get(i).merge(v.getValues()[i], newRecord.getValues()[i]);
+          existingValues[i] = _aggregationFunctions[i].merge(existingValues[i], newValues[i]);
         }
         return v;
       });
@@ -110,8 +90,10 @@ public class ConcurrentIndexedTable extends IndexedTable {
           if (v == null) {
             return newRecord;
           } else {
+            Object[] existingValues = v.getValues();
+            Object[] newValues = newRecord.getValues();
             for (int i = 0; i < _numAggregations; i++) {
-              v.getValues()[i] = _aggregationFunctions.get(i).merge(v.getValues()[i], newRecord.getValues()[i]);
+              existingValues[i] = _aggregationFunctions[i].merge(existingValues[i], newValues[i]);
             }
             return v;
           }
@@ -120,14 +102,14 @@ public class ConcurrentIndexedTable extends IndexedTable {
         _readWriteLock.readLock().unlock();
       }
 
-      // resize if exceeds capacity
-      if (_lookupMap.size() >= _bufferedCapacity) {
+      // resize if exceeds max capacity
+      if (_lookupMap.size() >= _maxCapacity) {
         if (_isOrderBy) {
           // reached capacity, resize
           _readWriteLock.writeLock().lock();
           try {
-            if (_lookupMap.size() >= _bufferedCapacity) {
-              resize(_maxCapacity);
+            if (_lookupMap.size() >= _maxCapacity) {
+              resize(_capacity);
             }
           } finally {
             _readWriteLock.writeLock().unlock();
@@ -142,15 +124,6 @@ public class ConcurrentIndexedTable extends IndexedTable {
   }
 
   @Override
-  public boolean merge(@Nonnull Table table) {
-    Iterator<Record> iterator = table.iterator();
-    while (iterator.hasNext()) {
-      upsert(iterator.next());
-    }
-    return true;
-  }
-
-  @Override
   public int size() {
     return _lookupMap.size();
   }
@@ -162,83 +135,51 @@ public class ConcurrentIndexedTable extends IndexedTable {
 
   private void resize(int trimToSize) {
 
-    if (_lookupMap.size() > trimToSize) {
-      long startTime = System.currentTimeMillis();
+    long startTime = System.currentTimeMillis();
 
-      if (_isOrderBy) {
-        // drop bottom
+    _indexedTableResizer.resizeRecordsMap(_lookupMap, trimToSize);
 
-        // make min heap of elements to evict
-        int heapSize = _lookupMap.size() - trimToSize;
-        PriorityQueue<Record> minHeap = new PriorityQueue<>(heapSize, _resizeOrderByComparator);
+    long endTime = System.currentTimeMillis();
+    long timeElapsed = endTime - startTime;
 
-        for (Record record : _lookupMap.values()) {
+    _numResizes.incrementAndGet();
+    _resizeTime.addAndGet(timeElapsed);
+  }
 
-          // extract final results before hand for comparisons on aggregations
-          // FIXME: at instance level, extract final results only if intermediate result is non-comparable, instead of for all aggregations
-          if (_aggregationIndexes.length > 0) {
-            Object[] values = record.getValues();
-            for (int index : _aggregationIndexes) {
-              values[index] = _aggregationFunctions.get(index).extractFinalResult(values[index]);
-            }
-          }
-          if (minHeap.size() < heapSize) {
-            minHeap.offer(record);
-          } else {
-            Record peek = minHeap.peek();
-            if (minHeap.comparator().compare(peek, record) < 0) {
-              minHeap.poll();
-              minHeap.offer(record);
-            }
-          }
-        }
+  private List<Record> resizeAndSort(int trimToSize) {
 
-        for (Record evictRecord : minHeap) {
-          _lookupMap.remove(evictRecord.getKey());
-        }
-      } else {
-        // drop randomly
-
-        int numRecordsToDrop = _lookupMap.size() - trimToSize;
-        for (Key evictKey : _lookupMap.keySet()) {
-          _lookupMap.remove(evictKey);
-          numRecordsToDrop --;
-          if (numRecordsToDrop == 0) {
-            break;
-          }
-        }
-      }
-      long endTime = System.currentTimeMillis();
-      long timeElapsed = endTime - startTime;
+    long startTime = System.currentTimeMillis();
 
-      _numResizes.incrementAndGet();
-      _resizeTime.addAndGet(timeElapsed);
-    }
+    List<Record> sortedRecords = _indexedTableResizer.resizeAndSortRecordsMap(_lookupMap, trimToSize);
+
+    long endTime = System.currentTimeMillis();
+    long timeElapsed = endTime - startTime;
+
+    _numResizes.incrementAndGet();
+    _resizeTime.addAndGet(timeElapsed);
+
+    return sortedRecords;
   }
 
   @Override
   public void finish(boolean sort) {
-    resize(_maxCapacity);
-    int numResizes = _numResizes.get();
-    long resizeTime = _resizeTime.get();
-    LOGGER.debug("Num resizes : {}, Total time spent in resizing : {}, Avg resize time : {}", numResizes, resizeTime,
-        numResizes == 0 ? 0 : resizeTime / numResizes);
-
-    _iterator = _lookupMap.values().iterator();
-    if (sort && _isOrderBy) {
-      // TODO: in this final sort, we can optimize again by extracting final results before hand.
-      // This could be done by adding another parameter to finish(sort, extractFinalResults)
-      // The caller then does not have to extract final results again, if extractFinalResults=true was passed to finish.
-      // Typically, at instance level, we will not need to sort, nor do we want final results - finish(true, true).
-      // At broker level we need to sort, and we also want final results - finish(true, true)
-      List<Record> sortedList = new ArrayList<>(_lookupMap.values());
-      sortedList.sort(_finishOrderByComparator);
-      _iterator = sortedList.iterator();
+
+    if (_isOrderBy) {
+
+      if (sort) {
+        List<Record> sortedRecords = resizeAndSort(_capacity);
+        _iterator = sortedRecords.iterator();
+      } else {
+        resize(_capacity);
+      }
+      int numResizes = _numResizes.get();
+      long resizeTime = _resizeTime.get();
+      LOGGER.debug("Num resizes : {}, Total time spent in resizing : {}, Avg resize time : {}", numResizes, resizeTime,
+          numResizes == 0 ? 0 : resizeTime / numResizes);
     }
-  }
 
-  @Override
-  public DataSchema getDataSchema() {
-    return _dataSchema;
+    if (_iterator == null) {
+      _iterator = _lookupMap.values().iterator();
+    }
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
index ebf0711..85cbb2e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
@@ -18,9 +18,9 @@
  */
 package org.apache.pinot.core.data.table;
 
-import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
-import javax.annotation.Nonnull;
+import org.apache.commons.collections.CollectionUtils;
 import org.apache.pinot.common.request.AggregationInfo;
 import org.apache.pinot.common.request.SelectionSort;
 import org.apache.pinot.common.utils.DataSchema;
@@ -33,43 +33,60 @@ import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils
  */
 public abstract class IndexedTable implements Table {
 
-  List<AggregationFunction> _aggregationFunctions;
+  AggregationFunction[] _aggregationFunctions;
   int _numAggregations;
-  DataSchema _dataSchema;
+  private DataSchema _dataSchema;
 
+  // the capacity we need to trim to
+  int _capacity;
+  // the capacity with added buffer, in order to collect more records than capacity for better precision
   int _maxCapacity;
-  int _bufferedCapacity;
 
-  @Override
-  public void init(@Nonnull DataSchema dataSchema, List<AggregationInfo> aggregationInfos, List<SelectionSort> orderBy,
+  boolean _isOrderBy;
+  IndexedTableResizer _indexedTableResizer;
+
+  /**
+   * Initializes the variables and comparators needed for the table
+   */
+  public IndexedTable(DataSchema dataSchema, List<AggregationInfo> aggregationInfos, List<SelectionSort> orderBy,
       int capacity) {
     _dataSchema = dataSchema;
 
     _numAggregations = aggregationInfos.size();
-    _aggregationFunctions = new ArrayList<>(_numAggregations);
-    for (AggregationInfo aggregationInfo : aggregationInfos) {
-      _aggregationFunctions.add(
-          AggregationFunctionUtils.getAggregationFunctionContext(aggregationInfo).getAggregationFunction());
+    _aggregationFunctions = new AggregationFunction[_numAggregations];
+    for (int i = 0; i < _numAggregations; i++) {
+      _aggregationFunctions[i] =
+          AggregationFunctionUtils.getAggregationFunctionContext(aggregationInfos.get(i)).getAggregationFunction();
     }
 
-    /* Factor used to add buffer to maxCapacity of the table **/
-    double bufferFactor;
-    /* Factor used to decide eviction threshold **/
-    /** The true capacity of the table is {@link IndexedTable::_bufferedCapacity},
-     * which is bufferFactor times the {@link IndexedTable::_maxCapacity}
-     *
-     * If records beyond {@link IndexedTable::_bufferedCapacity} are received,
-     * the table resize and evict bottom records, resizing it to {@link IndexedTable::_maxCapacity}
-     * The assumption here is that {@link IndexedTable::_maxCapacity} already has a buffer added by the caller (typically, we do max(top * 5, 5000))
-     */
-    if (capacity > 50000) {
-      // if max capacity is large, buffer capacity is kept smaller, so that we do not accumulate too many records for sorting/resizing
-      bufferFactor = 1.2;
+    _isOrderBy = CollectionUtils.isNotEmpty(orderBy);
+    if (_isOrderBy) {
+      _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, orderBy);
+
+      // TODO: tune these numbers
+      // Based on the capacity and maxCapacity, the resizer will smartly choose to evict/retain records from the PQ
+      if (capacity <= 100_000) { // Capacity is small, make a very large buffer. Make PQ of records to retain, during resize
+        _maxCapacity = 1_000_000;
+      } else { // Capacity is large, make buffer only slightly bigger. Make PQ of records to evict, during resize
+        _maxCapacity = (int) (capacity * 1.2);
+      }
     } else {
-      // if max capacity is small, buffer capacity is kept larger, so that we avoid frequent resizing
-      bufferFactor = 2.0;
+      _maxCapacity = capacity;
+    }
+    _capacity = capacity;
+  }
+
+  @Override
+  public boolean merge(Table table) {
+    Iterator<Record> iterator = table.iterator();
+    while (iterator.hasNext()) {
+      upsert(iterator.next());
     }
-    _maxCapacity = capacity;
-    _bufferedCapacity = (int) (capacity * bufferFactor);
+    return true;
+  }
+
+  @Override
+  public DataSchema getDataSchema() {
+    return _dataSchema;
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTableResizer.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTableResizer.java
new file mode 100644
index 0000000..72784e5
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTableResizer.java
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.table;
+
+import com.google.common.annotations.VisibleForTesting;
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.function.Function;
+import org.apache.pinot.common.request.AggregationInfo;
+import org.apache.pinot.common.request.SelectionSort;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
+
+
+/**
+ * Helper class for trimming and sorting records in the IndexedTable, based on the order by information
+ */
+class IndexedTableResizer {
+
+  private OrderByValueExtractor[] _orderByValueExtractors;
+  private Comparator<IntermediateRecord> _intermediateRecordComparator;
+  private int _numOrderBy;
+
+  IndexedTableResizer(DataSchema dataSchema, List<AggregationInfo> aggregationInfos, List<SelectionSort> orderBy) {
+
+    // NOTE: the assumption here is that the key columns will appear before the aggregation columns in the data schema
+    // This is handled only in the AggregationGroupByOrderByOperator for now
+
+    int numColumns = dataSchema.size();
+    int numAggregations = aggregationInfos.size();
+    int numKeyColumns = numColumns - numAggregations;
+
+    Map<String, Integer> columnIndexMap = new HashMap<>();
+    for (int i = 0; i < numColumns; i++) {
+      columnIndexMap.put(dataSchema.getColumnName(i), i);
+    }
+
+    Map<String, AggregationInfo> aggregationColumnToInfo = new HashMap<>();
+    for (AggregationInfo aggregationInfo : aggregationInfos) {
+      String aggregationColumn = AggregationFunctionUtils.getAggregationColumnName(aggregationInfo);
+      aggregationColumnToInfo.put(aggregationColumn, aggregationInfo);
+    }
+
+    _numOrderBy = orderBy.size();
+    _orderByValueExtractors = new OrderByValueExtractor[_numOrderBy];
+    Comparator[] comparators = new Comparator[_numOrderBy];
+
+    for (int i = 0; i < _numOrderBy; i++) {
+      SelectionSort selectionSort = orderBy.get(i);
+      String column = selectionSort.getColumn();
+
+      if (columnIndexMap.containsKey(column)) {
+        int index = columnIndexMap.get(column);
+        if (index < numKeyColumns) {
+          _orderByValueExtractors[i] = new KeyColumnExtractor(index);
+        } else {
+          AggregationInfo aggregationInfo = aggregationColumnToInfo.get(column);
+          AggregationFunction aggregationFunction =
+              AggregationFunctionUtils.getAggregationFunctionContext(aggregationInfo).getAggregationFunction();
+          _orderByValueExtractors[i] = new AggregationColumnExtractor(index - numKeyColumns, aggregationFunction);
+        }
+      } else {
+        throw new IllegalStateException("Could not find column " + column + " in data schema");
+      }
+
+      comparators[i] = Comparator.naturalOrder();
+      if (!selectionSort.isIsAsc()) {
+        comparators[i] = comparators[i].reversed();
+      }
+    }
+
+    _intermediateRecordComparator = (o1, o2) -> {
+
+      for (int i = 0; i < _numOrderBy; i++) {
+        int result = comparators[i].compare(o1._values[i], o2._values[i]);
+        if (result != 0) {
+          return result;
+        }
+      }
+      return 0;
+    };
+  }
+
+  /**
+   * Constructs an IntermediateRecord from Record
+   * The IntermediateRecord::key is the same Record::key
+   * The IntermediateRecord::values contains only the order by columns, in the query's sort sequence
+   * For aggregation values in the order by, the final result is extracted if the intermediate result is non-comparable
+   */
+  @VisibleForTesting
+  IntermediateRecord getIntermediateRecord(Record record) {
+    Comparable[] intermediateRecordValues = new Comparable[_numOrderBy];
+    for (int i = 0; i < _numOrderBy; i++) {
+      intermediateRecordValues[i] = _orderByValueExtractors[i].extract(record);
+    }
+    return new IntermediateRecord(record.getKey(), intermediateRecordValues);
+  }
+
+  /**
+   * Trim recordsMap to trimToSize, based on order by information
+   * Resize only if the number of records is greater than trimToSize
+   * The resizer smartly chooses to create a PQ of records to evict or records to retain, based on the number of records and the number of records to evict
+   */
+  void resizeRecordsMap(Map<Key, Record> recordsMap, int trimToSize) {
+
+    int numRecordsToEvict = recordsMap.size() - trimToSize;
+
+    if (numRecordsToEvict > 0) {
+      // TODO: compare the performance of converting to IntermediateRecord vs keeping Record, in cases where we do not need to extract final results
+
+      if (numRecordsToEvict < trimToSize) { // num records to evict is smaller than num records to retain
+        // make PQ of records to evict
+        Comparator<IntermediateRecord> comparator = _intermediateRecordComparator;
+        PriorityQueue<IntermediateRecord> priorityQueue =
+            convertToIntermediateRecordsPQ(recordsMap, numRecordsToEvict, comparator);
+        for (IntermediateRecord evictRecord : priorityQueue) {
+          recordsMap.remove(evictRecord._key);
+        }
+      } else { // num records to retain is smaller than num records to evict
+        // make PQ of records to retain
+        Comparator<IntermediateRecord> comparator = _intermediateRecordComparator.reversed();
+        PriorityQueue<IntermediateRecord> priorityQueue =
+            convertToIntermediateRecordsPQ(recordsMap, trimToSize, comparator);
+        ObjectOpenHashSet<Key> keysToRetain = new ObjectOpenHashSet<>(priorityQueue.size());
+        for (IntermediateRecord retainRecord : priorityQueue) {
+          keysToRetain.add(retainRecord._key);
+        }
+        recordsMap.keySet().retainAll(keysToRetain);
+      }
+    }
+  }
+
+  private PriorityQueue<IntermediateRecord> convertToIntermediateRecordsPQ(Map<Key, Record> recordsMap, int size,
+      Comparator<IntermediateRecord> comparator) {
+    PriorityQueue<IntermediateRecord> priorityQueue = new PriorityQueue<>(size, comparator);
+
+    for (Record record : recordsMap.values()) {
+
+      IntermediateRecord intermediateRecord = getIntermediateRecord(record);
+      if (priorityQueue.size() < size) {
+        priorityQueue.offer(intermediateRecord);
+      } else {
+        IntermediateRecord peek = priorityQueue.peek();
+        if (comparator.compare(peek, intermediateRecord) < 0) {
+          priorityQueue.poll();
+          priorityQueue.offer(intermediateRecord);
+        }
+      }
+    }
+    return priorityQueue;
+  }
+
+  private List<Record> sortRecordsMap(Map<Key, Record> recordsMap) {
+    int numRecords = recordsMap.size();
+    List<Record> sortedRecords = new ArrayList<>(numRecords);
+    List<IntermediateRecord> intermediateRecords = new ArrayList<>(numRecords);
+    for (Record record : recordsMap.values()) {
+      intermediateRecords.add(getIntermediateRecord(record));
+    }
+    intermediateRecords.sort(_intermediateRecordComparator);
+    for (IntermediateRecord intermediateRecord : intermediateRecords) {
+      sortedRecords.add(recordsMap.get(intermediateRecord._key));
+    }
+    return sortedRecords;
+  }
+
+  /**
+   * Resizes the recordsMap and returns a sorted list of records.
+   * This method is to be called from IndexedTable::finish, if both resize and sort are needed
+   *
+   * If numRecordsToEvict < numRecordsToRetain, resize with a PQ of records to evict, and then sort
+   * Else, resize with a PQ of records to retain, then use the PQ to create the sorted list
+   */
+  List<Record> resizeAndSortRecordsMap(Map<Key, Record> recordsMap, int trimToSize) {
+
+    int numRecords = recordsMap.size();
+    if (numRecords == 0) {
+      return Collections.emptyList();
+    }
+
+    int numRecordsToRetain = Math.min(numRecords, trimToSize);
+    int numRecordsToEvict = numRecords - numRecordsToRetain;
+
+    if (numRecordsToEvict < numRecordsToRetain) { // num records to evict is smaller than num records to retain
+      if (numRecordsToEvict > 0) {
+        // make PQ of records to evict
+        PriorityQueue<IntermediateRecord> priorityQueue =
+            convertToIntermediateRecordsPQ(recordsMap, numRecordsToEvict, _intermediateRecordComparator);
+        for (IntermediateRecord evictRecord : priorityQueue) {
+          recordsMap.remove(evictRecord._key);
+        }
+      }
+      return sortRecordsMap(recordsMap);
+    } else {
+      // make PQ of records to retain
+      PriorityQueue<IntermediateRecord> priorityQueue =
+          convertToIntermediateRecordsPQ(recordsMap, numRecordsToRetain, _intermediateRecordComparator.reversed());
+      // use PQ to get sorted list
+      Record[] sortedArray = new Record[numRecordsToRetain];
+      ObjectOpenHashSet<Key> keysToRetain = new ObjectOpenHashSet<>(numRecordsToRetain);
+      while (!priorityQueue.isEmpty()) {
+        IntermediateRecord intermediateRecord = priorityQueue.poll();
+        keysToRetain.add(intermediateRecord._key);
+        Record record = recordsMap.get(intermediateRecord._key);
+        sortedArray[--numRecordsToRetain] = record;
+      }
+      recordsMap.keySet().retainAll(keysToRetain);
+      return Arrays.asList(sortedArray);
+    }
+  }
+
+  /**
+   * Helper class to store a subset of Record fields
+   * IntermediateRecord is derived from a Record
+   * Some of the main properties of an IntermediateRecord are:
+   *
+   * 1. Key in IntermediateRecord is expected to be identical to the one in the Record
+   * 2. For values, IntermediateRecord should only have the columns needed for order by
+   * 3. Inside the values, the columns should be ordered by the order by sequence
+   * 4. For order by on aggregations, final results should be extracted if the intermediate result is non-comparable
+   */
+  @VisibleForTesting
+  static class IntermediateRecord {
+    final Key _key;
+    final Comparable[] _values;
+
+    IntermediateRecord(Key key, Comparable[] values) {
+      _key = key;
+      _values = values;
+    }
+  }
+
+  /**
+   * Extractor for order by value columns from Record
+   */
+  private static abstract class OrderByValueExtractor {
+    abstract Comparable extract(Record record);
+  }
+
+  /**
+   * Extractor for key column
+   */
+  private static class KeyColumnExtractor extends OrderByValueExtractor {
+    final int _index;
+
+    KeyColumnExtractor(int index) {
+      _index = index;
+    }
+
+    @Override
+    Comparable extract(Record record) {
+      Object keyColumn = record.getKey().getColumns()[_index];
+      return (Comparable) keyColumn;
+    }
+  }
+
+  /**
+   * Extractor for aggregation column
+   */
+  private static class AggregationColumnExtractor extends OrderByValueExtractor {
+    final int _index;
+    final Function<Object, Comparable> _convertorFunction;
+
+    AggregationColumnExtractor(int index, AggregationFunction aggregationFunction) {
+      _index = index;
+      if (aggregationFunction.isIntermediateResultComparable()) {
+        _convertorFunction = o -> (Comparable) o;
+      } else {
+        _convertorFunction = o -> aggregationFunction.extractFinalResult(o);
+      }
+    }
+
+    @Override
+    Comparable extract(Record record) {
+      Object aggregationColumn = record.getValues()[_index];
+      return _convertorFunction.apply(aggregationColumn);
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
index a7f4894..0fa92df 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/SimpleIndexedTable.java
@@ -19,19 +19,14 @@
 package org.apache.pinot.core.data.table;
 
 import com.google.common.base.Preconditions;
-import java.util.ArrayList;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import javax.annotation.Nonnull;
 import javax.annotation.concurrent.NotThreadSafe;
-import org.apache.commons.collections.CollectionUtils;
 import org.apache.pinot.common.request.AggregationInfo;
 import org.apache.pinot.common.request.SelectionSort;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.data.order.OrderByUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -43,11 +38,7 @@ import org.slf4j.LoggerFactory;
 public class SimpleIndexedTable extends IndexedTable {
   private static final Logger LOGGER = LoggerFactory.getLogger(SimpleIndexedTable.class);
 
-  private List<Record> _records;
-  private Map<Key, Integer> _lookupTable;
-
-  private boolean _isOrderBy;
-  private Comparator<Record> _orderByComparator;
+  private Map<Key, Record> _lookupMap;
   private Iterator<Record> _iterator;
 
   private boolean _noMoreNewRecords = false;
@@ -55,59 +46,57 @@ public class SimpleIndexedTable extends IndexedTable {
   private long _resizeTime = 0;
 
   /**
-   * Initializes the data structures and comparators needed for this Table
+   * Initializes the data structures needed for this Table
    * @param dataSchema data schema of the record's keys and values
-   * @param aggregationInfos aggregation infors for the aggregations in record'd values
+   * @param aggregationInfos aggregation infos for the aggregations in record's values
    * @param orderBy list of {@link SelectionSort} defining the order by
-   * @param capacity the max number of records to hold
+   * @param capacity the capacity of the table
    */
-  @Override
-  public void init(@Nonnull DataSchema dataSchema, List<AggregationInfo> aggregationInfos, List<SelectionSort> orderBy,
+  public SimpleIndexedTable(DataSchema dataSchema, List<AggregationInfo> aggregationInfos, List<SelectionSort> orderBy,
       int capacity) {
-    super.init(dataSchema, aggregationInfos, orderBy, capacity);
-
-    _records = new ArrayList<>(capacity);
-    _lookupTable = new HashMap<>(capacity);
+    super(dataSchema, aggregationInfos, orderBy, capacity);
 
-    _isOrderBy = CollectionUtils.isNotEmpty(orderBy);
-    if (_isOrderBy) {
-      // final results not extracted upfront
-      _orderByComparator = OrderByUtils.getKeysAndValuesComparator(dataSchema, orderBy, aggregationInfos, true);
-    }
+    _lookupMap = new HashMap<>();
   }
 
   /**
    * Non thread safe implementation of upsert to insert {@link Record} into the {@link Table}
    */
   @Override
-  public boolean upsert(@Nonnull Record newRecord) {
-    Key keys = newRecord.getKey();
-    Preconditions.checkNotNull(keys, "Cannot upsert record with null keys");
-
-    Integer index = _lookupTable.get(keys);
-    if (_noMoreNewRecords) { // only update existing records
-      if (index != null) {
-        Record existingRecord = _records.get(index);
+  public boolean upsert(Record newRecord) {
+    Key key = newRecord.getKey();
+    Preconditions.checkNotNull(key, "Cannot upsert record with null keys");
+
+    if (_noMoreNewRecords) { // allow only existing record updates
+      _lookupMap.computeIfPresent(key, (k, v) -> {
+        Object[] existingValues = v.getValues();
+        Object[] newValues = newRecord.getValues();
         for (int i = 0; i < _numAggregations; i++) {
-          existingRecord.getValues()[i] = _aggregationFunctions.get(i).merge(existingRecord.getValues()[i], newRecord.getValues()[i]);
+          existingValues[i] = _aggregationFunctions[i].merge(existingValues[i], newValues[i]);
         }
-      }
+        return v;
+      });
     } else { // allow all records
-      if (index == null) {
-        index = size();
-        _lookupTable.put(keys, index);
-        _records.add(index, newRecord);
-      } else {
-        Record existingRecord = _records.get(index);
-        for (int i = 0; i < _numAggregations; i++) {
-          existingRecord.getValues()[i] = _aggregationFunctions.get(i).merge(existingRecord.getValues()[i], newRecord.getValues()[i]);
-        }
-      }
 
-      if (size() >= _bufferedCapacity) {
-        if (_isOrderBy) { // capacity reached, order and resize
-          sortAndResize(_maxCapacity);
-        } else { // capacity reached, but no order by. Allow no more records
+      _lookupMap.compute(key, (k, v) -> {
+        if (v == null) {
+          return newRecord;
+        } else {
+          Object[] existingValues = v.getValues();
+          Object[] newValues = newRecord.getValues();
+          for (int i = 0; i < _numAggregations; i++) {
+            existingValues[i] = _aggregationFunctions[i].merge(existingValues[i], newValues[i]);
+          }
+          return v;
+        }
+      });
+
+      if (_lookupMap.size() >= _maxCapacity) {
+        if (_isOrderBy) {
+          // reached max capacity, resize
+          resize(_capacity);
+        } else {
+          // reached max capacity and no order by. No more new records will be accepted
           _noMoreNewRecords = true;
         }
       }
@@ -115,24 +104,11 @@ public class SimpleIndexedTable extends IndexedTable {
     return true;
   }
 
-  private void sortAndResize(int trimToSize) {
-    long startTime = System.currentTimeMillis();
-
-    // sort
-    if (_isOrderBy) {
-      _records.sort(_orderByComparator);
-    }
+  private void resize(int trimToSize) {
 
-    // evict lowest (or whatever's at the bottom if sort didnt happen)
-    if (_records.size() > trimToSize) {
-      _records = new ArrayList<>(_records.subList(0, trimToSize));
-    }
+    long startTime = System.currentTimeMillis();
 
-    // rebuild lookup table
-    _lookupTable.clear();
-    for (int i = 0; i < _records.size(); i++) {
-      _lookupTable.put(_records.get(i).getKey(), i);
-    }
+    _indexedTableResizer.resizeRecordsMap(_lookupMap, trimToSize);
 
     long endTime = System.currentTimeMillis();
     long timeElapsed = endTime - startTime;
@@ -141,19 +117,24 @@ public class SimpleIndexedTable extends IndexedTable {
     _resizeTime += timeElapsed;
   }
 
+  private List<Record> resizeAndSort(int trimToSize) {
 
-  @Override
-  public boolean merge(@Nonnull Table table) {
-    Iterator<Record> iterator = table.iterator();
-    while (iterator.hasNext()) {
-      upsert(iterator.next());
-    }
-    return true;
+    long startTime = System.currentTimeMillis();
+
+    List<Record> sortedRecords = _indexedTableResizer.resizeAndSortRecordsMap(_lookupMap, trimToSize);
+
+    long endTime = System.currentTimeMillis();
+    long timeElapsed = endTime - startTime;
+
+    _numResizes++;
+    _resizeTime += timeElapsed;
+
+    return sortedRecords;
   }
 
   @Override
   public int size() {
-    return _records.size();
+    return _lookupMap.size();
   }
 
   @Override
@@ -163,16 +144,22 @@ public class SimpleIndexedTable extends IndexedTable {
 
   @Override
   public void finish(boolean sort) {
-    // TODO: support resize without sort
-    sortAndResize(_maxCapacity);
-    LOGGER.debug("Num resizes : {}, Total time spent in resizing : {}, Avg resize time : {}", _numResizes, _resizeTime,
-        _numResizes == 0 ? 0 : _resizeTime / _numResizes);
 
-    _iterator = _records.iterator();
-  }
+    if (_isOrderBy) {
 
-  @Override
-  public DataSchema getDataSchema() {
-    return _dataSchema;
+      if (sort) {
+        List<Record> sortedRecords = resizeAndSort(_capacity);
+        _iterator = sortedRecords.iterator();
+      } else {
+        resize(_capacity);
+      }
+      LOGGER
+          .debug("Num resizes : {}, Total time spent in resizing : {}, Avg resize time : {}", _numResizes, _resizeTime,
+              _numResizes == 0 ? 0 : _resizeTime / _numResizes);
+    }
+
+    if (_iterator == null) {
+      _iterator = _lookupMap.values().iterator();
+    }
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/Table.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/Table.java
index e515bdb..db7b7f9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/Table.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/Table.java
@@ -19,10 +19,6 @@
 package org.apache.pinot.core.data.table;
 
 import java.util.Iterator;
-import java.util.List;
-import javax.annotation.Nonnull;
-import org.apache.pinot.common.request.AggregationInfo;
-import org.apache.pinot.common.request.SelectionSort;
 import org.apache.pinot.common.utils.DataSchema;
 
 
@@ -32,24 +28,14 @@ import org.apache.pinot.common.utils.DataSchema;
 public interface Table {
 
   /**
-   * Initializes the Table for use
-   * @param dataSchema the schema of the columns in the {@link Record}
-   * @param aggregationInfos the aggregation info for the values if applicable
-   * @param orderBy the order by information if applicable
-   * @param maxCapacity the max capacity the table should have
-   */
-  void init(DataSchema dataSchema, List<AggregationInfo> aggregationInfos, List<SelectionSort> orderBy,
-      int maxCapacity);
-
-  /**
    * Update the table with the given record
    */
-  boolean upsert(@Nonnull Record record);
+  boolean upsert(Record record);
 
   /**
    * Merge all records from given table
    */
-  boolean merge(@Nonnull Table table);
+  boolean merge(Table table);
 
   /**
    * Returns the size of the table
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOrderByOperator.java
index 43f5076..d15f23a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOrderByOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/CombineGroupByOrderByOperator.java
@@ -77,10 +77,7 @@ public class CombineGroupByOrderByOperator extends BaseOperator<IntermediateResu
     _executorService = executorService;
     _timeOutMs = timeOutMs;
     _initLock = new ReentrantLock();
-    _indexedTable = new ConcurrentIndexedTable();
-    _indexedTableCapacity = 1_000_000;
-    // FIXME: indexedTableCapacity should be derived from TOP. Hardcoding this value to a higher number until we can tune the resize
-    //_indexedTableCapacity = GroupByUtils.getTableCapacity((int) brokerRequest.getGroupBy().getTopN());
+    _indexedTableCapacity = GroupByUtils.getTableCapacity(brokerRequest.getGroupBy(), brokerRequest.getOrderBy());
   }
 
   /**
@@ -123,8 +120,8 @@ public class CombineGroupByOrderByOperator extends BaseOperator<IntermediateResu
             try {
               if (_dataSchema == null) {
                 _dataSchema = intermediateResultsBlock.getDataSchema();
-                _indexedTable.init(_dataSchema, _brokerRequest.getAggregationsInfo(), _brokerRequest.getOrderBy(),
-                    _indexedTableCapacity);
+                _indexedTable = new ConcurrentIndexedTable(_dataSchema, _brokerRequest.getAggregationsInfo(),
+                    _brokerRequest.getOrderBy(), _indexedTableCapacity);
               }
             } finally {
               _initLock.unlock();
@@ -240,7 +237,7 @@ public class CombineGroupByOrderByOperator extends BaseOperator<IntermediateResu
         function = Double::valueOf;
         break;
       case BYTES:
-        function = BytesUtils::toBytes;
+        function = BytesUtils::toByteArray;
         break;
       case STRING:
       default:
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
index b982006..0a0a532 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/IntermediateResultsBlock.java
@@ -279,7 +279,8 @@ public class IntermediateResultsBlock implements Block {
     return attachMetadataToDataTable(dataTable);
   }
 
-  private void setDataTableColumn(ColumnDataType columnDataType, DataTableBuilder dataTableBuilder, int columnIndex, Object value)
+  private void setDataTableColumn(ColumnDataType columnDataType, DataTableBuilder dataTableBuilder, int columnIndex,
+      Object value)
       throws IOException {
     switch (columnDataType) {
 
@@ -298,6 +299,10 @@ public class IntermediateResultsBlock implements Block {
       case STRING:
         dataTableBuilder.setColumn(columnIndex, (String) value);
         break;
+      case BYTES:
+        // FIXME: support BYTES in DataTable instead of converting to string
+        dataTableBuilder.setColumn(columnIndex, value.toString()); // ByteArray::toString
+        break;
       default:
         dataTableBuilder.setColumn(columnIndex, value);
         break;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
index 6296318..0448ef9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/AggregationGroupByOrderByOperator.java
@@ -63,6 +63,7 @@ public class AggregationGroupByOrderByOperator extends BaseOperator<Intermediate
     _numTotalRawDocs = numTotalRawDocs;
     _useStarTree = useStarTree;
 
+    // NOTE: The indexedTable expects the data schema to have group by columns before aggregation columns
     int numColumns = groupBy.getExpressionsSize() + _functionContexts.length;
     String[] columnNames = new String[numColumns];
     DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numColumns];
@@ -85,6 +86,9 @@ public class AggregationGroupByOrderByOperator extends BaseOperator<Intermediate
       index++;
     }
 
+    // TODO: We need to support putting order by columns in the data schema.
+    //  It is possible that the order by column is not one of the group by or aggregation columns
+
     _dataSchema = new DataSchema(columnNames, columnDataTypes);
   }
 
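For illustration, a minimal sketch (column names hypothetical) of the layout this constructor builds and the
indexed table relies on: group by columns first, then one column per aggregation function, in declaration order.

    // assumes org.apache.pinot.common.utils.DataSchema
    DataSchema dataSchema = new DataSchema(
        new String[]{"d1", "d2", "sum(m1)", "max(m2)"},
        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
            DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE});
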
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/AggregationGroupByTrimmingService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/AggregationGroupByTrimmingService.java
index 746df74..a16e344 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/AggregationGroupByTrimmingService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/groupby/AggregationGroupByTrimmingService.java
@@ -36,6 +36,7 @@ import org.apache.pinot.common.response.broker.GroupByResult;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.apache.pinot.core.query.aggregation.function.MinAggregationFunction;
+import org.apache.pinot.core.util.GroupByUtils;
 
 
 /**
@@ -55,7 +56,7 @@ public class AggregationGroupByTrimmingService {
     _groupByTopN = groupByTopN;
 
     // To keep the precision, _trimSize is the larger of (_groupByTopN * 5) or 5000
-    _trimSize = Math.max(_groupByTopN * 5, 5000);
+    _trimSize = GroupByUtils.getTableCapacity(_groupByTopN);
 
     // To trigger the trimming, number of groups should be larger than _trimThreshold which is (_trimSize * 4)
     _trimThreshold = _trimSize * 4;
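
A quick worked example of the thresholds above: with _groupByTopN = 100, _trimSize = max(100 * 5, 5000) = 5000
and _trimThreshold = 5000 * 4 = 20000, so trimming only triggers once more than 20000 groups accumulate, and then
cuts the map back to the top 5000 groups.
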
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index a205a5d..9c9c91e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -53,6 +53,7 @@ import org.apache.pinot.common.response.broker.GroupByResult;
 import org.apache.pinot.common.response.broker.QueryProcessingException;
 import org.apache.pinot.common.response.broker.ResultTable;
 import org.apache.pinot.common.response.broker.SelectionResults;
+import org.apache.pinot.common.utils.BytesUtils;
 import org.apache.pinot.common.utils.CommonConstants.Broker.Request;
 import org.apache.pinot.common.utils.CommonConstants.Broker.Request.QueryOptionKey;
 import org.apache.pinot.common.utils.DataSchema;
@@ -517,11 +518,8 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
       List<AggregationInfo> aggregationInfos, List<SelectionSort> orderBy, DataSchema dataSchema,
       Map<ServerInstance, DataTable> dataTableMap) {
 
-    IndexedTable indexedTable = new ConcurrentIndexedTable();
-    int indexedTableCapacity = 1_000_000;
-    // FIXME: indexedTableCapacity should be derived from TOP. Hardcoding this value to a higher number until we can tune the resize
-    // int capacity = GroupByUtils.getTableCapacity((int) groupBy.getTopN());
-    indexedTable.init(dataSchema, aggregationInfos, orderBy, indexedTableCapacity);
+    int indexedTableCapacity = GroupByUtils.getTableCapacity(groupBy, orderBy);
+    IndexedTable indexedTable = new ConcurrentIndexedTable(dataSchema, aggregationInfos, orderBy, indexedTableCapacity);
 
     for (DataTable dataTable : dataTableMap.values()) {
       BiFunction[] functions = new BiFunction[dataSchema.size()];
@@ -545,6 +543,10 @@ public class BrokerReduceService implements ReduceService<BrokerResponseNative>
           case STRING:
             function = dataTable::getString;
             break;
+          case BYTES:
+            // FIXME: support BYTES in DataTable instead of converting to string
+            function = (row, col) -> BytesUtils.toByteArray(dataTable.getString(row, col));
+            break;
           default:
             function = dataTable::getObject;
         }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java
index 6fbb03a..f858ae4 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/GroupByUtils.java
@@ -18,26 +18,43 @@
  */
 package org.apache.pinot.core.util;
 
+import java.util.List;
 import java.util.Map;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.GroupBy;
+import org.apache.pinot.common.request.SelectionSort;
 
 import static org.apache.pinot.common.utils.CommonConstants.Broker.Request.*;
 
 
 public final class GroupByUtils {
 
-  public static final int NUM_RESULTS_LOWER_LIMIT = 5000;
+  private static final int NUM_RESULTS_LOWER_LIMIT = 5000;
 
   private GroupByUtils() {
 
   }
 
   /**
-   * Returns the higher of topN * 5 or 5k. This is to ensure we better precision in results
+   * Returns the higher of topN * 5 or 5k. This is to ensure better precision in results
    */
   public static int getTableCapacity(int topN) {
     return Math.max(topN * 5, NUM_RESULTS_LOWER_LIMIT);
   }
 
+  /**
+   * For group by + order by queries: returns the higher of (topN * 5) or 5k, to ensure better precision in results.
+   * For group by queries without order by: returns topN.
+   */
+  public static int getTableCapacity(GroupBy groupBy, List<SelectionSort> orderBy) {
+    int topN = (int) groupBy.getTopN();
+    if (orderBy != null && !orderBy.isEmpty()) {
+      return getTableCapacity(topN);
+    } else {
+      return topN;
+    }
+  }
+
   public static boolean isGroupByMode(String groupByMode, Map<String, String> queryOptions) {
     if (queryOptions != null) {
       String groupByModeValue = queryOptions.get(QueryOptionKey.GROUP_BY_MODE);
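
A quick illustration of the two capacity paths (numbers chosen arbitrarily; orderBy is some List<SelectionSort>):

    GroupBy groupBy = new GroupBy();
    groupBy.setTopN(100);
    // non-empty order by: max(100 * 5, 5000) = 5000
    int capacity = GroupByUtils.getTableCapacity(groupBy, orderBy);
    // with topN = 2000 the same call returns 10000; with a null or empty order by it returns topN (100)
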
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/order/OrderByUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/order/OrderByUtilsTest.java
deleted file mode 100644
index 3fd774a..0000000
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/order/OrderByUtilsTest.java
+++ /dev/null
@@ -1,265 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.data.order;
-
-import com.google.common.collect.Lists;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-import org.apache.pinot.common.request.AggregationInfo;
-import org.apache.pinot.common.request.SelectionSort;
-import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.core.data.table.Key;
-import org.apache.pinot.core.data.table.Record;
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-
-public class OrderByUtilsTest {
-
-  @Test
-  public void testComparators() {
-    DataSchema dataSchema = new DataSchema(new String[]{"dim0", "dim1", "dim2", "dim3", "metric0"},
-        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING,
-            DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.DOUBLE});
-    List<Record> records = new ArrayList<>();
-    records.add(getRecord(new Object[]{"abc", "p125", 10, "30"}, new Object[]{10d}));
-    records.add(getRecord(new Object[]{"abc", "p125", 50, "30"}, new Object[]{200d}));
-    records.add(getRecord(new Object[]{"abc", "r666", 6, "200"}, new Object[]{200d}));
-    records.add(getRecord(new Object[]{"mno", "h776", 10, "100"}, new Object[]{100d}));
-    records.add(getRecord(new Object[]{"ghi", "i889", 66, "5"}, new Object[]{50d}));
-    records.add(getRecord(new Object[]{"mno", "p125", 10, "30"}, new Object[]{250d}));
-    records.add(getRecord(new Object[]{"bcd", "i889", 6, "209"}, new Object[]{100d}));
-
-    List<SelectionSort> orderBy;
-    SelectionSort s0;
-    SelectionSort s1;
-    SelectionSort s2;
-    SelectionSort s3;
-    SelectionSort s4;
-    List<Object> expected0;
-    List<Object> expected1;
-    List<Object> expected2;
-    List<Object> expected3;
-    List<Object> expected4;
-    List<Object> actual0;
-    List<Object> actual1;
-    List<Object> actual2;
-    List<Object> actual3;
-    List<Object> actual4;
-
-    // string column
-    s0 = new SelectionSort();
-    s0.setColumn("dim0");
-    s0.setIsAsc(true);
-    orderBy = Lists.newArrayList(s0);
-    Comparator<Record> keysComparator = OrderByUtils.getKeysComparator(dataSchema, orderBy);
-    records.sort(keysComparator);
-    expected0 = Lists.newArrayList("abc", "abc", "abc", "bcd", "ghi", "mno", "mno");
-    actual0 = records.stream().map(k -> k.getKey().getColumns()[0]).collect(Collectors.toList());
-    Assert.assertEquals(actual0, expected0);
-
-    // string column desc
-    s0 = new SelectionSort();
-    s0.setColumn("dim0");
-    orderBy = Lists.newArrayList(s0);
-    keysComparator = OrderByUtils.getKeysComparator(dataSchema, orderBy);
-    records.sort(keysComparator);
-    expected0 = Lists.newArrayList("mno", "mno", "ghi", "bcd", "abc", "abc", "abc");
-    actual0 = records.stream().map(k -> k.getKey().getColumns()[0]).collect(Collectors.toList());
-    Assert.assertEquals(actual0, expected0);
-
-    // numeric dimension
-    s2 = new SelectionSort();
-    s2.setColumn("dim2");
-    s2.setIsAsc(true);
-    orderBy = Lists.newArrayList(s2);
-    keysComparator = OrderByUtils.getKeysComparator(dataSchema, orderBy);
-    records.sort(keysComparator);
-    expected2 = Lists.newArrayList(6, 6, 10, 10, 10, 50, 66);
-    actual2 = records.stream().map(k -> k.getKey().getColumns()[2]).collect(Collectors.toList());
-    Assert.assertEquals(actual2, expected2);
-
-    // desc
-    s2 = new SelectionSort();
-    s2.setColumn("dim2");
-    s2.setIsAsc(false);
-    orderBy = Lists.newArrayList(s2);
-    keysComparator = OrderByUtils.getKeysComparator(dataSchema, orderBy);
-    records.sort(keysComparator);
-    expected2 = Lists.newArrayList(66, 50, 10, 10, 10, 6, 6);
-    actual2 = records.stream().map(k -> k.getKey().getColumns()[2]).collect(Collectors.toList());
-    Assert.assertEquals(actual2, expected2);
-
-    // string numeric dimension
-    s3 = new SelectionSort();
-    s3.setColumn("dim3");
-    s3.setIsAsc(true);
-    orderBy = Lists.newArrayList(s3);
-    keysComparator = OrderByUtils.getKeysComparator(dataSchema, orderBy);
-    records.sort(keysComparator);
-    expected3 = Lists.newArrayList("100", "200", "209", "30", "30", "30", "5");
-    actual3 = records.stream().map(k -> k.getKey().getColumns()[3]).collect(Collectors.toList());
-    Assert.assertEquals(actual3, expected3);
-
-    // desc
-    s3 = new SelectionSort();
-    s3.setColumn("dim3");
-    orderBy = Lists.newArrayList(s3);
-    keysComparator = OrderByUtils.getKeysComparator(dataSchema, orderBy);
-    records.sort(keysComparator);
-    expected3 = Lists.newArrayList("5", "30", "30", "30", "209", "200", "100");
-    actual3 = records.stream().map(k -> k.getKey().getColumns()[3]).collect(Collectors.toList());
-    Assert.assertEquals(actual3, expected3);
-
-    // multiple dimensions
-    s0 = new SelectionSort();
-    s0.setColumn("dim0");
-    s1 = new SelectionSort();
-    s1.setColumn("dim1");
-    s1.setIsAsc(true);
-    orderBy = Lists.newArrayList(s0, s1);
-    keysComparator = OrderByUtils.getKeysComparator(dataSchema, orderBy);
-    records.sort(keysComparator);
-    expected0 = Lists.newArrayList("mno", "mno", "ghi", "bcd", "abc", "abc", "abc");
-    actual0 = records.stream().map(k -> k.getKey().getColumns()[0]).collect(Collectors.toList());
-    Assert.assertEquals(actual0, expected0);
-    expected1 = Lists.newArrayList("h776", "p125", "i889", "i889", "p125", "p125", "r666");
-    actual1 = records.stream().map(k -> k.getKey().getColumns()[1]).collect(Collectors.toList());
-    Assert.assertEquals(actual1, expected1);
-
-    s2 = new SelectionSort();
-    s2.setColumn("dim2");
-    s1 = new SelectionSort();
-    s1.setColumn("dim1");
-    orderBy = Lists.newArrayList(s2, s1);
-    keysComparator = OrderByUtils.getKeysComparator(dataSchema, orderBy);
-    records.sort(keysComparator);
-    expected2 = Lists.newArrayList(66, 50, 10, 10, 10, 6, 6);
-    actual2 = records.stream().map(k -> k.getKey().getColumns()[2]).collect(Collectors.toList());
-    Assert.assertEquals(actual2, expected2);
-    expected1 = Lists.newArrayList("i889", "p125", "p125", "p125", "h776", "r666", "i889");
-    actual1 = records.stream().map(k -> k.getKey().getColumns()[1]).collect(Collectors.toList());
-    Assert.assertEquals(actual1, expected1);
-
-    s2 = new SelectionSort();
-    s2.setColumn("dim2");
-    s1 = new SelectionSort();
-    s1.setColumn("dim1");
-    s3 = new SelectionSort();
-    s3.setColumn("dim3");
-    s3.setIsAsc(true);
-    orderBy = Lists.newArrayList(s2, s1, s3);
-    keysComparator = OrderByUtils.getKeysComparator(dataSchema, orderBy);
-    records.sort(keysComparator);
-    expected2 = Lists.newArrayList(66, 50, 10, 10, 10, 6, 6);
-    actual2 = records.stream().map(k -> k.getKey().getColumns()[2]).collect(Collectors.toList());
-    Assert.assertEquals(actual2, expected2);
-    expected1 = Lists.newArrayList("i889", "p125", "p125", "p125", "h776", "r666", "i889");
-    actual1 = records.stream().map(k -> k.getKey().getColumns()[1]).collect(Collectors.toList());
-    Assert.assertEquals(actual1, expected1);
-    expected3 = Lists.newArrayList("5", "30", "30", "30", "100", "200", "209");
-    actual3 = records.stream().map(k -> k.getKey().getColumns()[3]).collect(Collectors.toList());
-    Assert.assertEquals(actual3, expected3);
-
-    // all columns
-    s0 = new SelectionSort();
-    s0.setColumn("dim0");
-    s0.setIsAsc(true);
-    s2 = new SelectionSort();
-    s2.setColumn("dim2");
-    s1 = new SelectionSort();
-    s1.setColumn("dim1");
-    s3 = new SelectionSort();
-    s3.setColumn("dim3");
-    s3.setIsAsc(true);
-    orderBy = Lists.newArrayList(s0, s2, s1, s3);
-    keysComparator = OrderByUtils.getKeysComparator(dataSchema, orderBy);
-    records.sort(keysComparator);
-    expected0 = Lists.newArrayList( "abc", "abc", "abc", "bcd", "ghi", "mno", "mno");
-    actual0 = records.stream().map(k -> k.getKey().getColumns()[0]).collect(Collectors.toList());
-    Assert.assertEquals(actual0, expected0);
-    expected2 = Lists.newArrayList(50, 10, 6, 6, 66, 10, 10);
-    actual2 = records.stream().map(k -> k.getKey().getColumns()[2]).collect(Collectors.toList());
-    Assert.assertEquals(actual2, expected2);
-    expected1 = Lists.newArrayList("p125", "p125", "r666", "i889", "i889", "p125", "h776");
-    actual1 = records.stream().map(k -> k.getKey().getColumns()[1]).collect(Collectors.toList());
-    Assert.assertEquals(actual1, expected1);
-    expected3 = Lists.newArrayList("30", "30", "200", "209", "5", "30", "100");
-    actual3 = records.stream().map(k -> k.getKey().getColumns()[3]).collect(Collectors.toList());
-    Assert.assertEquals(actual3, expected3);
-
-    // non existent column
-    s0 = new SelectionSort();
-    s0.setColumn("dim10");
-    s0.setIsAsc(true);
-    orderBy = Lists.newArrayList(s0);
-    boolean exception = false;
-    try {
-      keysComparator = OrderByUtils.getKeysComparator(dataSchema, orderBy);
-      records.sort(keysComparator);
-    } catch (UnsupportedOperationException e) {
-      exception = true;
-    }
-    Assert.assertTrue(exception);
-
-    // keys and values
-    Map<String, String> aggregationParams = new HashMap<>();
-    aggregationParams.put("column", "metric0");
-    AggregationInfo aggregationInfo = new AggregationInfo();
-    aggregationInfo.setAggregationType("SUM");
-    aggregationInfo.setAggregationParams(aggregationParams);
-    List<AggregationInfo> aggregationInfos = Lists.newArrayList(aggregationInfo);
-    s0 = new SelectionSort();
-    s0.setColumn("dim0");
-    s0.setIsAsc(true);
-    s4 = new SelectionSort();
-    s4.setColumn("sum(metric0)");
-    orderBy = Lists.newArrayList(s0, s4);
-    Comparator<Record> keysAndValuesComparator =
-        OrderByUtils.getKeysAndValuesComparator(dataSchema, orderBy, aggregationInfos, true);
-    records.sort(keysAndValuesComparator);
-    expected0 = Lists.newArrayList("abc", "abc", "abc", "bcd", "ghi", "mno", "mno");
-    actual0 = records.stream().map(k -> k.getKey().getColumns()[0]).collect(Collectors.toList());
-    Assert.assertEquals(actual0, expected0);
-    expected4 = Lists.newArrayList(200d, 200d, 10d, 100d, 50d, 250d, 100d);
-    actual4 = records.stream().map(k -> k.getValues()[0]).collect(Collectors.toList());
-    Assert.assertEquals(actual4, expected4);
-
-    // values only
-    dataSchema = new DataSchema(new String[]{"metric0"},
-        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE});
-    s4 = new SelectionSort();
-    s4.setColumn("metric0");
-    s4.setIsAsc(true);
-    orderBy = Lists.newArrayList(s4);
-    Comparator<Record> valuesComparator = OrderByUtils.getValuesComparator(dataSchema, orderBy);
-    records.sort(valuesComparator);
-    expected4 = Lists.newArrayList(10d, 50d, 100d, 100d, 200d, 200d, 250d);
-    actual4 = records.stream().map(k -> k.getValues()[0]).collect(Collectors.toList());
-    Assert.assertEquals(actual4, expected4);
-  }
-
-  private Record getRecord(Object[] keys, Object[] values) {
-    return new Record(new Key(keys), values);
-  }
-}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableResizerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableResizerTest.java
new file mode 100644
index 0000000..4e21579
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableResizerTest.java
@@ -0,0 +1,434 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.table;
+
+import com.google.common.collect.Lists;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.request.AggregationInfo;
+import org.apache.pinot.common.request.SelectionSort;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
+import org.apache.pinot.core.query.aggregation.function.customobject.AvgPair;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests the functionality of {@link IndexedTableResizer}
+ */
+public class IndexedTableResizerTest {
+
+  private DataSchema dataSchema;
+  private List<AggregationInfo> aggregationInfos;
+  private List<SelectionSort> selectionSort;
+  private SelectionSort sel1;
+  private SelectionSort sel2;
+  private SelectionSort sel3;
+  private IndexedTableResizer _indexedTableResizer;
+
+  private List<Record> records;
+  private int trimToSize = 3;
+  private Map<Key, Record> recordsMap;
+
+  @BeforeClass
+  public void beforeClass() {
+    dataSchema = new DataSchema(new String[]{"d1", "d2", "d3", "sum(m1)", "max(m2)", "distinctcount(m3)", "avg(m4)"},
+        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.INT,
+            DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE,
+            DataSchema.ColumnDataType.OBJECT, DataSchema.ColumnDataType.OBJECT});
+    AggregationInfo agg1 = new AggregationInfo();
+    Map<String, String> params1 = new HashMap<>(1);
+    params1.put("column", "m1");
+    agg1.setAggregationParams(params1);
+    agg1.setAggregationType("sum");
+    AggregationInfo agg2 = new AggregationInfo();
+    Map<String, String> params2 = new HashMap<>(1);
+    params2.put("column", "m2");
+    agg2.setAggregationParams(params2);
+    agg2.setAggregationType("max");
+    AggregationInfo agg3 = new AggregationInfo();
+    Map<String, String> params3 = new HashMap<>(1);
+    params3.put("column", "m3");
+    agg3.setAggregationParams(params3);
+    agg3.setAggregationType("distinctcount");
+    AggregationInfo agg4 = new AggregationInfo();
+    Map<String, String> params4 = new HashMap<>(1);
+    params4.put("column", "m4");
+    agg4.setAggregationParams(params4);
+    agg4.setAggregationType("avg");
+    aggregationInfos = Lists.newArrayList(agg1, agg2, agg3, agg4);
+
+    sel1 = new SelectionSort();
+    sel2 = new SelectionSort();
+    sel3 = new SelectionSort();
+
+    IntOpenHashSet i1 = new IntOpenHashSet();
+    i1.add(1);
+    IntOpenHashSet i2 = new IntOpenHashSet();
+    i2.add(1);
+    i2.add(2);
+    IntOpenHashSet i3 = new IntOpenHashSet();
+    i3.add(1);
+    i3.add(2);
+    IntOpenHashSet i4 = new IntOpenHashSet();
+    i4.add(1);
+    i4.add(2);
+    i4.add(3);
+    IntOpenHashSet i5 = new IntOpenHashSet();
+    i5.add(1);
+    i5.add(2);
+    i5.add(3);
+    i5.add(4);
+    records = Lists.newArrayList(
+        new Record(new Key(new Object[]{"a", 10, 1.0}), new Object[]{10, 100, i1, new AvgPair(10, 2) /* 5 */}),
+        new Record(new Key(new Object[]{"b", 10, 2.0}), new Object[]{20, 200, i2, new AvgPair(10, 3) /* 3.33 */}),
+        new Record(new Key(new Object[]{"c", 200, 3.0}), new Object[]{30, 300, i3, new AvgPair(20, 4) /* 5 */}),
+        new Record(new Key(new Object[]{"c", 50, 4.0}), new Object[]{30, 200, i4, new AvgPair(30, 10) /* 3 */}),
+        new Record(new Key(new Object[]{"c", 300, 5.0}), new Object[]{20, 100, i5, new AvgPair(10, 5) /* 2 */}));
+    recordsMap = new HashMap<>();
+  }
+
+  /**
+   * {@link IndexedTableResizer} resizes the records map based on SelectionSort
+   */
+  @Test
+  public void testResizeRecordsMap() {
+
+    // Test resize algorithm with numRecordsToEvict < trimToSize.
+    // TotalRecords=5; trimToSize=3; numRecordsToEvict=2
+
+    // d1 asc
+    sel1.setColumn("d1");
+    sel1.setIsAsc(true);
+    selectionSort = Lists.newArrayList(sel1);
+    _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+    records.forEach(k -> recordsMap.put(k.getKey(), k));
+    _indexedTableResizer.resizeRecordsMap(recordsMap, trimToSize);
+    Assert.assertEquals(recordsMap.size(), trimToSize);
+    Assert.assertTrue(recordsMap.containsKey(records.get(0).getKey())); // a, b, c
+    Assert.assertTrue(recordsMap.containsKey(records.get(1).getKey()));
+
+    // d1 desc
+    sel1.setColumn("d1");
+    sel1.setIsAsc(false);
+    selectionSort = Lists.newArrayList(sel1);
+    _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+    records.forEach(k -> recordsMap.put(k.getKey(), k));
+    _indexedTableResizer.resizeRecordsMap(recordsMap, trimToSize);
+    Assert.assertEquals(recordsMap.size(), trimToSize);
+    Assert.assertTrue(recordsMap.containsKey(records.get(2).getKey())); // c, c, c
+    Assert.assertTrue(recordsMap.containsKey(records.get(3).getKey()));
+    Assert.assertTrue(recordsMap.containsKey(records.get(4).getKey()));
+
+    // d1 asc, d3 desc (tie breaking with 2nd comparator)
+    sel1.setColumn("d1");
+    sel1.setIsAsc(true);
+    sel2.setColumn("d3");
+    sel2.setIsAsc(false);
+    selectionSort = Lists.newArrayList(sel1, sel2);
+    _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+    records.forEach(k -> recordsMap.put(k.getKey(), k));
+    _indexedTableResizer.resizeRecordsMap(recordsMap, trimToSize);
+    Assert.assertEquals(recordsMap.size(), trimToSize);
+    Assert.assertTrue(recordsMap.containsKey(records.get(0).getKey())); // 10, 10, 300
+    Assert.assertTrue(recordsMap.containsKey(records.get(1).getKey()));
+    Assert.assertTrue(recordsMap.containsKey(records.get(4).getKey()));
+
+    // d2 asc
+    sel1.setColumn("d2");
+    sel1.setIsAsc(true);
+    selectionSort = Lists.newArrayList(sel1);
+    _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+    records.forEach(k -> recordsMap.put(k.getKey(), k));
+    _indexedTableResizer.resizeRecordsMap(recordsMap, trimToSize);
+    Assert.assertEquals(recordsMap.size(), trimToSize);
+    Assert.assertTrue(recordsMap.containsKey(records.get(0).getKey())); // 10, 10, 50
+    Assert.assertTrue(recordsMap.containsKey(records.get(1).getKey()));
+    Assert.assertTrue(recordsMap.containsKey(records.get(3).getKey()));
+
+    // d1 asc, sum(m1) desc, max(m2) desc
+    sel1.setColumn("d1");
+    sel1.setIsAsc(true);
+    sel2.setColumn("sum(m1)");
+    sel2.setIsAsc(false);
+    sel3.setColumn("max(m2)");
+    sel3.setIsAsc(false);
+    selectionSort = Lists.newArrayList(sel1, sel2, sel3);
+    _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+    records.forEach(k -> recordsMap.put(k.getKey(), k));
+    _indexedTableResizer.resizeRecordsMap(recordsMap, trimToSize);
+    Assert.assertEquals(recordsMap.size(), trimToSize);
+    Assert.assertTrue(recordsMap.containsKey(records.get(0).getKey())); // a, b, (c (30, 300))
+    Assert.assertTrue(recordsMap.containsKey(records.get(1).getKey()));
+    Assert.assertTrue(recordsMap.containsKey(records.get(2).getKey()));
+
+    // object type avg(m4) asc
+    sel1.setColumn("avg(m4)");
+    sel1.setIsAsc(true);
+    selectionSort = Lists.newArrayList(sel1);
+    _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+    records.forEach(k -> recordsMap.put(k.getKey(), k));
+    _indexedTableResizer.resizeRecordsMap(recordsMap, trimToSize);
+    Assert.assertEquals(recordsMap.size(), trimToSize);
+    Assert.assertTrue(recordsMap.containsKey(records.get(4).getKey())); // 2, 3, 3.33
+    Assert.assertTrue(recordsMap.containsKey(records.get(3).getKey()));
+    Assert.assertTrue(recordsMap.containsKey(records.get(1).getKey()));
+
+    // non-comparable intermediate result
+    sel1.setColumn("distinctcount(m3)");
+    sel1.setIsAsc(false);
+    sel2.setColumn("d1");
+    sel2.setIsAsc(true);
+    selectionSort = Lists.newArrayList(sel1, sel2);
+    _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+    records.forEach(k -> recordsMap.put(k.getKey(), k));
+    _indexedTableResizer.resizeRecordsMap(recordsMap, trimToSize);
+    Assert.assertEquals(recordsMap.size(), trimToSize);
+    Assert.assertTrue(recordsMap.containsKey(records.get(4).getKey())); // 4, 3, 2 (b)
+    Assert.assertTrue(recordsMap.containsKey(records.get(3).getKey()));
+    Assert.assertTrue(recordsMap.containsKey(records.get(1).getKey()));
+
+    // Test resize algorithm with numRecordsToEvict > trimToSize.
+    // TotalRecords=5; trimToSize=2; numRecordsToEvict=3
+    trimToSize = 2;
+
+    // d1 asc
+    sel1.setColumn("d1");
+    sel1.setIsAsc(true);
+    selectionSort = Lists.newArrayList(sel1);
+    _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+    records.forEach(k -> recordsMap.put(k.getKey(), k));
+    _indexedTableResizer.resizeRecordsMap(recordsMap, trimToSize);
+    Assert.assertEquals(recordsMap.size(), trimToSize);
+    Assert.assertTrue(recordsMap.containsKey(records.get(0).getKey())); // a, b
+    Assert.assertTrue(recordsMap.containsKey(records.get(1).getKey()));
+
+    // object type avg(m4) asc
+    sel1.setColumn("avg(m4)");
+    sel1.setIsAsc(true);
+    selectionSort = Lists.newArrayList(sel1);
+    _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+    records.forEach(k -> recordsMap.put(k.getKey(), k));
+    _indexedTableResizer.resizeRecordsMap(recordsMap, trimToSize);
+    Assert.assertEquals(recordsMap.size(), trimToSize);
+    Assert.assertTrue(recordsMap.containsKey(records.get(4).getKey())); // 2, 3
+    Assert.assertTrue(recordsMap.containsKey(records.get(3).getKey()));
+
+    // non-comparable intermediate result
+    sel1.setColumn("distinctcount(m3)");
+    sel1.setIsAsc(false);
+    sel2.setColumn("d1");
+    sel2.setIsAsc(true);
+    selectionSort = Lists.newArrayList(sel1, sel2);
+    _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+    records.forEach(k -> recordsMap.put(k.getKey(), k));
+    _indexedTableResizer.resizeRecordsMap(recordsMap, trimToSize);
+    Assert.assertEquals(recordsMap.size(), trimToSize);
+    Assert.assertTrue(recordsMap.containsKey(records.get(4).getKey())); // 4, 3
+    Assert.assertTrue(recordsMap.containsKey(records.get(3).getKey()));
+
+    // Reset trimToSize
+    trimToSize = 3;
+  }
+
+  /**
+   * Tests the resize and sort functionality of {@link IndexedTableResizer}
+   */
+  @Test
+  public void testResizeAndSortRecordsMap() {
+    List<Record> sortedRecords;
+    int[] order;
+
+    // d1 asc
+    sel1.setColumn("d1");
+    sel1.setIsAsc(true);
+    selectionSort = Lists.newArrayList(sel1);
+    _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+    records.forEach(k -> recordsMap.put(k.getKey(), k));
+    sortedRecords = _indexedTableResizer.resizeAndSortRecordsMap(recordsMap, trimToSize);
+    Assert.assertEquals(sortedRecords.size(), trimToSize);
+    order = new int[]{0, 1};
+    for (int i = 0; i < order.length; i++) {
+      Assert.assertEquals(sortedRecords.get(i), records.get(order[i]));
+    }
+
+    // d1 asc - trim to 1
+    records.forEach(k -> recordsMap.put(k.getKey(), k));
+    sortedRecords = _indexedTableResizer.resizeAndSortRecordsMap(recordsMap, 1);
+    Assert.assertEquals(sortedRecords.size(), 1);
+    order = new int[]{0};
+    for (int i = 0; i < order.length; i++) {
+      Assert.assertEquals(sortedRecords.get(i), records.get(order[i]));
+    }
+
+    // d1 asc, d3 desc (tie breaking with 2nd comparator)
+    sel1.setColumn("d1");
+    sel1.setIsAsc(true);
+    sel2.setColumn("d3");
+    sel2.setIsAsc(false);
+    selectionSort = Lists.newArrayList(sel1, sel2);
+    _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+    records.forEach(k -> recordsMap.put(k.getKey(), k));
+    sortedRecords = _indexedTableResizer.resizeAndSortRecordsMap(recordsMap, trimToSize);
+    Assert.assertEquals(sortedRecords.size(), trimToSize);
+    order = new int[]{0, 1, 4};
+    for (int i = 0; i < order.length; i++) {
+      Assert.assertEquals(sortedRecords.get(i), records.get(order[i]));
+    }
+
+    // d1 asc, d3 desc (tie breaking with 2nd comparator) - trim 1
+    records.forEach(k -> recordsMap.put(k.getKey(), k));
+    sortedRecords = _indexedTableResizer.resizeAndSortRecordsMap(recordsMap, 1);
+    Assert.assertEquals(sortedRecords.size(), 1);
+    order = new int[]{0};
+    for (int i = 0; i < order.length; i++) {
+      Assert.assertEquals(sortedRecords.get(i), records.get(order[i]));
+    }
+
+    // d1 asc, sum(m1) desc, max(m2) desc
+    sel1.setColumn("d1");
+    sel1.setIsAsc(true);
+    sel2.setColumn("sum(m1)");
+    sel2.setIsAsc(false);
+    sel3.setColumn("max(m2)");
+    sel3.setIsAsc(false);
+    selectionSort = Lists.newArrayList(sel1, sel2, sel3);
+    _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+    records.forEach(k -> recordsMap.put(k.getKey(), k));
+    sortedRecords = _indexedTableResizer.resizeAndSortRecordsMap(recordsMap, trimToSize);
+    Assert.assertEquals(sortedRecords.size(), trimToSize);
+    order = new int[]{0, 1, 2};
+    for (int i = 0; i < order.length; i++) {
+      Assert.assertEquals(sortedRecords.get(i), records.get(order[i]));
+    }
+
+    // trim 1
+    records.forEach(k -> recordsMap.put(k.getKey(), k));
+    sortedRecords = _indexedTableResizer.resizeAndSortRecordsMap(recordsMap, 1);
+    Assert.assertEquals(sortedRecords.size(), 1);
+    order = new int[]{0};
+    for (int i = 0; i < order.length; i++) {
+      Assert.assertEquals(sortedRecords.get(i), records.get(order[i]));
+    }
+
+    // object type avg(m4) asc
+    sel1.setColumn("avg(m4)");
+    sel1.setIsAsc(true);
+    sel2.setColumn("d1");
+    sel2.setIsAsc(true);
+    selectionSort = Lists.newArrayList(sel1, sel2);
+    _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+    records.forEach(k -> recordsMap.put(k.getKey(), k));
+    sortedRecords = _indexedTableResizer.resizeAndSortRecordsMap(recordsMap, 10); // high trim to size
+    Assert.assertEquals(sortedRecords.size(), recordsMap.size());
+    order = new int[]{4, 3, 1, 0, 2};
+    for (int i = 0; i < order.length; i++) {
+      Assert.assertEquals(sortedRecords.get(i), records.get(order[i]));
+    }
+
+    // non-comparable intermediate result
+    sel1.setColumn("distinctcount(m3)");
+    sel1.setIsAsc(false);
+    sel2.setColumn("avg(m4)");
+    sel2.setIsAsc(false);
+    selectionSort = Lists.newArrayList(sel1, sel2);
+    _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+    records.forEach(k -> recordsMap.put(k.getKey(), k));
+    sortedRecords = _indexedTableResizer.resizeAndSortRecordsMap(recordsMap, recordsMap.size()); // equal trim to size
+    Assert.assertEquals(sortedRecords.size(), recordsMap.size());
+    order = new int[]{4, 3, 2, 1, 0};
+    for (int i = 0; i < order.length; i++) {
+      Assert.assertEquals(sortedRecords.get(i), records.get(order[i]));
+    }
+  }
+
+  /**
+   * Tests the conversion of {@link Record} to {@link IndexedTableResizer.IntermediateRecord}
+   */
+  @Test
+  public void testIntermediateRecord() {
+
+    // d2
+    sel1.setColumn("d2");
+    selectionSort = Lists.newArrayList(sel1);
+    _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+    for (Record record : records) {
+      IndexedTableResizer.IntermediateRecord intermediateRecord = _indexedTableResizer.getIntermediateRecord(record);
+      Assert.assertEquals(intermediateRecord._key, record.getKey());
+      Assert.assertEquals(intermediateRecord._values.length, 1);
+      Assert.assertEquals(intermediateRecord._values[0], record.getKey().getColumns()[1]);
+    }
+
+    // sum(m1)
+    sel1.setColumn("sum(m1)");
+    selectionSort = Lists.newArrayList(sel1);
+    _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+    for (Record record : records) {
+      IndexedTableResizer.IntermediateRecord intermediateRecord = _indexedTableResizer.getIntermediateRecord(record);
+      Assert.assertEquals(intermediateRecord._key, record.getKey());
+      Assert.assertEquals(intermediateRecord._values.length, 1);
+      Assert.assertEquals(intermediateRecord._values[0], record.getValues()[0]);
+    }
+
+    // d1, max(m2)
+    sel1.setColumn("d1");
+    sel2.setColumn("max(m2)");
+    selectionSort = Lists.newArrayList(sel1, sel2);
+    _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+    for (Record record : records) {
+      IndexedTableResizer.IntermediateRecord intermediateRecord = _indexedTableResizer.getIntermediateRecord(record);
+      Assert.assertEquals(intermediateRecord._key, record.getKey());
+      Assert.assertEquals(intermediateRecord._values.length, 2);
+      Assert.assertEquals(intermediateRecord._values[0], record.getKey().getColumns()[0]);
+      Assert.assertEquals(intermediateRecord._values[1], record.getValues()[1]);
+    }
+
+    // d2, sum(m1), d3
+    sel1.setColumn("d2");
+    sel2.setColumn("sum(m1)");
+    sel3.setColumn("d3");
+    selectionSort = Lists.newArrayList(sel1, sel2, sel3);
+    _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+    for (Record record : records) {
+      IndexedTableResizer.IntermediateRecord intermediateRecord = _indexedTableResizer.getIntermediateRecord(record);
+      Assert.assertEquals(intermediateRecord._key, record.getKey());
+      Assert.assertEquals(intermediateRecord._values.length, 3);
+      Assert.assertEquals(intermediateRecord._values[0], record.getKey().getColumns()[1]);
+      Assert.assertEquals(intermediateRecord._values[1], record.getValues()[0]);
+      Assert.assertEquals(intermediateRecord._values[2], record.getKey().getColumns()[2]);
+    }
+
+    // non-comparable intermediate result
+    sel1.setColumn("distinctcount(m3)");
+    selectionSort = Lists.newArrayList(sel1);
+    _indexedTableResizer = new IndexedTableResizer(dataSchema, aggregationInfos, selectionSort);
+    AggregationFunction distinctCountFunction =
+        AggregationFunctionUtils.getAggregationFunctionContext(aggregationInfos.get(2)).getAggregationFunction();
+    for (Record record : records) {
+      IndexedTableResizer.IntermediateRecord intermediateRecord = _indexedTableResizer.getIntermediateRecord(record);
+      Assert.assertEquals(intermediateRecord._key, record.getKey());
+      Assert.assertEquals(intermediateRecord._values.length, 1);
+      Assert.assertEquals(intermediateRecord._values[0], distinctCountFunction.extractFinalResult(record.getValues()[2]));
+    }
+  }
+}
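
The assertions above pin down the shape of IntermediateRecord; a minimal sketch of that shape (modifiers and
constructor are assumptions, only the _key and _values fields appear in the tests):

    // the record's Key, plus only the order-by values in order-by sequence; for a
    // non-comparable intermediate result, the extracted final result is stored instead
    static class IntermediateRecord {
      final Key _key;
      final Object[] _values;  // one entry per order-by column

      IntermediateRecord(Key key, Object[] values) {
        _key = key;
        _values = values;
      }
    }
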
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java
index 5a81445..3a1d0e2 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/table/IndexedTableTest.java
@@ -38,6 +38,7 @@ import org.apache.pinot.common.request.SelectionSort;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.testng.Assert;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 
@@ -48,7 +49,6 @@ public class IndexedTableTest {
 
   @Test
   public void testConcurrentIndexedTable() throws InterruptedException, TimeoutException, ExecutionException {
-    Table indexedTable = new ConcurrentIndexedTable();
 
     DataSchema dataSchema = new DataSchema(new String[]{"d1", "d2", "d3", "sum(m1)", "max(m2)"},
         new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE,
@@ -71,8 +71,7 @@ public class IndexedTableTest {
     sel.setIsAsc(true);
     List<SelectionSort> orderBy = Lists.newArrayList(sel);
 
-    // max capacity 5, buffered capacity at 10
-    indexedTable.init(dataSchema, aggregationInfos, orderBy, 5);
+    IndexedTable indexedTable = new ConcurrentIndexedTable(dataSchema, aggregationInfos, orderBy, 5);
 
     // 3 threads upsert together
     // a inserted 6 times (60), b inserted 5 times (50), d inserted 2 times (20)
@@ -132,13 +131,11 @@ public class IndexedTableTest {
     }
   }
 
+  @Test(dataProvider = "initDataProvider")
+  public void testNonConcurrentIndexedTable(List<SelectionSort> orderBy, List<String> survivors) {
 
-  @Test
-  public void testNonConcurrentIndexedTable() {
-
-    DataSchema dataSchema = new DataSchema(new String[]{"d1", "d2", "d3", "sum(m1)", "max(m2)"},
-        new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.DOUBLE, ColumnDataType.DOUBLE,
-            ColumnDataType.DOUBLE});
+    DataSchema dataSchema = new DataSchema(new String[]{"d1", "d2", "d3", "d4", "sum(m1)", "max(m2)"},
+        new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.INT, ColumnDataType.DOUBLE, ColumnDataType.INT,
+            ColumnDataType.DOUBLE, ColumnDataType.DOUBLE});
 
     AggregationInfo agg1 = new AggregationInfo();
     Map<String, String> params1 = new HashMap<>();
@@ -152,100 +149,135 @@ public class IndexedTableTest {
     agg2.setAggregationType("max");
     List<AggregationInfo> aggregationInfos = Lists.newArrayList(agg1, agg2);
 
-    SelectionSort sel = new SelectionSort();
-    sel.setColumn("sum(m1)");
-    sel.setIsAsc(true);
-    List<SelectionSort> orderBy = Lists.newArrayList(sel);
-
-    IndexedTable simpleIndexedTable = new SimpleIndexedTable();
-    // max capacity 5, buffered capacity 10
-    simpleIndexedTable.init(dataSchema, aggregationInfos, orderBy, 5);
+    // Test SimpleIndexedTable
+    IndexedTable simpleIndexedTable = new SimpleIndexedTable(dataSchema, aggregationInfos, orderBy, 5);
     // merge table
-    IndexedTable mergeTable = new SimpleIndexedTable();
-    mergeTable.init(dataSchema, aggregationInfos, orderBy, 10);
+    IndexedTable mergeTable = new SimpleIndexedTable(dataSchema, aggregationInfos, orderBy, 10);
     testNonConcurrent(simpleIndexedTable, mergeTable);
 
-    IndexedTable concurrentIndexedTable = new ConcurrentIndexedTable();
-    // max capacity 5, buffered capacity 10
-    concurrentIndexedTable.init(dataSchema, aggregationInfos, orderBy, 5);
-    mergeTable = new SimpleIndexedTable();
-    mergeTable.init(dataSchema, aggregationInfos, orderBy, 10);
+    // finish
+    simpleIndexedTable.finish(true);
+    checkSurvivors(simpleIndexedTable, survivors);
+
+    // Test ConcurrentIndexedTable
+    IndexedTable concurrentIndexedTable = new ConcurrentIndexedTable(dataSchema, aggregationInfos, orderBy, 5);
+    mergeTable = new SimpleIndexedTable(dataSchema, aggregationInfos, orderBy, 10);
     testNonConcurrent(concurrentIndexedTable, mergeTable);
 
+    // finish
+    concurrentIndexedTable.finish(true);
+    checkSurvivors(concurrentIndexedTable, survivors);
+  }
+
+  @DataProvider(name = "initDataProvider")
+  public Object[][] initDataProvider() {
+
+    List<Object[]> data = new ArrayList<>();
+
+    SelectionSort sel1;
+    SelectionSort sel2;
+    List<SelectionSort> orderBy;
+    List<String> survivors;
+
+    // d1 desc
+    sel1 = new SelectionSort();
+    sel1.setColumn("d1");
+    sel1.setIsAsc(false);
+    orderBy = Lists.newArrayList(sel1);
+    survivors = Lists.newArrayList("m", "l", "k", "j", "i");
+    data.add(new Object[]{orderBy, survivors});
+
+    // d1 asc
+    sel1 = new SelectionSort();
+    sel1.setColumn("d1");
+    sel1.setIsAsc(true);
+    orderBy = Lists.newArrayList(sel1);
+    survivors = Lists.newArrayList("a", "b", "c", "d", "e");
+    data.add(new Object[]{orderBy, survivors});
+
+    // sum(m1) desc, d1 asc
+    sel1 = new SelectionSort();
+    sel1.setColumn("sum(m1)");
+    sel1.setIsAsc(false);
+    sel2 = new SelectionSort();
+    sel2.setColumn("d1");
+    sel2.setIsAsc(true);
+    orderBy = Lists.newArrayList(sel1, sel2);
+    survivors = Lists.newArrayList("m", "h", "i", "a", "b");
+    data.add(new Object[]{orderBy, survivors});
+
+    // d2 desc
+    sel1 = new SelectionSort();
+    sel1.setColumn("d2");
+    sel1.setIsAsc(false);
+    orderBy = Lists.newArrayList(sel1);
+    survivors = Lists.newArrayList("m", "l", "k", "j", "i");
+    data.add(new Object[]{orderBy, survivors});
+
+    // d4 asc, d1 asc
+    sel1 = new SelectionSort();
+    sel1.setColumn("d4");
+    sel1.setIsAsc(true);
+    sel2 = new SelectionSort();
+    sel2.setColumn("d1");
+    sel2.setIsAsc(true);
+    orderBy = Lists.newArrayList(sel1, sel2);
+    survivors = Lists.newArrayList("a", "b", "c", "d", "e");
+    data.add(new Object[]{orderBy, survivors});
+
+    return data.toArray(new Object[data.size()][]);
   }
 
   private void testNonConcurrent(IndexedTable indexedTable, IndexedTable mergeTable) {
 
     // 2 unique rows
-    indexedTable.upsert(getRecord(new Object[]{"a", 1, 10d}, new Object[]{10d, 100d}));
+    indexedTable.upsert(getRecord(new Object[]{"a", 1, 10d, 1000}, new Object[]{10d, 100d}));
     Assert.assertEquals(indexedTable.size(), 1);
-    indexedTable.upsert(getRecord(new Object[]{"b", 2, 20d}, new Object[]{10d, 200d}));
+    indexedTable.upsert(getRecord(new Object[]{"b", 2, 20d, 1000}, new Object[]{10d, 200d}));
     Assert.assertEquals(indexedTable.size(), 2);
 
     // repeat row a
-    indexedTable.upsert(getRecord(new Object[]{"a", 1, 10d}, new Object[]{10d, 100d}));
-    indexedTable.upsert(getRecord(new Object[]{"a", 1, 10d}, new Object[]{10d, 100d}));
+    indexedTable.upsert(getRecord(new Object[]{"a", 1, 10d, 1000}, new Object[]{10d, 100d}));
+    indexedTable.upsert(getRecord(new Object[]{"a", 1, 10d, 1000}, new Object[]{10d, 100d}));
     Assert.assertEquals(indexedTable.size(), 2);
 
-    indexedTable.upsert(getRecord(new Object[]{"c", 3, 30d}, new Object[]{10d, 300d}));
-    indexedTable.upsert(getRecord(new Object[]{"c", 3, 30d}, new Object[]{10d, 300d}));
-    indexedTable.upsert(getRecord(new Object[]{"d", 4, 40d}, new Object[]{10d, 400d}));
-    indexedTable.upsert(getRecord(new Object[]{"d", 4, 40d}, new Object[]{10d, 400d}));
-    indexedTable.upsert(getRecord(new Object[]{"e", 5, 50d}, new Object[]{10d, 500d}));
-    indexedTable.upsert(getRecord(new Object[]{"e", 5, 50d}, new Object[]{10d, 500d}));
-    indexedTable.upsert(getRecord(new Object[]{"f", 6, 60d}, new Object[]{10d, 600d}));
-    indexedTable.upsert(getRecord(new Object[]{"g", 7, 70d}, new Object[]{10d, 700d}));
-    indexedTable.upsert(getRecord(new Object[]{"h", 8, 80d}, new Object[]{10d, 800d}));
-    indexedTable.upsert(getRecord(new Object[]{"i", 9, 90d}, new Object[]{10d, 900d}));
-
-    // reached max capacity
-    Assert.assertEquals(indexedTable.size(), 9);
-
-    // repeat row b
-    indexedTable.upsert(getRecord(new Object[]{"b", 2, 20d}, new Object[]{10d, 200d}));
-    Assert.assertEquals(indexedTable.size(), 9);
-
-    // insert 1 more rows to reach buffer limit
-    indexedTable.upsert(getRecord(new Object[]{"j", 10, 100d}, new Object[]{10d, 1000d}));
-
-    // resized to 5
+    indexedTable.upsert(getRecord(new Object[]{"c", 3, 30d, 1000}, new Object[]{10d, 300d}));
+    indexedTable.upsert(getRecord(new Object[]{"c", 3, 30d, 1000}, new Object[]{10d, 300d}));
+    indexedTable.upsert(getRecord(new Object[]{"d", 4, 40d, 1000}, new Object[]{10d, 400d}));
+    indexedTable.upsert(getRecord(new Object[]{"d", 4, 40d, 1000}, new Object[]{10d, 400d}));
+    indexedTable.upsert(getRecord(new Object[]{"e", 5, 50d, 1000}, new Object[]{10d, 500d}));
+    indexedTable.upsert(getRecord(new Object[]{"e", 5, 50d, 1000}, new Object[]{10d, 500d}));
     Assert.assertEquals(indexedTable.size(), 5);
 
-    // filling up again
-    indexedTable.upsert(getRecord(new Object[]{"k", 11, 110d}, new Object[]{10d, 1100d}));
-    indexedTable.upsert(getRecord(new Object[]{"l", 12, 120d}, new Object[]{10d, 1200d}));
-    indexedTable.upsert(getRecord(new Object[]{"m", 13, 130d}, new Object[]{10d, 1300d}));
-    indexedTable.upsert(getRecord(new Object[]{"b", 2, 20d}, new Object[]{10d, 200d}));
-    // repeat f
-    indexedTable.upsert(getRecord(new Object[]{"f", 6, 60d}, new Object[]{10d, 600d}));
-    // repeat g
-    indexedTable.upsert(getRecord(new Object[]{"g", 7, 70d}, new Object[]{10d, 700d}));
-    Assert.assertEquals(indexedTable.size(), 9);
+    // able to insert more, maxCapacity is very high
+    indexedTable.upsert(getRecord(new Object[]{"f", 6, 60d, 1000}, new Object[]{10d, 600d}));
+    indexedTable.upsert(getRecord(new Object[]{"g", 7, 70d, 1000}, new Object[]{10d, 700d}));
+    indexedTable.upsert(getRecord(new Object[]{"h", 8, 80d, 1000}, new Object[]{10d, 800d}));
+    indexedTable.upsert(getRecord(new Object[]{"i", 9, 90d, 1000}, new Object[]{10d, 900d}));
+    indexedTable.upsert(getRecord(new Object[]{"j", 10, 100d, 1000}, new Object[]{10d, 1000d}));
+    Assert.assertEquals(indexedTable.size(), 10);
 
+    // repeat row b
+    indexedTable.upsert(getRecord(new Object[]{"b", 2, 20d, 1000}, new Object[]{10d, 200d}));
+    Assert.assertEquals(indexedTable.size(), 10);
 
-    // repeat record j
-    mergeTable.upsert(getRecord(new Object[]{"j", 10, 100d}, new Object[]{10d, 1000d}));
-    // repeat record k
-    mergeTable.upsert(getRecord(new Object[]{"k", 11, 110d}, new Object[]{10d, 1100d}));
-    // repeat record b
-    mergeTable.upsert(getRecord(new Object[]{"b", 2, 20d}, new Object[]{10d, 200d}));
-    // insert new record n
-    mergeTable.upsert(getRecord(new Object[]{"n", 14, 140d}, new Object[]{10d, 1400d}));
+    // create merge table, 2 new records for indexedTable, 2 repeat records
+    mergeTable.upsert(getRecord(new Object[]{"j", 10, 100d, 1000}, new Object[]{10d, 1000d}));
+    mergeTable.upsert(getRecord(new Object[]{"k", 11, 110d, 1000}, new Object[]{10d, 1100d}));
+    mergeTable.upsert(getRecord(new Object[]{"b", 2, 20d, 1000}, new Object[]{10d, 200d}));
+    mergeTable.upsert(getRecord(new Object[]{"l", 12, 120d, 1000}, new Object[]{10d, 1200d}));
     Assert.assertEquals(mergeTable.size(), 4);
     mergeTable.finish(false);
 
-    // merge with table
+    // merge with indexed table
     indexedTable.merge(mergeTable);
-    Assert.assertEquals(indexedTable.size(), 5);
+    Assert.assertEquals(indexedTable.size(), 12);
 
-    indexedTable.upsert(getRecord(new Object[]{"h", 8, 80d}, new Object[]{100d, 800d}));
-    indexedTable.upsert(getRecord(new Object[]{"i", 9, 90d}, new Object[]{50d, 900d}));
-    indexedTable.upsert(getRecord(new Object[]{"n", 14, 140d}, new Object[]{600d, 1400d}));
-
-    // finish
-    indexedTable.finish(false);
-    checkEvicted(indexedTable, "a", "c", "d", "e", "b", "j", "k", "f", "g");
-    Assert.assertEquals(indexedTable.size(), 5);
+    // insert more
+    indexedTable.upsert(getRecord(new Object[]{"h", 8, 80d, 1000}, new Object[]{100d, 800d}));
+    indexedTable.upsert(getRecord(new Object[]{"i", 9, 90d, 1000}, new Object[]{50d, 900d}));
+    indexedTable.upsert(getRecord(new Object[]{"m", 13, 130d, 1000}, new Object[]{600d, 1300d}));
+    Assert.assertEquals(indexedTable.size(), 13);
   }
 
   private void checkEvicted(Table indexedTable, String... evicted) {
@@ -259,6 +291,14 @@ public class IndexedTableTest {
     }
   }
 
+  private void checkSurvivors(Table indexedTable, List<String> survivors) {
+    Assert.assertEquals(survivors.size(), indexedTable.size());
+    Iterator<Record> iterator = indexedTable.iterator();
+    for (String survivor : survivors) {
+      Assert.assertEquals(survivor, iterator.next().getKey().getColumns()[0]);
+    }
+  }
+
   private Record getRecord(Object[] keys, Object[] values) {
     return new Record(new Key(keys), values);
   }
@@ -281,17 +321,15 @@ public class IndexedTableTest {
     agg2.setAggregationType("max");
     List<AggregationInfo> aggregationInfos = Lists.newArrayList(agg1, agg2);
 
-    IndexedTable indexedTable = new SimpleIndexedTable();
-    indexedTable.init(dataSchema, aggregationInfos, null, 5);
+    IndexedTable indexedTable = new SimpleIndexedTable(dataSchema, aggregationInfos, null, 5);
     testNoMoreNewRecordsInTable(indexedTable);
 
-    indexedTable = new ConcurrentIndexedTable();
-    indexedTable.init(dataSchema, aggregationInfos, null, 5);
+    indexedTable = new ConcurrentIndexedTable(dataSchema, aggregationInfos, null, 5);
     testNoMoreNewRecordsInTable(indexedTable);
   }
 
   private void testNoMoreNewRecordsInTable(IndexedTable indexedTable) {
-    // Insert 14 records. Check that last 2 never made it.
+    // Insert 7 records. Check that last 2 never made it.
     indexedTable.upsert(getRecord(new Object[]{"a", 1, 10d}, new Object[]{10d, 100d}));
     indexedTable.upsert(getRecord(new Object[]{"b", 2, 20d}, new Object[]{10d, 200d}));
     indexedTable.upsert(getRecord(new Object[]{"a", 1, 10d}, new Object[]{10d, 100d}));
@@ -301,25 +339,20 @@ public class IndexedTableTest {
     indexedTable.upsert(getRecord(new Object[]{"c", 3, 30d}, new Object[]{10d, 300d}));
     indexedTable.upsert(getRecord(new Object[]{"d", 4, 40d}, new Object[]{10d, 400d}));
     indexedTable.upsert(getRecord(new Object[]{"e", 5, 50d}, new Object[]{10d, 500d}));
-    indexedTable.upsert(getRecord(new Object[]{"f", 6, 60d}, new Object[]{10d, 600d}));
-    indexedTable.upsert(getRecord(new Object[]{"g", 7, 70d}, new Object[]{10d, 700d}));
-    indexedTable.upsert(getRecord(new Object[]{"h", 8, 80d}, new Object[]{10d, 800d}));
-    indexedTable.upsert(getRecord(new Object[]{"i", 9, 90d}, new Object[]{10d, 900d}));
-    Assert.assertEquals(indexedTable.size(), 9);
+    Assert.assertEquals(indexedTable.size(), 5);
 
-    indexedTable.upsert(getRecord(new Object[]{"j", 10, 100d}, new Object[]{10d, 1000d}));
     // no resize. no more records allowed
-    indexedTable.upsert(getRecord(new Object[]{"k", 11, 110d}, new Object[]{10d, 1100d}));
-    indexedTable.upsert(getRecord(new Object[]{"l", 12, 120d}, new Object[]{10d, 1200d}));
-    Assert.assertEquals(indexedTable.size(), 10);
+    indexedTable.upsert(getRecord(new Object[]{"f", 6, 60d}, new Object[]{10d, 600d}));
+    indexedTable.upsert(getRecord(new Object[]{"g", 7, 70d}, new Object[]{10d, 700d}));
+    Assert.assertEquals(indexedTable.size(), 5);
 
     // existing row allowed
     indexedTable.upsert(getRecord(new Object[]{"b", 2, 20d}, new Object[]{10d, 200d}));
-    Assert.assertEquals(indexedTable.size(), 10);
+    Assert.assertEquals(indexedTable.size(), 5);
 
     indexedTable.finish(false);
 
-    checkEvicted(indexedTable, "k", "l");
+    checkEvicted(indexedTable, "f", "g");
 
   }
 }
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java
index 2338212..dc2d984 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkCombineGroupBy.java
@@ -39,6 +39,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.pinot.common.request.AggregationInfo;
+import org.apache.pinot.common.request.GroupBy;
 import org.apache.pinot.common.request.SelectionSort;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
@@ -79,6 +80,7 @@ public class BenchmarkCombineGroupBy {
 
   private DataSchema _dataSchema;
   private List<AggregationInfo> _aggregationInfos;
+  private GroupBy _groupBy;
   private AggregationFunction[] _aggregationFunctions;
   private List<SelectionSort> _orderBy;
   private int _numAggregationFunctions;
@@ -126,6 +128,10 @@ public class BenchmarkCombineGroupBy {
       _aggregationFunctions[i] = AggregationFunctionFactory.getAggregationFunction(_aggregationInfos.get(i), null);
     }
 
+    _groupBy = new GroupBy();
+    _groupBy.setTopN(TOP_N);
+    _groupBy.setExpressions(Lists.newArrayList("d1", "d2"));
+
     SelectionSort orderBy = new SelectionSort();
     orderBy.setColumn("sum(m1)");
     orderBy.setIsAsc(true);
@@ -157,11 +163,11 @@ public class BenchmarkCombineGroupBy {
   @OutputTimeUnit(TimeUnit.MICROSECONDS)
   public void concurrentIndexedTableForCombineGroupBy() throws InterruptedException, ExecutionException, TimeoutException {
 
-    int capacity = 200_000;//GroupByUtils.getTableCapacity(TOP_N);
+    int capacity = GroupByUtils.getTableCapacity(_groupBy, _orderBy);
 
     // make 1 concurrent table
-    IndexedTable concurrentIndexedTable = new ConcurrentIndexedTable();
-    concurrentIndexedTable.init(_dataSchema, _aggregationInfos, _orderBy, capacity);
+    IndexedTable concurrentIndexedTable =
+        new ConcurrentIndexedTable(_dataSchema, _aggregationInfos, _orderBy, capacity);
 
     List<Callable<Void>> innerSegmentCallables = new ArrayList<>(NUM_SEGMENTS);
 
diff --git a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java
index 5212f3d..10c0179 100644
--- a/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java
+++ b/pinot-perf/src/main/java/org/apache/pinot/perf/BenchmarkIndexedTable.java
@@ -135,8 +135,8 @@ public class BenchmarkIndexedTable {
     int numSegments = 10;
 
     // make 1 concurrent table
-    IndexedTable concurrentIndexedTable = new ConcurrentIndexedTable();
-    concurrentIndexedTable.init(_dataSchema, _aggregationInfos, _orderBy, CAPACITY);
+    IndexedTable concurrentIndexedTable =
+        new ConcurrentIndexedTable(_dataSchema, _aggregationInfos, _orderBy, CAPACITY);
 
     // 10 parallel threads putting 10k records into the table
 
@@ -187,8 +187,7 @@ public class BenchmarkIndexedTable {
     for (int i = 0; i < numSegments; i++) {
 
       // make one simple indexed table per segment (10 total)
-      IndexedTable simpleIndexedTable = new SimpleIndexedTable();
-      simpleIndexedTable.init(_dataSchema, _aggregationInfos, _orderBy, CAPACITY);
+      IndexedTable simpleIndexedTable = new SimpleIndexedTable(_dataSchema, _aggregationInfos, _orderBy, CAPACITY);
       simpleIndexedTables.add(simpleIndexedTable);
 
       // put 10k records in each indexed table, in parallel
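
For reference, a simplified sketch of the concurrent-write pattern both benchmarks exercise: multiple threads share one ConcurrentIndexedTable, upsert in parallel, and a single finish() call produces the final ordering. The thread count and the newRandomRecord() factory below are illustrative, not from the patch:

    // Illustrative only; the real benchmark drives this through JMH callables.
    ExecutorService executor = Executors.newFixedThreadPool(10);
    List<Future<?>> futures = new ArrayList<>();
    for (int t = 0; t < 10; t++) {
      futures.add(executor.submit(() -> {
        for (int i = 0; i < 10_000; i++) {
          concurrentIndexedTable.upsert(newRandomRecord()); // hypothetical record factory
        }
      }));
    }
    for (Future<?> f : futures) {
      f.get(); // surfaces any writer failure; throws InterruptedException/ExecutionException
    }
    executor.shutdown();
    concurrentIndexedTable.finish(false);
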

