You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/05/30 00:19:49 UTC

[incubator-pinot] branch master updated: Refactor DistinctTable to use PriorityQueue based algorithm (#5451)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 44a1e2e  Refactor DistinctTable to use PriorityQueue based algorithm (#5451)
44a1e2e is described below

commit 44a1e2e237fc6ce7570241728514b55240acb52f
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Fri May 29 17:19:39 2020 -0700

    Refactor DistinctTable to use PriorityQueue based algorithm (#5451)
    
    Currently, DISTINCT queries are solved the same way as GROUP-BY queries,
    which is unnecessary (it consumes much more memory and CPU) and does
    not guarantee the accuracy of the result.
    
    Instead, a DISTINCT query can be solved efficiently with a set and a heap
    (similar to SelectionOrderBy, except that unique records need to be tracked).
    
    The new DistinctTable does not implement the Table interface because
    the Table interface is designed mostly for GROUP-BY queries and is not
    efficient for DISTINCT. If in the future we want to use the Table
    interface to unify all the input/output, we can redesign the Table
    interface to make it suitable for all types of queries.
---
 .../requesthandler/BaseBrokerRequestHandler.java   |   4 +
 .../apache/pinot/core/common/ObjectSerDeUtils.java |   2 +-
 .../apache/pinot/core/data/table/TableResizer.java | 191 ++----------
 .../core/query/aggregation/DistinctTable.java      | 249 ---------------
 .../function/DistinctAggregationFunction.java      |  59 ++--
 .../function/customobject/DistinctTable.java       | 334 +++++++++++++++++++++
 .../query/reduce/DistinctDataTableReducer.java     |  91 +++---
 .../core/query/reduce/ResultReducerFactory.java    |   2 +-
 .../pinot/core/data/table/TableResizerTest.java    | 158 ----------
 .../apache/pinot/queries/DistinctQueriesTest.java  |  57 ++--
 ...erSegmentAggregationSingleValueQueriesTest.java |  10 +-
 11 files changed, 466 insertions(+), 691 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
index 6a00878..e7869a2 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseBrokerRequestHandler.java
@@ -715,6 +715,10 @@ public abstract class BaseBrokerRequestHandler implements BrokerRequestHandler {
             // TODO: Explore if DISTINCT should be supported with GROUP BY
             throw new UnsupportedOperationException("DISTINCT with GROUP BY is currently not supported");
           }
+          if (brokerRequest.getLimit() == 0) {
+            // TODO: Consider changing it to SELECTION query for LIMIT 0
+            throw new UnsupportedOperationException("DISTINCT must have positive LIMIT");
+          }
           if (brokerRequest.isSetOrderBy()) {
             Set<String> expressionSet = new HashSet<>(AggregationFunctionUtils.getArguments(aggregationInfo));
             List<SelectionSort> orderByColumns = brokerRequest.getOrderBy();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index 88c5cf8..f471e37 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -36,8 +36,8 @@ import java.util.Map;
 import org.apache.datasketches.memory.Memory;
 import org.apache.datasketches.theta.Sketch;
 import org.apache.pinot.common.utils.StringUtil;
-import org.apache.pinot.core.query.aggregation.DistinctTable;
 import org.apache.pinot.core.query.aggregation.function.customobject.AvgPair;
+import org.apache.pinot.core.query.aggregation.function.customobject.DistinctTable;
 import org.apache.pinot.core.query.aggregation.function.customobject.MinMaxRangePair;
 import org.apache.pinot.core.query.aggregation.function.customobject.QuantileDigest;
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
index 1fa1deb..a474c3d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/TableResizer.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.core.data.table;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -29,7 +28,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
-import java.util.Set;
 import java.util.function.Function;
 import org.apache.pinot.common.request.SelectionSort;
 import org.apache.pinot.common.utils.DataSchema;
@@ -39,12 +37,11 @@ import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 /**
  * Helper class for trimming and sorting records in the IndexedTable, based on the order by information
  */
+@SuppressWarnings("rawtypes")
 public class TableResizer {
-
-  private OrderByValueExtractor[] _orderByValueExtractors;
-  private Comparator<IntermediateRecord> _intermediateRecordComparator;
-  private Comparator<Record> _recordComparator;
-  protected int _numOrderBy;
+  private final OrderByValueExtractor[] _orderByValueExtractors;
+  private final Comparator<IntermediateRecord> _intermediateRecordComparator;
+  private final int _numOrderBy;
 
   TableResizer(DataSchema dataSchema, AggregationFunction[] aggregationFunctions, List<SelectionSort> orderBy) {
 
@@ -69,56 +66,38 @@ public class TableResizer {
     _orderByValueExtractors = new OrderByValueExtractor[_numOrderBy];
     Comparator[] comparators = new Comparator[_numOrderBy];
 
-    if (numKeyColumns < numColumns) {
-      for (int orderByIdx = 0; orderByIdx < _numOrderBy; orderByIdx++) {
-        SelectionSort selectionSort = orderBy.get(orderByIdx);
-        String column = selectionSort.getColumn();
-
-        if (columnIndexMap.containsKey(column)) {
-          int index = columnIndexMap.get(column);
-          if (index < numKeyColumns) {
-            _orderByValueExtractors[orderByIdx] = new KeyColumnExtractor(index);
-          } else {
-            AggregationFunction aggregationFunction = aggregationColumnToFunction.get(column);
-            _orderByValueExtractors[orderByIdx] = new AggregationColumnExtractor(index, aggregationFunction);
-          }
+    for (int orderByIdx = 0; orderByIdx < _numOrderBy; orderByIdx++) {
+      SelectionSort selectionSort = orderBy.get(orderByIdx);
+      String column = selectionSort.getColumn();
+
+      if (columnIndexMap.containsKey(column)) {
+        int index = columnIndexMap.get(column);
+        if (index < numKeyColumns) {
+          _orderByValueExtractors[orderByIdx] = new KeyColumnExtractor(index);
         } else {
-          throw new IllegalStateException("Could not find column " + column + " in data schema");
+          AggregationFunction aggregationFunction = aggregationColumnToFunction.get(column);
+          _orderByValueExtractors[orderByIdx] = new AggregationColumnExtractor(index, aggregationFunction);
         }
+      } else {
+        throw new IllegalStateException("Could not find column " + column + " in data schema");
+      }
 
-        comparators[orderByIdx] = Comparator.naturalOrder();
-        if (!selectionSort.isIsAsc()) {
-          comparators[orderByIdx] = comparators[orderByIdx].reversed();
-        }
+      comparators[orderByIdx] = Comparator.naturalOrder();
+      if (!selectionSort.isIsAsc()) {
+        comparators[orderByIdx] = comparators[orderByIdx].reversed();
       }
+    }
 
-      _intermediateRecordComparator = (o1, o2) -> {
+    _intermediateRecordComparator = (o1, o2) -> {
 
-        for (int i = 0; i < _numOrderBy; i++) {
-          int result = comparators[i].compare(o1._values[i], o2._values[i]);
-          if (result != 0) {
-            return result;
-          }
-        }
-        return 0;
-      };
-    } else {
-      // For cases where the entire Record is unique and is treated as a key
-      Preconditions
-          .checkState(numKeyColumns == numColumns, "number of key columns should be equal to total number of columns");
-      int[] orderByIndexes = new int[_numOrderBy];
-      boolean[] orderByAsc = new boolean[_numOrderBy];
       for (int i = 0; i < _numOrderBy; i++) {
-        SelectionSort selectionSort = orderBy.get(i);
-        String column = selectionSort.getColumn();
-        int orderByColIndex = columnIndexMap.get(column);
-        orderByIndexes[i] = orderByColIndex;
-        if (selectionSort.isIsAsc()) {
-          orderByAsc[i] = true;
+        int result = comparators[i].compare(o1._values[i], o2._values[i]);
+        if (result != 0) {
+          return result;
         }
       }
-      _recordComparator = new RecordComparator(orderByIndexes, orderByAsc);
-    }
+      return 0;
+    };
   }
 
   /**
@@ -316,120 +295,4 @@ public class TableResizer {
       return _convertorFunction.apply(aggregationColumn);
     }
   }
-
-  /********************************************************
-   *                                                      *
-   * Resize functions for Set based table implementation  *
-   *                                                      *
-   ********************************************************/
-
-  private class RecordComparator implements Comparator<Record> {
-    final int[] _orderByColumnIndexes;
-    final boolean[] _orderByAsc;
-
-    RecordComparator(int[] orderByColumnIndexes, boolean[] orderByAsc) {
-      _orderByColumnIndexes = orderByColumnIndexes;
-      _orderByAsc = orderByAsc;
-    }
-
-    @Override
-    public int compare(Record record1, Record record2) {
-      Object[] values1 = record1.getValues();
-      Object[] values2 = record2.getValues();
-      for (int i = 0; i < _numOrderBy; i++) {
-        Comparable valueToCompare1 = (Comparable) values1[_orderByColumnIndexes[i]];
-        Comparable valueToCompare2 = (Comparable) values2[_orderByColumnIndexes[i]];
-        int result =
-            _orderByAsc[i] ? valueToCompare1.compareTo(valueToCompare2) : valueToCompare2.compareTo(valueToCompare1);
-        if (result != 0) {
-          return result;
-        }
-      }
-      return 0;
-    }
-  }
-
-  public void resizeRecordsSet(Set<Record> recordSet, int trimToSize) {
-    int numRecordsToEvict = recordSet.size() - trimToSize;
-    if (numRecordsToEvict > 0) {
-      if (numRecordsToEvict < trimToSize) {
-        // num records to evict is smaller than num records to retain
-        // make PQ of records to evict
-        PriorityQueue<Record> priorityQueue =
-            buildPriorityQueueFromRecordSet(numRecordsToEvict, recordSet, _recordComparator);
-        for (Record recordToEvict : priorityQueue) {
-          recordSet.remove(recordToEvict);
-        }
-      } else {
-        // num records to retain is smaller than num records to evict
-        // make PQ of records to retain
-        PriorityQueue<Record> priorityQueue =
-            buildPriorityQueueFromRecordSet(trimToSize, recordSet, _recordComparator.reversed());
-        ObjectOpenHashSet<Record> recordsToRetain = new ObjectOpenHashSet<>(priorityQueue.size());
-        for (Record recordToRetain : priorityQueue) {
-          recordsToRetain.add(recordToRetain);
-        }
-        recordSet.retainAll(recordsToRetain);
-      }
-    }
-  }
-
-  private PriorityQueue<Record> buildPriorityQueueFromRecordSet(int size, Set<Record> recordSet,
-      Comparator<Record> comparator) {
-    PriorityQueue<Record> priorityQueue = new PriorityQueue<>(size, comparator);
-    for (Record record : recordSet) {
-      if (priorityQueue.size() < size) {
-        priorityQueue.offer(record);
-      } else {
-        Record peek = priorityQueue.peek();
-        if (comparator.compare(peek, record) < 0) {
-          priorityQueue.poll();
-          priorityQueue.offer(record);
-        }
-      }
-    }
-    return priorityQueue;
-  }
-
-  private List<Record> sortRecordSet(Set<Record> recordSet) {
-    int numRecords = recordSet.size();
-    List<Record> sortedRecords = new ArrayList<>(numRecords);
-    sortedRecords.addAll(recordSet);
-    sortedRecords.sort(_recordComparator);
-    return sortedRecords;
-  }
-
-  public List<Record> resizeAndSortRecordSet(Set<Record> recordSet, int trimToSize) {
-    int numRecords = recordSet.size();
-    if (numRecords == 0) {
-      return Collections.emptyList();
-    }
-
-    int numRecordsToRetain = Math.min(numRecords, trimToSize);
-    int numRecordsToEvict = numRecords - numRecordsToRetain;
-
-    if (numRecordsToEvict < numRecordsToRetain) {
-      // num records to evict is smaller than num records to retain
-      if (numRecordsToEvict > 0) {
-        // make PQ of records to evict
-        PriorityQueue<Record> priorityQueue =
-            buildPriorityQueueFromRecordSet(numRecordsToEvict, recordSet, _recordComparator);
-        for (Record recordToEvict : priorityQueue) {
-          recordSet.remove(recordToEvict);
-        }
-      }
-      return sortRecordSet(recordSet);
-    } else {
-      // make PQ of records to retain
-      PriorityQueue<Record> priorityQueue =
-          buildPriorityQueueFromRecordSet(numRecordsToRetain, recordSet, _recordComparator.reversed());
-      // use PQ to get sorted list
-      Record[] sortedArray = new Record[numRecordsToRetain];
-      while (!priorityQueue.isEmpty()) {
-        Record record = priorityQueue.poll();
-        sortedArray[--numRecordsToRetain] = record;
-      }
-      return Arrays.asList(sortedArray);
-    }
-  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DistinctTable.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DistinctTable.java
deleted file mode 100644
index d294218..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/DistinctTable.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.query.aggregation;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Set;
-import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.common.request.SelectionSort;
-import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.common.utils.HashUtil;
-import org.apache.pinot.core.common.datatable.DataTableBuilder;
-import org.apache.pinot.core.common.datatable.DataTableFactory;
-import org.apache.pinot.core.data.table.BaseTable;
-import org.apache.pinot.core.data.table.Key;
-import org.apache.pinot.core.data.table.Record;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
-import org.apache.pinot.spi.utils.ByteArray;
-
-
-/**
- * This serves the following purposes:
- *
- * (1) Intermediate result object for Distinct aggregation function
- * (2) The same object is serialized by the server inside the data table
- * for sending the results to broker. Broker deserializes it.
- * (3) This is also another concrete implementation of {@link BaseTable} and
- * uses {@link Set} to store unique records.
- */
-public class DistinctTable extends BaseTable {
-  private static final int MAX_INITIAL_CAPACITY = 64 * 1024;
-  private Set<Record> _uniqueRecordsSet;
-  private boolean _noMoreNewRecords;
-  private Iterator<Record> _sortedIterator;
-
-  public DistinctTable(DataSchema dataSchema, List<SelectionSort> orderBy, int capacity) {
-    // TODO: see if 64k is the right max initial capacity to use
-    // NOTE: The passed in capacity is calculated based on the LIMIT in the query as Math.max(limit * 5, 5000). When
-    //       LIMIT is smaller than (64 * 1024 * 0.75 (load factor) / 5 = 9830), then it is guaranteed that no resize is
-    //       required.
-    super(dataSchema, new AggregationFunction[0], orderBy, capacity);
-    int initialCapacity = Math.min(MAX_INITIAL_CAPACITY, HashUtil.getHashMapCapacity(capacity));
-    _uniqueRecordsSet = new HashSet<>(initialCapacity);
-    _noMoreNewRecords = false;
-  }
-
-  @Override
-  public boolean upsert(Key key, Record record) {
-    throw new UnsupportedOperationException("Operation not supported");
-  }
-
-  @Override
-  public boolean upsert(Record newRecord) {
-    if (_noMoreNewRecords) {
-      // for no ORDER BY queries, if we have reached the N as specified
-      // in LIMIT N (or default 10 if user didn't specify anything)
-      // then this function is NOOP
-      return false;
-    }
-
-    _uniqueRecordsSet.add(newRecord);
-
-    if (_uniqueRecordsSet.size() >= _maxCapacity) {
-      if (_isOrderBy) {
-        // ORDER BY; capacity < maxCapacity so trim to capacity
-        resize(_capacity);
-      } else {
-        // No ORDER BY; capacity == maxCapacity == user specified limit
-        // we can simply stop accepting anymore records from now on
-        _noMoreNewRecords = true;
-      }
-    }
-
-    return true;
-  }
-
-  /**
-   * SERIALIZE: Server side
-   * @return serialized bytes
-   * @throws IOException
-   */
-  public byte[] toBytes()
-      throws IOException {
-    // build rows for data table
-    // Only after the server level merge is done by CombineOperator to merge all the indexed tables
-    // of segments 1 .. N - 1 into the indexed table of 0th segment, we do finish(false) as that
-    // time we need the iterator as well and we send a trimmed set of records to the broker.
-    // finish is NOOP for non ORDER BY queries.
-    finish(false);
-    DataTableBuilder dataTableBuilder = new DataTableBuilder(_dataSchema);
-    Iterator<Record> iterator = iterator();
-    while (iterator.hasNext()) {
-      dataTableBuilder.startRow();
-      final Record record = iterator.next();
-      serializeColumns(record.getValues(), _dataSchema.getColumnDataTypes(), dataTableBuilder);
-      dataTableBuilder.finishRow();
-    }
-    DataTable dataTable = dataTableBuilder.build();
-    return dataTable.toBytes();
-  }
-
-  private void serializeColumns(Object[] columns, DataSchema.ColumnDataType[] columnDataTypes,
-      DataTableBuilder dataTableBuilder)
-      throws IOException {
-    for (int colIndex = 0; colIndex < columns.length; colIndex++) {
-      switch (columnDataTypes[colIndex]) {
-        case INT:
-          dataTableBuilder.setColumn(colIndex, ((Number) columns[colIndex]).intValue());
-          break;
-        case LONG:
-          dataTableBuilder.setColumn(colIndex, ((Number) columns[colIndex]).longValue());
-          break;
-        case FLOAT:
-          dataTableBuilder.setColumn(colIndex, ((Number) columns[colIndex]).floatValue());
-          break;
-        case DOUBLE:
-          dataTableBuilder.setColumn(colIndex, ((Number) columns[colIndex]).doubleValue());
-          break;
-        case STRING:
-          dataTableBuilder.setColumn(colIndex, ((String) columns[colIndex]));
-          break;
-        case BYTES:
-          dataTableBuilder.setColumn(colIndex, (ByteArray) columns[colIndex]);
-          break;
-        // Add other distinct column type supports here
-        default:
-          throw new IllegalStateException();
-      }
-    }
-  }
-
-  /**
-   * DESERIALIZE: Broker side
-   * @param byteBuffer data to deserialize
-   * @throws IOException
-   */
-  public DistinctTable(ByteBuffer byteBuffer)
-      throws IOException {
-    // This is called by the BrokerReduceService when it de-serializes the
-    // DISTINCT result from the DataTable. As of now we don't have all the
-    // information to pass to super class so just pass null, empty lists
-    // and the broker will set the correct information before merging the
-    // data tables.
-    super(new DataSchema(new String[0], new DataSchema.ColumnDataType[0]), new AggregationFunction[0],
-        new ArrayList<>(), 0);
-    DataTable dataTable = DataTableFactory.getDataTable(byteBuffer);
-    _dataSchema = dataTable.getDataSchema();
-    _uniqueRecordsSet = new HashSet<>();
-
-    int numRows = dataTable.getNumberOfRows();
-    int numColumns = _dataSchema.size();
-
-    // extract rows from the datatable
-    for (int rowIndex = 0; rowIndex < numRows; rowIndex++) {
-      Object[] columnValues = new Object[numColumns];
-      for (int colIndex = 0; colIndex < numColumns; colIndex++) {
-        DataSchema.ColumnDataType columnDataType = _dataSchema.getColumnDataType(colIndex);
-        switch (columnDataType) {
-          case INT:
-            columnValues[colIndex] = dataTable.getInt(rowIndex, colIndex);
-            break;
-          case LONG:
-            columnValues[colIndex] = dataTable.getLong(rowIndex, colIndex);
-            break;
-          case FLOAT:
-            columnValues[colIndex] = dataTable.getFloat(rowIndex, colIndex);
-            break;
-          case DOUBLE:
-            columnValues[colIndex] = dataTable.getDouble(rowIndex, colIndex);
-            break;
-          case STRING:
-            columnValues[colIndex] = dataTable.getString(rowIndex, colIndex);
-            break;
-          case BYTES:
-            columnValues[colIndex] = dataTable.getBytes(rowIndex, colIndex);
-            break;
-          // Add other distinct column type supports here
-          default:
-            throw new IllegalStateException(
-                "Unexpected column data type " + columnDataType + " while deserializing data table for DISTINCT query");
-        }
-      }
-
-      Record record = new Record(columnValues);
-      _uniqueRecordsSet.add(record);
-    }
-  }
-
-  /**
-   * Called by {@link org.apache.pinot.core.query.reduce.BrokerReduceService}
-   * just before it attempts to invoke merge() on
-   * {@link org.apache.pinot.core.query.aggregation.function.AggregationFunction}
-   * for DISTINCT. Since the DISTINCT uses IndexedTable underneath as the main
-   * data structure to hold unique tuples/rows and also for resizing/trimming/sorting,
-   * we need to pass info on limit, order by to super class
-   * {@link org.apache.pinot.core.data.table.IndexedTable}.
-   * @param brokerRequest broker request
-   */
-  public void addLimitAndOrderByInfo(BrokerRequest brokerRequest) {
-    addCapacityAndOrderByInfo(brokerRequest.getOrderBy(), brokerRequest.getLimit());
-  }
-
-  private void resize(int trimToSize) {
-    _tableResizer.resizeRecordsSet(_uniqueRecordsSet, trimToSize);
-  }
-
-  @Override
-  public int size() {
-    return _uniqueRecordsSet.size();
-  }
-
-  @Override
-  public Iterator<Record> iterator() {
-    return _sortedIterator != null ? _sortedIterator : _uniqueRecordsSet.iterator();
-  }
-
-  @Override
-  public void finish(boolean sort) {
-    if (_isOrderBy) {
-      if (sort) {
-        List<Record> sortedRecords = _tableResizer.resizeAndSortRecordSet(_uniqueRecordsSet, _capacity);
-        _sortedIterator = sortedRecords.iterator();
-      } else {
-        resize(_capacity);
-      }
-    }
-  }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAggregationFunction.java
index 8aceb29..3e33a03 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAggregationFunction.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
-import org.apache.commons.collections.CollectionUtils;
+import javax.annotation.Nullable;
 import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.common.request.SelectionSort;
 import org.apache.pinot.common.request.transform.TransformExpressionTree;
@@ -33,20 +33,20 @@ import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.common.RowBasedBlockValueFetcher;
 import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
-import org.apache.pinot.core.query.aggregation.DistinctTable;
 import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.function.customobject.DistinctTable;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
-import org.apache.pinot.core.util.GroupByUtils;
 
 
 /**
  * The DISTINCT clause in SQL is executed as the DISTINCT aggregation function.
- * // TODO: Support group-by
+ * TODO: Support group-by
  */
+@SuppressWarnings("rawtypes")
 public class DistinctAggregationFunction implements AggregationFunction<DistinctTable, Comparable> {
   private final String[] _columns;
   private final List<SelectionSort> _orderBy;
-  private final int _capacity;
+  private final int _limit;
   private final List<TransformExpressionTree> _inputExpressions;
 
   /**
@@ -56,14 +56,11 @@ public class DistinctAggregationFunction implements AggregationFunction<Distinct
    * @param orderBy Order By clause
    * @param limit Limit clause
    */
-  public DistinctAggregationFunction(List<String> columns, List<SelectionSort> orderBy, int limit) {
+  public DistinctAggregationFunction(List<String> columns, @Nullable List<SelectionSort> orderBy, int limit) {
     int numColumns = columns.size();
     _columns = columns.toArray(new String[numColumns]);
     _orderBy = orderBy;
-    // NOTE: DISTINCT with order-by is similar to group-by with order-by, where we limit the maximum number of unique
-    //       records (groups) for each query to reduce the memory footprint. The result might not be 100% accurate in
-    //       certain scenarios, but should give a good enough approximation.
-    _capacity = CollectionUtils.isNotEmpty(_orderBy) ? GroupByUtils.getTableCapacity(limit) : limit;
+    _limit = limit;
 
     _inputExpressions = new ArrayList<>(numColumns);
     for (String column : columns) {
@@ -123,21 +120,30 @@ public class DistinctAggregationFunction implements AggregationFunction<Distinct
         columnDataTypes[i] = ColumnDataType.fromDataTypeSV(blockValSetMap.get(_inputExpressions.get(i)).getValueType());
       }
       DataSchema dataSchema = new DataSchema(_columns, columnDataTypes);
-      distinctTable = new DistinctTable(dataSchema, _orderBy, _capacity);
+      distinctTable = new DistinctTable(dataSchema, _orderBy, _limit);
       aggregationResultHolder.setValue(distinctTable);
     }
 
-    // TODO: Follow up PR will make few changes to start using DictionaryBasedAggregationOperator
-    // for DISTINCT queries without filter.
-    RowBasedBlockValueFetcher blockValueFetcher = new RowBasedBlockValueFetcher(blockValSets);
-
-    // TODO: Do early termination in the operator itself which should
-    // not call aggregate function at all if the limit has reached
-    // that will require the interface change since this function
-    // has to communicate back that required number of records have
-    // been collected
-    for (int i = 0; i < length; i++) {
-      distinctTable.upsert(new Record(blockValueFetcher.getRow(i)));
+    // TODO: Follow up PR will make few changes to start using DictionaryBasedAggregationOperator for DISTINCT queries
+    //       without filter.
+
+    if (distinctTable.hasOrderBy()) {
+      // With order-by, no need to check whether the DistinctTable is already satisfied
+      RowBasedBlockValueFetcher blockValueFetcher = new RowBasedBlockValueFetcher(blockValSets);
+      for (int i = 0; i < length; i++) {
+        distinctTable.addWithOrderBy(new Record(blockValueFetcher.getRow(i)));
+      }
+    } else {
+      // Without order-by, early-terminate when the DistinctTable is already satisfied
+      if (distinctTable.isSatisfied()) {
+        return;
+      }
+      RowBasedBlockValueFetcher blockValueFetcher = new RowBasedBlockValueFetcher(blockValSets);
+      for (int i = 0; i < length; i++) {
+        if (distinctTable.addWithoutOrderBy(new Record(blockValueFetcher.getRow(i)))) {
+          return;
+        }
+      }
     }
   }
 
@@ -150,17 +156,22 @@ public class DistinctAggregationFunction implements AggregationFunction<Distinct
       ColumnDataType[] columnDataTypes = new ColumnDataType[_columns.length];
       // NOTE: Use STRING for unknown type
       Arrays.fill(columnDataTypes, ColumnDataType.STRING);
-      return new DistinctTable(new DataSchema(_columns, columnDataTypes), _orderBy, _capacity);
+      return new DistinctTable(new DataSchema(_columns, columnDataTypes), _orderBy, _limit);
     }
   }
 
+  /**
+   * NOTE: This method only handles merging of 2 main DistinctTables. It should not be used on Broker-side because it
+   *       does not support merging deserialized DistinctTables.
+   * <p>{@inheritDoc}
+   */
   @Override
   public DistinctTable merge(DistinctTable intermediateResult1, DistinctTable intermediateResult2) {
     if (intermediateResult1.size() == 0) {
       return intermediateResult2;
     }
     if (intermediateResult2.size() != 0) {
-      intermediateResult1.merge(intermediateResult2);
+      intermediateResult1.mergeMainDistinctTable(intermediateResult2);
     }
     return intermediateResult1;
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/customobject/DistinctTable.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/customobject/DistinctTable.java
new file mode 100644
index 0000000..776e864
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/customobject/DistinctTable.java
@@ -0,0 +1,334 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.customobject;
+
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.PriorityQueue;
+import java.util.Set;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.request.SelectionSort;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableFactory;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.spi.utils.ByteArray;
+
+
+/**
+ * The {@code DistinctTable} class serves as the intermediate result of
+ * {@link org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction}.
+ * <p>There are 2 types of DistinctTables:
+ * <ul>
+ *   <li>
+ *     Main DistinctTable: Constructed with DataSchema, order-by information and limit, which can be used to add records
+ *     or merge other DistinctTables.
+ *   </li>
+ *   <li>
+ *     Deserialized DistinctTable (Broker-side only): Constructed with ByteBuffer, which only contains the DataSchema
+ *     and records from the original main DistinctTable, but no data structure to handle the addition of new records. It
+ *     cannot be used to add more records or merge other DistinctTables, but can only be used to be merged into the main
+ *     DistinctTable.
+ *   </li>
+ * </ul>
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class DistinctTable {
+  private static final int MAX_INITIAL_CAPACITY = 10000;
+
+  // Available in both main and Broker-side de-serialized DistinctTable
+  private final DataSchema _dataSchema;
+
+  // Available in main DistinctTable only
+  private final int _limit;
+  private final Set<Record> _uniqueRecords;
+  private final PriorityQueue<Record> _sortedRecords;
+
+  // Available in Broker-side deserialized DistinctTable only
+  private final List<Record> _records;
+
+  /**
+   * Constructor of the main DistinctTable which can be used to add records and merge other DistinctTables.
+   *
+   * @param dataSchema schema (column names and data types) of the records
+   * @param orderBy order-by expressions, or {@code null} if the records do not need to be ordered
+   * @param limit maximum number of unique records to keep
+   */
+  public DistinctTable(DataSchema dataSchema, @Nullable List<SelectionSort> orderBy, int limit) {
+    _dataSchema = dataSchema;
+    _limit = limit;
+
+    // TODO: see if 10k is the right max initial capacity to use
+    // NOTE: When LIMIT is smaller than or equal to the MAX_INITIAL_CAPACITY, no resize is required.
+    int initialCapacity = Math.min(limit, MAX_INITIAL_CAPACITY);
+    _uniqueRecords = new ObjectOpenHashSet<>(initialCapacity);
+    if (orderBy != null) {
+      String[] columns = dataSchema.getColumnNames();
+      int numColumns = columns.length;
+      Object2IntOpenHashMap<String> columnIndexMap = new Object2IntOpenHashMap<>(numColumns);
+      for (int i = 0; i < numColumns; i++) {
+        columnIndexMap.put(columns[i], i);
+      }
+      int numOrderByColumns = orderBy.size();
+      int[] orderByColumnIndexes = new int[numOrderByColumns];
+      boolean[] orderByAsc = new boolean[numOrderByColumns];
+      for (int i = 0; i < numOrderByColumns; i++) {
+        SelectionSort selectionSort = orderBy.get(i);
+        orderByColumnIndexes[i] = columnIndexMap.getInt(selectionSort.getColumn());
+        orderByAsc[i] = selectionSort.isIsAsc();
+      }
+      // NOTE: java.util.PriorityQueue rejects an initial capacity smaller than 1, so LIMIT 0 must be rounded up.
+      //       The comparator is the reverse of the requested order so that the head of the queue is always the
+      //       record to evict first (the last one in the final order).
+      _sortedRecords = new PriorityQueue<>(Math.max(initialCapacity, 1), (record1, record2) -> {
+        Object[] values1 = record1.getValues();
+        Object[] values2 = record2.getValues();
+        for (int i = 0; i < numOrderByColumns; i++) {
+          Comparable value1 = (Comparable) values1[orderByColumnIndexes[i]];
+          Comparable value2 = (Comparable) values2[orderByColumnIndexes[i]];
+          int result = orderByAsc[i] ? value2.compareTo(value1) : value1.compareTo(value2);
+          if (result != 0) {
+            return result;
+          }
+        }
+        return 0;
+      });
+    } else {
+      _sortedRecords = null;
+    }
+    _records = null;
+  }
+
+  /**
+   * Returns the {@link DataSchema} of the DistinctTable.
+   */
+  public DataSchema getDataSchema() {
+    return _dataSchema;
+  }
+
+  /**
+   * Returns the number of unique records within the DistinctTable.
+   */
+  public int size() {
+    // Main DistinctTable keeps the records in the set, deserialized DistinctTable keeps them in the list
+    return _uniqueRecords != null ? _uniqueRecords.size() : _records.size();
+  }
+
+  /**
+   * Returns {@code true} if the main DistinctTable has order-by, {@code false} otherwise.
+   */
+  public boolean hasOrderBy() {
+    return _sortedRecords != null;
+  }
+
+  /**
+   * Adds a record to the main DistinctTable without order-by and returns whether the DistinctTable is already
+   * satisfied.
+   * <p>NOTE: There should be no more calls to this method after it or {@link #isSatisfied()} returns {@code true}.
+   */
+  public boolean addWithoutOrderBy(Record record) {
+    // A duplicate record does not change the size, so the table cannot have become satisfied by this call
+    return _uniqueRecords.add(record) && isSatisfied();
+  }
+
+  /**
+   * Returns {@code true} if the main DistinctTable without order-by is already satisfied, {@code false} otherwise.
+   * <p>The DistinctTable is satisfied when enough unique records have been collected.
+   */
+  public boolean isSatisfied() {
+    // Use >= instead of == so that the table stays satisfied even if it is filled past the limit (e.g. LIMIT 0)
+    return _uniqueRecords.size() >= _limit;
+  }
+
+  /**
+   * Adds a record to the main DistinctTable with order-by. Once {@code limit} records are held, the new record is
+   * only kept if it orders before the current last record, which is evicted to make room.
+   */
+  public void addWithOrderBy(Record record) {
+    if (_uniqueRecords.contains(record)) {
+      return;
+    }
+    if (_sortedRecords.size() < _limit) {
+      _uniqueRecords.add(record);
+      _sortedRecords.offer(record);
+      return;
+    }
+    // The head of the queue is the record to evict first; it is null when nothing is held (LIMIT 0)
+    Record leastRecord = _sortedRecords.peek();
+    if (leastRecord != null && _sortedRecords.comparator().compare(record, leastRecord) > 0) {
+      _uniqueRecords.remove(leastRecord);
+      _uniqueRecords.add(record);
+      _sortedRecords.poll();
+      _sortedRecords.offer(record);
+    }
+  }
+
+  /*
+   * SERVER ONLY METHODS
+   */
+
+  /**
+   * Merges another main DistinctTable into the main DistinctTable.
+   */
+  public void mergeMainDistinctTable(DistinctTable distinctTable) {
+    mergeRecords(distinctTable._uniqueRecords);
+  }
+
+  /**
+   * Helper method to merge a collection of records into the main DistinctTable.
+   */
+  private void mergeRecords(Collection<Record> records) {
+    if (hasOrderBy()) {
+      for (Record record : records) {
+        addWithOrderBy(record);
+      }
+    } else if (!isSatisfied()) {
+      for (Record record : records) {
+        if (addWithoutOrderBy(record)) {
+          return;
+        }
+      }
+    }
+  }
+
+  /**
+   * Serializes the main DistinctTable into a byte array.
+   */
+  public byte[] toBytes()
+      throws IOException {
+    // NOTE: Serialize the DistinctTable as a DataTable
+    DataTableBuilder dataTableBuilder = new DataTableBuilder(_dataSchema);
+    DataSchema.ColumnDataType[] columnDataTypes = _dataSchema.getColumnDataTypes();
+    int numColumns = columnDataTypes.length;
+    for (Record record : _uniqueRecords) {
+      dataTableBuilder.startRow();
+      Object[] values = record.getValues();
+      for (int i = 0; i < numColumns; i++) {
+        switch (columnDataTypes[i]) {
+          case INT:
+            dataTableBuilder.setColumn(i, (int) values[i]);
+            break;
+          case LONG:
+            dataTableBuilder.setColumn(i, (long) values[i]);
+            break;
+          case FLOAT:
+            dataTableBuilder.setColumn(i, (float) values[i]);
+            break;
+          case DOUBLE:
+            dataTableBuilder.setColumn(i, (double) values[i]);
+            break;
+          case STRING:
+            dataTableBuilder.setColumn(i, (String) values[i]);
+            break;
+          case BYTES:
+            dataTableBuilder.setColumn(i, (ByteArray) values[i]);
+            break;
+          // Add other distinct column type supports here
+          default:
+            throw new IllegalStateException("Unsupported DISTINCT column data type: " + columnDataTypes[i]);
+        }
+      }
+      dataTableBuilder.finishRow();
+    }
+    return dataTableBuilder.build().toBytes();
+  }
+
+  /*
+   * BROKER ONLY METHODS
+   */
+
+  /**
+   * Broker-side constructor to deserialize the DistinctTable from a {@link ByteBuffer}. The DistinctTable constructed
+   * this way cannot be used to add more records or merge other DistinctTables, but can only be used to be merged into
+   * the main DistinctTable because it does not contain the order-by information and limit.
+   */
+  public DistinctTable(ByteBuffer byteBuffer)
+      throws IOException {
+    DataTable dataTable = DataTableFactory.getDataTable(byteBuffer);
+    _dataSchema = dataTable.getDataSchema();
+    _limit = Integer.MIN_VALUE;
+    _uniqueRecords = null;
+    _sortedRecords = null;
+    int numRecords = dataTable.getNumberOfRows();
+    DataSchema.ColumnDataType[] columnDataTypes = _dataSchema.getColumnDataTypes();
+    int numColumns = columnDataTypes.length;
+    _records = new ArrayList<>(numRecords);
+    for (int i = 0; i < numRecords; i++) {
+      Object[] values = new Object[numColumns];
+      for (int j = 0; j < numColumns; j++) {
+        switch (columnDataTypes[j]) {
+          case INT:
+            values[j] = dataTable.getInt(i, j);
+            break;
+          case LONG:
+            values[j] = dataTable.getLong(i, j);
+            break;
+          case FLOAT:
+            values[j] = dataTable.getFloat(i, j);
+            break;
+          case DOUBLE:
+            values[j] = dataTable.getDouble(i, j);
+            break;
+          case STRING:
+            values[j] = dataTable.getString(i, j);
+            break;
+          case BYTES:
+            values[j] = dataTable.getBytes(i, j);
+            break;
+          // Add other distinct column type supports here
+          default:
+            throw new IllegalStateException("Unsupported DISTINCT column data type: " + columnDataTypes[j]);
+        }
+      }
+      _records.add(new Record(values));
+    }
+  }
+
+  /**
+   * Merges a deserialized DistinctTable into the main DistinctTable.
+   */
+  public void mergeDeserializedDistinctTable(DistinctTable distinctTable) {
+    mergeRecords(distinctTable._records);
+  }
+
+  /**
+   * Returns the final result (all unique records, sorted if ordering is required) from the main DistinctTable.
+   * <p>NOTE: With order-by, this call drains the internal queue, so it should only be called once after all the
+   *       records are added or merged.
+   */
+  public Iterator<Record> getFinalResult() {
+    if (_sortedRecords == null) {
+      return _uniqueRecords.iterator();
+    }
+    // Records are polled last-first, so prepend each one to get the requested order
+    LinkedList<Record> sortedRecords = new LinkedList<>();
+    while (!_sortedRecords.isEmpty()) {
+      sortedRecords.addFirst(_sortedRecords.poll());
+    }
+    return sortedRecords.iterator();
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
index 9c5f222..6dd7879 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
@@ -18,11 +18,9 @@
  */
 package org.apache.pinot.core.query.reduce;
 
-import com.google.common.base.Preconditions;
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -35,9 +33,8 @@ import org.apache.pinot.common.response.broker.SelectionResults;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.data.table.Record;
-import org.apache.pinot.core.query.aggregation.DistinctTable;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
+import org.apache.pinot.core.query.aggregation.function.customobject.DistinctTable;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
 import org.apache.pinot.core.util.QueryOptions;
@@ -48,14 +45,11 @@ import org.apache.pinot.core.util.QueryOptions;
  */
 public class DistinctDataTableReducer implements DataTableReducer {
   private final BrokerRequest _brokerRequest;
-  private final AggregationFunction _aggregationFunction;
   private final boolean _responseFormatSql;
 
   // TODO: queryOptions.isPreserveType() is ignored for DISTINCT queries.
-  DistinctDataTableReducer(BrokerRequest brokerRequest, AggregationFunction aggregationFunction,
-      QueryOptions queryOptions) {
+  DistinctDataTableReducer(BrokerRequest brokerRequest, QueryOptions queryOptions) {
     _brokerRequest = brokerRequest;
-    _aggregationFunction = aggregationFunction;
     _responseFormatSql = queryOptions.isResponseFormatSQL();
   }
 
@@ -68,8 +62,24 @@ public class DistinctDataTableReducer implements DataTableReducer {
   public void reduceAndSetResults(String tableName, DataSchema dataSchema,
       Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative brokerResponseNative,
       BrokerMetrics brokerMetrics) {
+    // DISTINCT is implemented as an aggregation function in the execution engine. Just like
+    // other aggregation functions, DISTINCT returns its result as a single object
+    // (of type DistinctTable) serialized by the server into the DataTable and deserialized
+    // by the broker from the DataTable. So there should be exactly 1 row and 1 column and that
+    // column value should be the serialized DistinctTable -- so essentially it is a DataTable
+    // inside a DataTable
 
-    if (dataTableMap.isEmpty()) {
+    // Gather all non-empty DistinctTables
+    List<DistinctTable> nonEmptyDistinctTables = new ArrayList<>(dataTableMap.size());
+    for (DataTable dataTable : dataTableMap.values()) {
+      DistinctTable distinctTable = dataTable.getObject(0, 0);
+      if (distinctTable.size() > 0) {
+        nonEmptyDistinctTables.add(distinctTable);
+      }
+    }
+
+    if (nonEmptyDistinctTables.isEmpty()) {
+      // All the DistinctTables are empty, construct an empty response
       if (_responseFormatSql) {
         // TODO: This returns schema with all STRING data types.
         //  There's no way currently to get the data types of the distinct columns for empty results
@@ -78,53 +88,26 @@ public class DistinctDataTableReducer implements DataTableReducer {
       } else {
         brokerResponseNative.setSelectionResults(new SelectionResults(getDistinctColumns(), Collections.emptyList()));
       }
-      return;
-    }
+    } else {
+      // Construct a main DistinctTable and merge all non-empty DistinctTables into it
+      DistinctTable mainDistinctTable =
+          new DistinctTable(nonEmptyDistinctTables.get(0).getDataSchema(), _brokerRequest.getOrderBy(),
+              _brokerRequest.getLimit());
+      for (DistinctTable distinctTable : nonEmptyDistinctTables) {
+        mainDistinctTable.mergeDeserializedDistinctTable(distinctTable);
+      }
 
-    assert dataSchema != null;
-    // DISTINCT is implemented as an aggregation function in the execution engine. Just like
-    // other aggregation functions, DISTINCT returns its result as a single object
-    // (of type DistinctTable) serialized by the server into the DataTable and deserialized
-    // by the broker from the DataTable. So there should be exactly 1 row and 1 column and that
-    // column value should be the serialized DistinctTable -- so essentially it is a DataTable
-    // inside a DataTable
-    Collection<DataTable> dataTables = dataTableMap.values();
-    Preconditions.checkState(dataSchema.size() == 1, "DataTable from server for DISTINCT should have exactly one row");
-    Preconditions.checkState(dataSchema.getColumnDataType(0) == DataSchema.ColumnDataType.OBJECT,
-        "DistinctAggregationFunction should return result of type OBJECT");
-    Object mergedIntermediateResult = null;
-    // go over all the data tables from servers
-    for (DataTable dataTable : dataTables) {
-      Preconditions.checkState(dataTable.getNumberOfRows() == 1);
-      // deserialize the DistinctTable
-      Object intermediateResultToMerge = dataTable.getObject(0, 0);
-      Preconditions.checkState(intermediateResultToMerge instanceof DistinctTable);
-      DistinctTable distinctTable = (DistinctTable) intermediateResultToMerge;
-      // since DistinctTable uses the Table interface and during deserialization, we didn't
-      // have all the necessary information w.r.t ORDER BY, limit etc, we set it now
-      // before merging so that resize/trimming/sorting happens correctly
-      distinctTable.addLimitAndOrderByInfo(_brokerRequest);
-      if (mergedIntermediateResult == null) {
-        mergedIntermediateResult = intermediateResultToMerge;
+      // Up until now, we have treated DISTINCT similar to another aggregation function even in terms
+      // of the result from function and merging results.
+      // However, the DISTINCT query is just another SELECTION style query from the user's point
+      // of view and will return one or more records in the result table for the column(s) selected and so
+      // for that reason, response from broker should be a selection query result.
+      if (_responseFormatSql) {
+        brokerResponseNative.setResultTable(reduceToResultTable(mainDistinctTable));
       } else {
-        mergedIntermediateResult = _aggregationFunction.merge(mergedIntermediateResult, intermediateResultToMerge);
+        brokerResponseNative.setSelectionResults(reduceToSelectionResult(mainDistinctTable));
       }
     }
-
-    DistinctTable distinctTable = (DistinctTable) mergedIntermediateResult;
-    // finish the merging, sort (if ORDER BY), get iterator
-    distinctTable.finish(true);
-
-    // Up until now, we have treated DISTINCT similar to another aggregation function even in terms
-    // of the result from function and merging results.
-    // However, the DISTINCT query is just another SELECTION style query from the user's point
-    // of view and will return one or records in the result table for the column(s) selected and so
-    // for that reason, response from broker should be a selection query result.
-    if (_responseFormatSql) {
-      brokerResponseNative.setResultTable(reduceToResultTable(distinctTable));
-    } else {
-      brokerResponseNative.setSelectionResults(reduceToSelectionResult(distinctTable));
-    }
   }
 
   private SelectionResults reduceToSelectionResult(DistinctTable distinctTable) {
@@ -132,7 +115,7 @@ public class DistinctDataTableReducer implements DataTableReducer {
     DataSchema dataSchema = distinctTable.getDataSchema();
     DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
     int numColumns = columnDataTypes.length;
-    Iterator<Record> iterator = distinctTable.iterator();
+    Iterator<Record> iterator = distinctTable.getFinalResult();
     while (iterator.hasNext()) {
       Object[] values = iterator.next().getValues();
       Serializable[] row = new Serializable[numColumns];
@@ -149,7 +132,7 @@ public class DistinctDataTableReducer implements DataTableReducer {
     DataSchema dataSchema = distinctTable.getDataSchema();
     DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
     int numColumns = columnDataTypes.length;
-    Iterator<Record> iterator = distinctTable.iterator();
+    Iterator<Record> iterator = distinctTable.getFinalResult();
     while (iterator.hasNext()) {
       Object[] values = iterator.next().getValues();
       Object[] row = new Object[numColumns];
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
index c19a673..cb1f124 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
@@ -46,7 +46,7 @@ public final class ResultReducerFactory {
         // Aggregation only query
         if (aggregationFunctions.length == 1 && aggregationFunctions[0].getType() == AggregationFunctionType.DISTINCT) {
           // Distinct query
-          dataTableReducer = new DistinctDataTableReducer(brokerRequest, aggregationFunctions[0], queryOptions);
+          dataTableReducer = new DistinctDataTableReducer(brokerRequest, queryOptions);
         } else {
           dataTableReducer = new AggregationDataTableReducer(brokerRequest, aggregationFunctions, queryOptions);
         }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java
index 61607c7..22acc91 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/table/TableResizerTest.java
@@ -23,10 +23,8 @@ import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import org.apache.pinot.common.request.SelectionSort;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
@@ -427,160 +425,4 @@ public class TableResizerTest {
           .assertEquals(intermediateRecord._values[0], distinctCountFunction.extractFinalResult(record.getValues()[5]));
     }
   }
-
-  @Test
-  public void testResizerForSetBasedTable() {
-    String[] columns = new String[]{"STRING_COL"};
-    DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.STRING};
-
-    DataSchema schema = new DataSchema(columns, columnDataTypes);
-    SelectionSort selectionSort = new SelectionSort();
-    selectionSort.setColumn("STRING_COL");
-    selectionSort.setIsAsc(true);
-
-    TableResizer tableResizer = new TableResizer(schema, new AggregationFunction[0], Lists.newArrayList(selectionSort));
-    Set<Record> uniqueRecordsSet = new HashSet<>();
-
-    Record r1 = new Record(new Object[]{"B"});
-    Record r2 = new Record(new Object[]{"A"});
-    Record r3 = new Record(new Object[]{"D"});
-    Record r4 = new Record(new Object[]{"C"});
-    Record r5 = new Record(new Object[]{"E"});
-
-    uniqueRecordsSet.add(r1);
-    uniqueRecordsSet.add(r2);
-    uniqueRecordsSet.add(r3);
-    uniqueRecordsSet.add(r4);
-    uniqueRecordsSet.add(r5);
-
-    int trimSize = 5;
-    // no records should have been evicted
-    Set<Record> copiedRecordSet = new HashSet<>(uniqueRecordsSet);
-    tableResizer.resizeRecordsSet(copiedRecordSet, trimSize);
-    Assert.assertTrue(copiedRecordSet.contains(r1));
-    Assert.assertTrue(copiedRecordSet.contains(r2));
-    Assert.assertTrue(copiedRecordSet.contains(r3));
-    Assert.assertTrue(copiedRecordSet.contains(r4));
-    Assert.assertTrue(copiedRecordSet.contains(r5));
-
-    List<Record> sorted = tableResizer.resizeAndSortRecordSet(copiedRecordSet, trimSize);
-    Assert.assertEquals(trimSize, sorted.size());
-    Assert.assertEquals(r2, sorted.get(0));
-    Assert.assertEquals(r1, sorted.get(1));
-    Assert.assertEquals(r4, sorted.get(2));
-    Assert.assertEquals(r3, sorted.get(3));
-    Assert.assertEquals(r5, sorted.get(4));
-
-    trimSize = 3;
-    // D and E should have been evicted through PQ
-    copiedRecordSet = new HashSet<>(uniqueRecordsSet);
-    tableResizer.resizeRecordsSet(copiedRecordSet, trimSize);
-    Assert.assertTrue(copiedRecordSet.contains(r1));
-    Assert.assertTrue(copiedRecordSet.contains(r2));
-    Assert.assertTrue(copiedRecordSet.contains(r4));
-    Assert.assertFalse(copiedRecordSet.contains(r3));
-    Assert.assertFalse(copiedRecordSet.contains(r5));
-
-    sorted = tableResizer.resizeAndSortRecordSet(copiedRecordSet, trimSize);
-    Assert.assertEquals(trimSize, sorted.size());
-    Assert.assertEquals(r2, sorted.get(0));
-    Assert.assertEquals(r1, sorted.get(1));
-    Assert.assertEquals(r4, sorted.get(2));
-
-    trimSize = 2;
-    // A and B should have been retained through PQ
-    copiedRecordSet = new HashSet<>(uniqueRecordsSet);
-    tableResizer.resizeRecordsSet(copiedRecordSet, trimSize);
-    Assert.assertTrue(copiedRecordSet.contains(r1));
-    Assert.assertTrue(copiedRecordSet.contains(r2));
-    Assert.assertFalse(copiedRecordSet.contains(r3));
-    Assert.assertFalse(copiedRecordSet.contains(r4));
-    Assert.assertFalse(copiedRecordSet.contains(r5));
-
-    sorted = tableResizer.resizeAndSortRecordSet(copiedRecordSet, trimSize);
-    Assert.assertEquals(2, sorted.size());
-    Assert.assertEquals(r2, sorted.get(0));
-    Assert.assertEquals(r1, sorted.get(1));
-
-    trimSize = 1;
-    // A should have been retained through PQ
-    copiedRecordSet = new HashSet<>(uniqueRecordsSet);
-    tableResizer.resizeRecordsSet(copiedRecordSet, trimSize);
-    Assert.assertFalse(copiedRecordSet.contains(r1));
-    Assert.assertTrue(copiedRecordSet.contains(r2));
-    Assert.assertFalse(copiedRecordSet.contains(r3));
-    Assert.assertFalse(copiedRecordSet.contains(r4));
-    Assert.assertFalse(copiedRecordSet.contains(r5));
-
-    sorted = tableResizer.resizeAndSortRecordSet(copiedRecordSet, trimSize);
-    Assert.assertEquals(1, sorted.size());
-    Assert.assertEquals(r2, sorted.get(0));
-
-    // change the order to DESC
-    selectionSort.setIsAsc(false);
-    tableResizer = new TableResizer(schema, new AggregationFunction[0], Lists.newArrayList(selectionSort));
-
-    trimSize = 5;
-    // no records should have been evicted
-    copiedRecordSet = new HashSet<>(uniqueRecordsSet);
-    tableResizer.resizeRecordsSet(copiedRecordSet, trimSize);
-    Assert.assertTrue(copiedRecordSet.contains(r1));
-    Assert.assertTrue(copiedRecordSet.contains(r2));
-    Assert.assertTrue(copiedRecordSet.contains(r3));
-    Assert.assertTrue(copiedRecordSet.contains(r4));
-    Assert.assertTrue(copiedRecordSet.contains(r5));
-
-    sorted = tableResizer.resizeAndSortRecordSet(copiedRecordSet, trimSize);
-    Assert.assertEquals(trimSize, sorted.size());
-    Assert.assertEquals(r5, sorted.get(0));
-    Assert.assertEquals(r3, sorted.get(1));
-    Assert.assertEquals(r4, sorted.get(2));
-    Assert.assertEquals(r1, sorted.get(3));
-    Assert.assertEquals(r2, sorted.get(4));
-
-    trimSize = 3;
-    // A and B should have been evicted through PQ
-    copiedRecordSet = new HashSet<>(uniqueRecordsSet);
-    tableResizer.resizeRecordsSet(copiedRecordSet, trimSize);
-    Assert.assertFalse(copiedRecordSet.contains(r1));
-    Assert.assertFalse(copiedRecordSet.contains(r2));
-    Assert.assertTrue(copiedRecordSet.contains(r4));
-    Assert.assertTrue(copiedRecordSet.contains(r3));
-    Assert.assertTrue(copiedRecordSet.contains(r5));
-
-    sorted = tableResizer.resizeAndSortRecordSet(copiedRecordSet, trimSize);
-    Assert.assertEquals(3, sorted.size());
-    Assert.assertEquals(r5, sorted.get(0));
-    Assert.assertEquals(r3, sorted.get(1));
-    Assert.assertEquals(r4, sorted.get(2));
-
-    trimSize = 2;
-    // D and E should have been retained through PQ
-    copiedRecordSet = new HashSet<>(uniqueRecordsSet);
-    tableResizer.resizeRecordsSet(copiedRecordSet, trimSize);
-    Assert.assertFalse(copiedRecordSet.contains(r1));
-    Assert.assertFalse(copiedRecordSet.contains(r2));
-    Assert.assertFalse(copiedRecordSet.contains(r4));
-    Assert.assertTrue(copiedRecordSet.contains(r3));
-    Assert.assertTrue(copiedRecordSet.contains(r5));
-
-    sorted = tableResizer.resizeAndSortRecordSet(copiedRecordSet, trimSize);
-    Assert.assertEquals(2, sorted.size());
-    Assert.assertEquals(r5, sorted.get(0));
-    Assert.assertEquals(r3, sorted.get(1));
-
-    trimSize = 1;
-    // E should have been retained through PQ
-    copiedRecordSet = new HashSet<>(uniqueRecordsSet);
-    tableResizer.resizeRecordsSet(copiedRecordSet, trimSize);
-    Assert.assertFalse(copiedRecordSet.contains(r1));
-    Assert.assertFalse(copiedRecordSet.contains(r2));
-    Assert.assertFalse(copiedRecordSet.contains(r4));
-    Assert.assertFalse(copiedRecordSet.contains(r3));
-    Assert.assertTrue(copiedRecordSet.contains(r5));
-
-    sorted = tableResizer.resizeAndSortRecordSet(copiedRecordSet, trimSize);
-    Assert.assertEquals(1, sorted.size());
-    Assert.assertEquals(r5, sorted.get(0));
-  }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
index 6417788..85b39ba 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
@@ -49,7 +49,7 @@ import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.core.operator.query.AggregationOperator;
-import org.apache.pinot.core.query.aggregation.DistinctTable;
+import org.apache.pinot.core.query.aggregation.function.customobject.DistinctTable;
 import org.apache.pinot.core.query.reduce.BrokerReduceService;
 import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
@@ -91,7 +91,8 @@ public class DistinctQueriesTest extends BaseQueriesTest {
       .addSingleValueDimension(LONG_COLUMN, DataType.LONG).addSingleValueDimension(FLOAT_COLUMN, DataType.FLOAT)
       .addSingleValueDimension(DOUBLE_COLUMN, DataType.DOUBLE).addSingleValueDimension(STRING_COLUMN, DataType.STRING)
       .addSingleValueDimension(BYTES_COLUMN, DataType.BYTES).build();
-  private static final TableConfig TABLE = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+  private static final TableConfig TABLE =
+      new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
 
   private IndexSegment _indexSegment;
   private List<SegmentDataManager> _segmentDataManagers;
@@ -202,7 +203,7 @@ public class DistinctQueriesTest extends BaseQueriesTest {
           expectedValues.add(i);
         }
         Set<Integer> actualValues = new HashSet<>();
-        Iterator<Record> iterator = distinctTable.iterator();
+        Iterator<Record> iterator = distinctTable.getFinalResult();
         while (iterator.hasNext()) {
           Record record = iterator.next();
           Object[] values = record.getValues();
@@ -235,7 +236,7 @@ public class DistinctQueriesTest extends BaseQueriesTest {
           expectedValues.add(i);
         }
         Set<Integer> actualValues = new HashSet<>();
-        Iterator<Record> iterator = distinctTable.iterator();
+        Iterator<Record> iterator = distinctTable.getFinalResult();
         while (iterator.hasNext()) {
           Record record = iterator.next();
           Object[] values = record.getValues();
@@ -256,24 +257,18 @@ public class DistinctQueriesTest extends BaseQueriesTest {
         assertEquals(dataSchema.getColumnNames(), new String[]{"intColumn", "bytesColumn"});
         assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.BYTES});
 
-        // Check values, where all 100 unique values should be returned (limit won't take effect on server side)
-        // TODO: After optimizing the DistinctTable (only keep the limit number of records), only 5 values should be
-        //       returned
-        assertEquals(distinctTable.size(), NUM_UNIQUE_RECORDS_PER_SEGMENT);
-        Set<Integer> expectedValues = new HashSet<>();
-        for (int i = 0; i < NUM_UNIQUE_RECORDS_PER_SEGMENT; i++) {
-          expectedValues.add(i);
-        }
-        Set<Integer> actualValues = new HashSet<>();
-        Iterator<Record> iterator = distinctTable.iterator();
-        while (iterator.hasNext()) {
-          Record record = iterator.next();
-          Object[] values = record.getValues();
+        // Check values: only the top 5 values, sorted in ascending ByteArray order, should be returned
+        assertEquals(distinctTable.size(), 5);
+        // Hex-encoded ByteArrays "30", "31", "3130", "3131", "3132" (same as String order because every digit is
+        // encoded as a single byte)
+        int[] expectedValues = new int[]{0, 1, 10, 11, 12};
+        Iterator<Record> iterator = distinctTable.getFinalResult();
+        for (int i = 0; i < 5; i++) {
+          Object[] values = iterator.next().getValues();
           int intValue = (int) values[0];
-          assertEquals(StringUtil.decodeUtf8(((ByteArray) values[1]).getBytes()), Integer.toString(intValue));
-          actualValues.add(intValue);
+          assertEquals(intValue, expectedValues[i]);
+          assertEquals(Integer.parseInt(StringUtil.decodeUtf8(((ByteArray) values[1]).getBytes())), intValue);
         }
-        assertEquals(actualValues, expectedValues);
       }
       {
         // Test selecting some columns with transform, filter, order-by and limit. Spaces in 'add' are intentional
@@ -288,24 +283,16 @@ public class DistinctQueriesTest extends BaseQueriesTest {
         assertEquals(dataSchema.getColumnDataTypes(),
             new ColumnDataType[]{ColumnDataType.DOUBLE, ColumnDataType.STRING});
 
-        // Check values, where 60 matched values should be returned (limit won't take effect on server side)
-        // TODO: After optimizing the DistinctTable (only keep the limit number of records), only 10 values should be
-        //       returned
-        assertEquals(distinctTable.size(), 60);
-        Set<Integer> expectedValues = new HashSet<>();
-        for (int i = 0; i < 60; i++) {
-          expectedValues.add(i);
-        }
-        Set<Integer> actualValues = new HashSet<>();
-        Iterator<Record> iterator = distinctTable.iterator();
-        while (iterator.hasNext()) {
-          Record record = iterator.next();
-          Object[] values = record.getValues();
+        // Check values: only the top 10 values, sorted in descending String order, should be returned
+        assertEquals(distinctTable.size(), 10);
+        int[] expectedValues = new int[]{9, 8, 7, 6, 59, 58, 57, 56, 55, 54};
+        Iterator<Record> iterator = distinctTable.getFinalResult();
+        for (int i = 0; i < 10; i++) {
+          Object[] values = iterator.next().getValues();
           int intValue = ((Double) values[0]).intValue() / 2;
+          assertEquals(intValue, expectedValues[i]);
           assertEquals(Integer.parseInt((String) values[1]), intValue);
-          actualValues.add(intValue);
         }
-        assertEquals(actualValues, expectedValues);
       }
       {
         // Test selecting some columns with filter that does not match any record
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationSingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationSingleValueQueriesTest.java
index 133569e..27dde46 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationSingleValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationSingleValueQueriesTest.java
@@ -25,7 +25,7 @@ import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.operator.query.AggregationGroupByOperator;
 import org.apache.pinot.core.operator.query.AggregationOperator;
-import org.apache.pinot.core.query.aggregation.DistinctTable;
+import org.apache.pinot.core.query.aggregation.function.customobject.DistinctTable;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
@@ -186,10 +186,10 @@ public class InnerSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
 
     DataSchema dataSchema = distinctTable.getDataSchema();
     Assert.assertEquals(dataSchema.getColumnNames(), new String[]{"column1"});
-    Assert.assertEquals(dataSchema.getColumnDataTypes(),
-        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT});
+    Assert
+        .assertEquals(dataSchema.getColumnDataTypes(), new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT});
 
-    Iterator<Record> iterator = distinctTable.iterator();
+    Iterator<Record> iterator = distinctTable.getFinalResult();
     while (iterator.hasNext()) {
       Record record = iterator.next();
       Assert.assertNotNull(record);
@@ -221,7 +221,7 @@ public class InnerSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
     Assert.assertEquals(dataSchema.getColumnDataTypes(),
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT});
 
-    Iterator<Record> iterator = distinctTable.iterator();
+    Iterator<Record> iterator = distinctTable.getFinalResult();
     while (iterator.hasNext()) {
       Record record = iterator.next();
       Assert.assertNotNull(record);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org