Posted to commits@pinot.apache.org by ja...@apache.org on 2020/12/02 18:37:25 UTC

[incubator-pinot] branch master updated: Improve performance for distinct queries (#6285)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 66e9394  Improve performance for distinct queries (#6285)
66e9394 is described below

commit 66e9394e6d0dc3711ca8bd94d3297e49efba5d5d
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Wed Dec 2 10:37:09 2020 -0800

    Improve performance for distinct queries (#6285)
    
    Currently, distinct queries are handled as aggregations, which limits both early termination and the specialized type handling needed for better performance.
    
    This PR adds new operators for distinct queries.
    The new operators:
    - Early-terminate distinct-only queries once enough distinct records are collected
    - Specialize the handling for different data types, using primitives where possible
    - Specialize the handling for single-column distinct queries
    - Within a segment, calculate the distinct records using dictionary ids when possible, and read the dictionary only when returning the results (see the sketch below)
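    
    As an illustration, the per-segment flow follows the pattern below (a
    simplified sketch of the DistinctOperator / DistinctExecutor interplay
    added in this diff; statistics tracking and error handling are omitted):
    
        // Drain transform blocks until the executor reports that the query is
        // satisfied (for distinct-only: LIMIT distinct records collected).
        TransformBlock transformBlock;
        while ((transformBlock = transformOperator.nextBlock()) != null) {
          if (distinctExecutor.process(transformBlock)) {
            break;  // early termination
          }
        }
        DistinctTable distinctTable = distinctExecutor.getResult();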
---
 .../apache/pinot/core/common/ObjectSerDeUtils.java |    6 +-
 .../operator/combine/DistinctCombineOperator.java  |   84 ++
 .../core/operator/query/DistinctOperator.java      |   84 ++
 .../apache/pinot/core/plan/CombinePlanNode.java    |    6 +-
 .../apache/pinot/core/plan/DistinctPlanNode.java   |   52 +
 .../core/plan/maker/InstancePlanMakerImplV2.java   |   12 +-
 .../function/DistinctAggregationFunction.java      |  117 +--
 .../DistinctExecutor.java}                         |   26 +-
 .../query/distinct/DistinctExecutorFactory.java    |  190 ++++
 .../customobject => distinct}/DistinctTable.java   |  254 +++--
 ...DictionaryBasedMultiColumnDistinctExecutor.java |   98 ++
 ...ictionaryBasedSingleColumnDistinctExecutor.java |   72 ++
 ...ionaryBasedMultiColumnDistinctOnlyExecutor.java |   59 ++
 ...aryBasedMultiColumnDistinctOrderByExecutor.java |   95 ++
 ...onaryBasedSingleColumnDistinctOnlyExecutor.java |   51 +
 ...ryBasedSingleColumnDistinctOrderByExecutor.java |   71 ++
 .../BaseRawBytesSingleColumnDistinctExecutor.java  |   60 ++
 .../BaseRawDoubleSingleColumnDistinctExecutor.java |   61 ++
 .../BaseRawFloatSingleColumnDistinctExecutor.java  |   61 ++
 .../BaseRawIntSingleColumnDistinctExecutor.java    |   61 ++
 .../BaseRawLongSingleColumnDistinctExecutor.java   |   61 ++
 .../BaseRawStringSingleColumnDistinctExecutor.java |   59 ++
 .../RawBytesSingleColumnDistinctOnlyExecutor.java  |   50 +
 ...awBytesSingleColumnDistinctOrderByExecutor.java |   71 ++
 .../RawDoubleSingleColumnDistinctOnlyExecutor.java |   49 +
 ...wDoubleSingleColumnDistinctOrderByExecutor.java |   70 ++
 .../RawFloatSingleColumnDistinctOnlyExecutor.java  |   49 +
 ...awFloatSingleColumnDistinctOrderByExecutor.java |   70 ++
 .../RawIntSingleColumnDistinctOnlyExecutor.java    |   49 +
 .../RawIntSingleColumnDistinctOrderByExecutor.java |   70 ++
 .../RawLongSingleColumnDistinctOnlyExecutor.java   |   49 +
 ...RawLongSingleColumnDistinctOrderByExecutor.java |   70 ++
 .../raw/RawMultiColumnDistinctExecutor.java        |   86 ++
 .../RawStringSingleColumnDistinctOnlyExecutor.java |   49 +
 ...wStringSingleColumnDistinctOrderByExecutor.java |   70 ++
 .../query/pruner/SelectionQuerySegmentPruner.java  |    4 +-
 .../query/reduce/DistinctDataTableReducer.java     |    4 +-
 .../request/context/utils/QueryContextUtils.java   |   23 +-
 .../BrokerRequestToQueryContextConverterTest.java  |   30 +-
 .../apache/pinot/queries/DistinctQueriesTest.java  | 1089 ++++++++++++--------
 ...erSegmentAggregationSingleValueQueriesTest.java |   48 +-
 41 files changed, 2922 insertions(+), 718 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index 0367f29..a90ef12 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -54,9 +54,9 @@ import org.apache.pinot.common.function.scalar.DataTypeConversionFunctions;
 import org.apache.pinot.common.utils.StringUtil;
 import org.apache.pinot.core.geospatial.serde.GeometrySerializer;
 import org.apache.pinot.core.query.aggregation.function.customobject.AvgPair;
-import org.apache.pinot.core.query.aggregation.function.customobject.DistinctTable;
 import org.apache.pinot.core.query.aggregation.function.customobject.MinMaxRangePair;
 import org.apache.pinot.core.query.aggregation.function.customobject.QuantileDigest;
+import org.apache.pinot.core.query.distinct.DistinctTable;
 import org.apache.pinot.core.query.utils.idset.IdSet;
 import org.apache.pinot.core.query.utils.idset.IdSets;
 import org.apache.pinot.spi.utils.ByteArray;
@@ -356,7 +356,7 @@ public class ObjectSerDeUtils {
     @Override
     public DistinctTable deserialize(byte[] bytes) {
       try {
-        return new DistinctTable(ByteBuffer.wrap(bytes));
+        return DistinctTable.fromByteBuffer(ByteBuffer.wrap(bytes));
       } catch (IOException e) {
         throw new IllegalStateException("Caught exception while de-serializing DistinctTable", e);
       }
@@ -365,7 +365,7 @@ public class ObjectSerDeUtils {
     @Override
     public DistinctTable deserialize(ByteBuffer byteBuffer) {
       try {
-        return new DistinctTable(byteBuffer);
+        return DistinctTable.fromByteBuffer(byteBuffer);
       } catch (IOException e) {
         throw new IllegalStateException("Caught exception while de-serializing DistinctTable", e);
       }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/DistinctCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/DistinctCombineOperator.java
new file mode 100644
index 0000000..e78ab40
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/DistinctCombineOperator.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.combine;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.query.distinct.DistinctTable;
+import org.apache.pinot.core.query.request.context.QueryContext;
+
+
+/**
+ * Combine operator for distinct queries.
+ */
+@SuppressWarnings("rawtypes")
+public class DistinctCombineOperator extends BaseCombineOperator {
+  private static final String OPERATOR_NAME = "DistinctCombineOperator";
+
+  private final boolean _hasOrderBy;
+
+  public DistinctCombineOperator(List<Operator> operators, QueryContext queryContext, ExecutorService executorService,
+      long endTimeMs) {
+    super(operators, queryContext, executorService, endTimeMs);
+    _hasOrderBy = queryContext.getOrderByExpressions() != null;
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+
+  @Override
+  protected boolean isQuerySatisfied(IntermediateResultsBlock resultsBlock) {
+    if (_hasOrderBy) {
+      return false;
+    }
+    List<Object> result = resultsBlock.getAggregationResult();
+    assert result != null && result.size() == 1 && result.get(0) instanceof DistinctTable;
+    DistinctTable distinctTable = (DistinctTable) result.get(0);
+    return distinctTable.size() >= _queryContext.getLimit();
+  }
+
+  @Override
+  protected void mergeResultsBlocks(IntermediateResultsBlock mergedBlock, IntermediateResultsBlock blockToMerge) {
+    // TODO: Use a separate way to represent DISTINCT instead of aggregation.
+    List<Object> mergedResults = mergedBlock.getAggregationResult();
+    assert mergedResults != null && mergedResults.size() == 1 && mergedResults.get(0) instanceof DistinctTable;
+    DistinctTable mergedDistinctTable = (DistinctTable) mergedResults.get(0);
+
+    List<Object> resultsToMerge = blockToMerge.getAggregationResult();
+    assert resultsToMerge != null && resultsToMerge.size() == 1 && resultsToMerge.get(0) instanceof DistinctTable;
+    DistinctTable distinctTableToMerge = (DistinctTable) resultsToMerge.get(0);
+
+    // Convert the merged table into a main table if necessary in order to merge other tables
+    if (!mergedDistinctTable.isMainTable()) {
+      DistinctTable mainDistinctTable =
+          new DistinctTable(distinctTableToMerge.getDataSchema(), _queryContext.getOrderByExpressions(),
+              _queryContext.getLimit());
+      mainDistinctTable.mergeTable(mergedDistinctTable);
+      mergedBlock.setAggregationResults(Collections.singletonList(mainDistinctTable));
+      mergedDistinctTable = mainDistinctTable;
+    }
+
+    mergedDistinctTable.mergeTable(distinctTableToMerge);
+  }
+}
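
isQuerySatisfied and mergeResultsBlocks override protected hooks of
BaseCombineOperator, which presumably drives them while combining the
per-segment results. A simplified, hypothetical sketch of the contract they
implement (the actual combine is multi-threaded and time-bounded):

    // Merge per-segment result blocks; for distinct-only queries, stop as
    // soon as the merged DistinctTable already holds LIMIT distinct records.
    IntermediateResultsBlock mergedBlock = blocks.get(0);
    for (int i = 1; i < blocks.size(); i++) {
      if (isQuerySatisfied(mergedBlock)) {
        break;  // early termination across segments
      }
      mergeResultsBlocks(mergedBlock, blocks.get(i));
    }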
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java
new file mode 100644
index 0000000..4ea1bce
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/query/DistinctOperator.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.query;
+
+import java.util.Collections;
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.BaseOperator;
+import org.apache.pinot.core.operator.ExecutionStatistics;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
+import org.apache.pinot.core.query.distinct.DistinctExecutor;
+import org.apache.pinot.core.query.distinct.DistinctExecutorFactory;
+import org.apache.pinot.core.query.distinct.DistinctTable;
+
+
+/**
+ * Operator for distinct queries on a single segment.
+ */
+public class DistinctOperator extends BaseOperator<IntermediateResultsBlock> {
+  private static final String OPERATOR_NAME = "DistinctOperator";
+
+  private final IndexSegment _indexSegment;
+  private final DistinctAggregationFunction _distinctAggregationFunction;
+  private final TransformOperator _transformOperator;
+  private final DistinctExecutor _distinctExecutor;
+
+  private int _numDocsScanned = 0;
+
+  public DistinctOperator(IndexSegment indexSegment, DistinctAggregationFunction distinctAggregationFunction,
+      TransformOperator transformOperator) {
+    _indexSegment = indexSegment;
+    _distinctAggregationFunction = distinctAggregationFunction;
+    _transformOperator = transformOperator;
+    _distinctExecutor = DistinctExecutorFactory.getDistinctExecutor(distinctAggregationFunction, transformOperator);
+  }
+
+  @Override
+  protected IntermediateResultsBlock getNextBlock() {
+    TransformBlock transformBlock;
+    while ((transformBlock = _transformOperator.nextBlock()) != null) {
+      _numDocsScanned += transformBlock.getNumDocs();
+      if (_distinctExecutor.process(transformBlock)) {
+        break;
+      }
+    }
+    DistinctTable distinctTable = _distinctExecutor.getResult();
+    // TODO: Use a separate way to represent DISTINCT instead of aggregation.
+    return new IntermediateResultsBlock(new AggregationFunction[]{_distinctAggregationFunction},
+        Collections.singletonList(distinctTable), false);
+  }
+
+  @Override
+  public String getOperatorName() {
+    return OPERATOR_NAME;
+  }
+
+  @Override
+  public ExecutionStatistics getExecutionStatistics() {
+    long numEntriesScannedInFilter = _transformOperator.getExecutionStatistics().getNumEntriesScannedInFilter();
+    long numEntriesScannedPostFilter = (long) _numDocsScanned * _transformOperator.getNumColumnsProjected();
+    int numTotalDocs = _indexSegment.getSegmentMetadata().getTotalDocs();
+    return new ExecutionStatistics(_numDocsScanned, numEntriesScannedInFilter, numEntriesScannedPostFilter,
+        numTotalDocs);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
index fb634c2..c570e96 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
@@ -31,6 +31,7 @@ import org.apache.pinot.common.proto.Server;
 import org.apache.pinot.core.common.Operator;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.operator.combine.AggregationOnlyCombineOperator;
+import org.apache.pinot.core.operator.combine.DistinctCombineOperator;
 import org.apache.pinot.core.operator.combine.GroupByCombineOperator;
 import org.apache.pinot.core.operator.combine.GroupByOrderByCombineOperator;
 import org.apache.pinot.core.operator.combine.SelectionOnlyCombineOperator;
@@ -185,7 +186,7 @@ public class CombinePlanNode implements PlanNode {
         }
         return new GroupByCombineOperator(operators, _queryContext, _executorService, _endTimeMs, _numGroupsLimit);
       }
-    } else {
+    } else if (QueryContextUtils.isSelectionQuery(_queryContext)) {
       if (_queryContext.getLimit() == 0 || _queryContext.getOrderByExpressions() == null) {
         // Selection only
         return new SelectionOnlyCombineOperator(operators, _queryContext, _executorService, _endTimeMs);
@@ -193,6 +194,9 @@ public class CombinePlanNode implements PlanNode {
         // Selection order-by
         return new SelectionOrderByCombineOperator(operators, _queryContext, _executorService, _endTimeMs);
       }
+    } else {
+      assert QueryContextUtils.isDistinctQuery(_queryContext);
+      return new DistinctCombineOperator(operators, _queryContext, _executorService, _endTimeMs);
     }
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/DistinctPlanNode.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/DistinctPlanNode.java
new file mode 100644
index 0000000..4204f92
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/DistinctPlanNode.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.plan;
+
+import org.apache.pinot.core.indexsegment.IndexSegment;
+import org.apache.pinot.core.operator.query.DistinctOperator;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
+import org.apache.pinot.core.query.request.context.QueryContext;
+
+
+/**
+ * Execution plan for distinct queries on a single segment.
+ */
+@SuppressWarnings("rawtypes")
+public class DistinctPlanNode implements PlanNode {
+  private final IndexSegment _indexSegment;
+  private final DistinctAggregationFunction _distinctAggregationFunction;
+  private final TransformPlanNode _transformPlanNode;
+
+  public DistinctPlanNode(IndexSegment indexSegment, QueryContext queryContext) {
+    _indexSegment = indexSegment;
+    AggregationFunction[] aggregationFunctions = queryContext.getAggregationFunctions();
+    assert aggregationFunctions != null && aggregationFunctions.length == 1
+        && aggregationFunctions[0] instanceof DistinctAggregationFunction;
+    _distinctAggregationFunction = (DistinctAggregationFunction) aggregationFunctions[0];
+    _transformPlanNode =
+        new TransformPlanNode(_indexSegment, queryContext, _distinctAggregationFunction.getInputExpressions(),
+            DocIdSetPlanNode.MAX_DOC_PER_CALL);
+  }
+
+  @Override
+  public DistinctOperator run() {
+    return new DistinctOperator(_indexSegment, _distinctAggregationFunction, _transformPlanNode.run());
+  }
+}
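
A minimal wiring sketch (hypothetical caller; indexSegment and queryContext
are assumed to be in scope, mirroring InstancePlanMakerImplV2.makeSegmentPlanNode
in the next file):

    // For a distinct query, the plan maker builds a DistinctPlanNode whose
    // run() wires a TransformOperator into a per-segment DistinctOperator.
    DistinctPlanNode planNode = new DistinctPlanNode(indexSegment, queryContext);
    DistinctOperator distinctOperator = planNode.run();
    IntermediateResultsBlock resultsBlock = distinctOperator.nextBlock();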
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
index 1794015..0ea8f58 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/plan/maker/InstancePlanMakerImplV2.java
@@ -32,6 +32,7 @@ import org.apache.pinot.core.plan.AggregationGroupByPlanNode;
 import org.apache.pinot.core.plan.AggregationPlanNode;
 import org.apache.pinot.core.plan.CombinePlanNode;
 import org.apache.pinot.core.plan.DictionaryBasedAggregationPlanNode;
+import org.apache.pinot.core.plan.DistinctPlanNode;
 import org.apache.pinot.core.plan.GlobalPlanImplV0;
 import org.apache.pinot.core.plan.InstanceResponsePlanNode;
 import org.apache.pinot.core.plan.MetadataBasedAggregationPlanNode;
@@ -124,7 +125,6 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
   @Override
   public PlanNode makeSegmentPlanNode(IndexSegment indexSegment, QueryContext queryContext) {
     if (QueryContextUtils.isAggregationQuery(queryContext)) {
-      // Aggregation query
       List<ExpressionContext> groupByExpressions = queryContext.getGroupByExpressions();
       if (groupByExpressions != null) {
         // Aggregation group-by query
@@ -150,9 +150,11 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
         }
         return new AggregationPlanNode(indexSegment, queryContext);
       }
-    } else {
-      // Selection query
+    } else if (QueryContextUtils.isSelectionQuery(queryContext)) {
       return new SelectionPlanNode(indexSegment, queryContext);
+    } else {
+      assert QueryContextUtils.isDistinctQuery(queryContext);
+      return new DistinctPlanNode(indexSegment, queryContext);
     }
   }
 
@@ -171,8 +173,8 @@ public class InstancePlanMakerImplV2 implements PlanMaker {
 
   @Override
   public PlanNode makeStreamingSegmentPlanNode(IndexSegment indexSegment, QueryContext queryContext) {
-    if (QueryContextUtils.isAggregationQuery(queryContext)) {
-      throw new UnsupportedOperationException("Queries with aggregations are not supported");
+    if (!QueryContextUtils.isSelectionQuery(queryContext)) {
+      throw new UnsupportedOperationException("Only selection queries are supported");
     } else {
       // Selection query
       return new StreamingSelectionPlanNode(indexSegment, queryContext);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAggregationFunction.java
index 72e7edc..359f75f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/DistinctAggregationFunction.java
@@ -18,31 +18,25 @@
  */
 package org.apache.pinot.core.query.aggregation.function;
 
-import com.google.common.base.Preconditions;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.function.AggregationFunctionType;
-import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.common.BlockValSet;
-import org.apache.pinot.core.common.RowBasedBlockValueFetcher;
-import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
-import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
-import org.apache.pinot.core.query.aggregation.function.customobject.DistinctTable;
 import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
 import org.apache.pinot.core.query.request.context.ExpressionContext;
 import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
 
 
 /**
- * The DISTINCT clause in SQL is executed as the DISTINCT aggregation function.
- * TODO: Support group-by
+ * The DISTINCT clause in SQL is represented as the DISTINCT aggregation function. Currently it is only used to wrap the
+ * information for the distinct queries.
+ * TODO: Use a separate way to represent DISTINCT instead of aggregation.
  */
 @SuppressWarnings("rawtypes")
-public class DistinctAggregationFunction implements AggregationFunction<DistinctTable, Comparable> {
+public class DistinctAggregationFunction implements AggregationFunction<Object, Comparable> {
   private final List<ExpressionContext> _expressions;
   private final String[] _columns;
   private final List<OrderByExpressionContext> _orderByExpressions;
@@ -101,126 +95,65 @@ public class DistinctAggregationFunction implements AggregationFunction<Distinct
   }
 
   @Override
-  public AggregationResultHolder createAggregationResultHolder() {
-    return new ObjectAggregationResultHolder();
+  public ColumnDataType getIntermediateResultColumnType() {
+    return ColumnDataType.OBJECT;
   }
 
   @Override
-  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
-      Map<ExpressionContext, BlockValSet> blockValSetMap) {
-    int numBlockValSets = blockValSetMap.size();
-    int numExpressions = _expressions.size();
-    Preconditions
-        .checkState(numBlockValSets == numExpressions, "Size mismatch: numBlockValSets = %s, numExpressions = %s",
-            numBlockValSets, numExpressions);
-
-    BlockValSet[] blockValSets = new BlockValSet[numExpressions];
-    for (int i = 0; i < numExpressions; i++) {
-      blockValSets[i] = blockValSetMap.get(_expressions.get(i));
-    }
-
-    DistinctTable distinctTable = aggregationResultHolder.getResult();
-    if (distinctTable == null) {
-      ColumnDataType[] columnDataTypes = new ColumnDataType[numExpressions];
-      for (int i = 0; i < numExpressions; i++) {
-        columnDataTypes[i] = ColumnDataType.fromDataTypeSV(blockValSetMap.get(_expressions.get(i)).getValueType());
-      }
-      DataSchema dataSchema = new DataSchema(_columns, columnDataTypes);
-      distinctTable = new DistinctTable(dataSchema, _orderByExpressions, _limit);
-      aggregationResultHolder.setValue(distinctTable);
-    }
-
-    // TODO: Follow up PR will make few changes to start using DictionaryBasedAggregationOperator for DISTINCT queries
-    //       without filter.
-
-    if (distinctTable.hasOrderBy()) {
-      // With order-by, no need to check whether the DistinctTable is already satisfied
-      RowBasedBlockValueFetcher blockValueFetcher = new RowBasedBlockValueFetcher(blockValSets);
-      for (int i = 0; i < length; i++) {
-        distinctTable.addWithOrderBy(new Record(blockValueFetcher.getRow(i)));
-      }
-    } else {
-      // Without order-by, early-terminate when the DistinctTable is already satisfied
-      if (distinctTable.isSatisfied()) {
-        return;
-      }
-      RowBasedBlockValueFetcher blockValueFetcher = new RowBasedBlockValueFetcher(blockValSets);
-      for (int i = 0; i < length; i++) {
-        if (distinctTable.addWithoutOrderBy(new Record(blockValueFetcher.getRow(i)))) {
-          return;
-        }
-      }
-    }
+  public AggregationResultHolder createAggregationResultHolder() {
+    throw new UnsupportedOperationException("Operation not supported for DISTINCT aggregation function");
   }
 
   @Override
-  public DistinctTable extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
-    DistinctTable distinctTable = aggregationResultHolder.getResult();
-    if (distinctTable != null) {
-      return distinctTable;
-    } else {
-      ColumnDataType[] columnDataTypes = new ColumnDataType[_columns.length];
-      // NOTE: Use STRING for unknown type
-      Arrays.fill(columnDataTypes, ColumnDataType.STRING);
-      return new DistinctTable(new DataSchema(_columns, columnDataTypes), _orderByExpressions, _limit);
-    }
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    throw new UnsupportedOperationException("Operation not supported for DISTINCT aggregation function");
   }
 
-  /**
-   * NOTE: This method only handles merging of 2 main DistinctTables. It should not be used on Broker-side because it
-   *       does not support merging deserialized DistinctTables.
-   * <p>{@inheritDoc}
-   */
   @Override
-  public DistinctTable merge(DistinctTable intermediateResult1, DistinctTable intermediateResult2) {
-    if (intermediateResult1.size() == 0) {
-      return intermediateResult2;
-    }
-    if (intermediateResult2.size() != 0) {
-      intermediateResult1.mergeMainDistinctTable(intermediateResult2);
-    }
-    return intermediateResult1;
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    throw new UnsupportedOperationException("Operation not supported for DISTINCT aggregation function");
   }
 
   @Override
-  public boolean isIntermediateResultComparable() {
-    return false;
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    throw new UnsupportedOperationException("Operation not supported for DISTINCT aggregation function");
   }
 
   @Override
-  public ColumnDataType getIntermediateResultColumnType() {
-    return ColumnDataType.OBJECT;
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    throw new UnsupportedOperationException("Operation not supported for DISTINCT aggregation function");
   }
 
   @Override
-  public ColumnDataType getFinalResultColumnType() {
+  public Object extractAggregationResult(AggregationResultHolder aggregationResultHolder) {
     throw new UnsupportedOperationException("Operation not supported for DISTINCT aggregation function");
   }
 
   @Override
-  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+  public Object extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
     throw new UnsupportedOperationException("Operation not supported for DISTINCT aggregation function");
   }
 
   @Override
-  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
-      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+  public Object merge(Object intermediateResult1, Object intermediateResult2) {
     throw new UnsupportedOperationException("Operation not supported for DISTINCT aggregation function");
   }
 
   @Override
-  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
-      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+  public boolean isIntermediateResultComparable() {
     throw new UnsupportedOperationException("Operation not supported for DISTINCT aggregation function");
   }
 
   @Override
-  public DistinctTable extractGroupByResult(GroupByResultHolder groupByResultHolder, int groupKey) {
+  public ColumnDataType getFinalResultColumnType() {
     throw new UnsupportedOperationException("Operation not supported for DISTINCT aggregation function");
   }
 
   @Override
-  public Comparable extractFinalResult(DistinctTable intermediateResult) {
+  public Comparable extractFinalResult(Object intermediateResult) {
     throw new UnsupportedOperationException("Operation not supported for DISTINCT aggregation function");
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/QueryContextUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctExecutor.java
similarity index 51%
copy from pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/QueryContextUtils.java
copy to pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctExecutor.java
index f3ff7ec..1a51e11 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/QueryContextUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctExecutor.java
@@ -16,19 +16,27 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.query.request.context.utils;
+package org.apache.pinot.core.query.distinct;
 
-import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
 
 
-public class QueryContextUtils {
-  private QueryContextUtils() {
-  }
+/**
+ * Interface class for executing the distinct queries.
+ */
+public interface DistinctExecutor {
+  // TODO: Tune the initial capacity
+  int MAX_INITIAL_CAPACITY = 10000;
+
+  /**
+   * Processes the given transform block, returns {@code true} if the query is already satisfied, {@code false}
+   * otherwise. No more calls should be made after it returns {@code true}.
+   */
+  boolean process(TransformBlock transformBlock);
 
   /**
-   * Returns {@code true} if the given query is an aggregation query, {@code false} otherwise.
+   * Returns the distinct result. Note that the returned DistinctTable might not be a main DistinctTable, thus cannot be
+   * used to merge other records or tables, but can only be merged into the main DistinctTable.
    */
-  public static boolean isAggregationQuery(QueryContext query) {
-    return query.getAggregationFunctions() != null;
-  }
+  DistinctTable getResult();
 }
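
To make the DistinctExecutor contract concrete, here is a simplified,
self-contained sketch of a distinct-only executor for a single raw INT
column (illustrative only; the real implementations added by this commit
live under query/distinct/raw and query/distinct/dictionary and also cover
order-by, other data types, and dictionary-encoded columns):

    package org.apache.pinot.core.query.distinct;

    import it.unimi.dsi.fastutil.ints.IntIterator;
    import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.pinot.common.utils.DataSchema;
    import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
    import org.apache.pinot.core.common.BlockValSet;
    import org.apache.pinot.core.data.table.Record;
    import org.apache.pinot.core.operator.blocks.TransformBlock;
    import org.apache.pinot.core.query.request.context.ExpressionContext;


    /**
     * Illustrative sketch of a distinct-only executor for one raw INT column.
     */
    public class SketchIntDistinctOnlyExecutor implements DistinctExecutor {
      private final ExpressionContext _expression;
      private final int _limit;
      private final IntOpenHashSet _valueSet;

      public SketchIntDistinctOnlyExecutor(ExpressionContext expression, int limit) {
        _expression = expression;
        _limit = limit;
        _valueSet = new IntOpenHashSet(Math.min(limit, MAX_INITIAL_CAPACITY));
      }

      @Override
      public boolean process(TransformBlock transformBlock) {
        BlockValSet blockValSet = transformBlock.getBlockValueSet(_expression);
        int[] values = blockValSet.getIntValuesSV();
        int numDocs = transformBlock.getNumDocs();
        for (int i = 0; i < numDocs; i++) {
          _valueSet.add(values[i]);
          if (_valueSet.size() >= _limit) {
            return true;  // enough distinct values collected, stop processing
          }
        }
        return false;
      }

      @Override
      public DistinctTable getResult() {
        DataSchema dataSchema = new DataSchema(new String[]{_expression.toString()},
            new ColumnDataType[]{ColumnDataType.INT});
        List<Record> records = new ArrayList<>(_valueSet.size());
        IntIterator iterator = _valueSet.iterator();
        while (iterator.hasNext()) {
          records.add(new Record(new Object[]{iterator.nextInt()}));
        }
        // Wrapper DistinctTable: may only be merged into a main DistinctTable.
        return new DistinctTable(dataSchema, records);
      }
    }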
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctExecutorFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctExecutorFactory.java
new file mode 100644
index 0000000..af3a9e8
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctExecutorFactory.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
+import org.apache.pinot.core.query.distinct.dictionary.DictionaryBasedMultiColumnDistinctOnlyExecutor;
+import org.apache.pinot.core.query.distinct.dictionary.DictionaryBasedMultiColumnDistinctOrderByExecutor;
+import org.apache.pinot.core.query.distinct.dictionary.DictionaryBasedSingleColumnDistinctOnlyExecutor;
+import org.apache.pinot.core.query.distinct.dictionary.DictionaryBasedSingleColumnDistinctOrderByExecutor;
+import org.apache.pinot.core.query.distinct.raw.RawBytesSingleColumnDistinctOnlyExecutor;
+import org.apache.pinot.core.query.distinct.raw.RawBytesSingleColumnDistinctOrderByExecutor;
+import org.apache.pinot.core.query.distinct.raw.RawDoubleSingleColumnDistinctOnlyExecutor;
+import org.apache.pinot.core.query.distinct.raw.RawDoubleSingleColumnDistinctOrderByExecutor;
+import org.apache.pinot.core.query.distinct.raw.RawFloatSingleColumnDistinctOnlyExecutor;
+import org.apache.pinot.core.query.distinct.raw.RawFloatSingleColumnDistinctOrderByExecutor;
+import org.apache.pinot.core.query.distinct.raw.RawIntSingleColumnDistinctOnlyExecutor;
+import org.apache.pinot.core.query.distinct.raw.RawIntSingleColumnDistinctOrderByExecutor;
+import org.apache.pinot.core.query.distinct.raw.RawLongSingleColumnDistinctOnlyExecutor;
+import org.apache.pinot.core.query.distinct.raw.RawLongSingleColumnDistinctOrderByExecutor;
+import org.apache.pinot.core.query.distinct.raw.RawMultiColumnDistinctExecutor;
+import org.apache.pinot.core.query.distinct.raw.RawStringSingleColumnDistinctOnlyExecutor;
+import org.apache.pinot.core.query.distinct.raw.RawStringSingleColumnDistinctOrderByExecutor;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * Factory for {@link DistinctExecutor}.
+ */
+public class DistinctExecutorFactory {
+  private DistinctExecutorFactory() {
+  }
+
+  /**
+   * Returns the {@link DistinctExecutor} for the given distinct query.
+   */
+  public static DistinctExecutor getDistinctExecutor(DistinctAggregationFunction distinctAggregationFunction,
+      TransformOperator transformOperator) {
+    List<ExpressionContext> expressions = distinctAggregationFunction.getInputExpressions();
+    List<OrderByExpressionContext> orderByExpressions = distinctAggregationFunction.getOrderByExpressions();
+    int limit = distinctAggregationFunction.getLimit();
+    if (orderByExpressions == null) {
+      return getDistinctOnlyExecutor(expressions, limit, transformOperator);
+    } else {
+      return getDistinctOrderByExecutor(expressions, orderByExpressions, limit, transformOperator);
+    }
+  }
+
+  private static DistinctExecutor getDistinctOnlyExecutor(List<ExpressionContext> expressions, int limit,
+      TransformOperator transformOperator) {
+    if (expressions.size() == 1) {
+      // Single column
+      ExpressionContext expression = expressions.get(0);
+      Dictionary dictionary = transformOperator.getDictionary(expression);
+      if (dictionary != null) {
+        // Dictionary based
+        return new DictionaryBasedSingleColumnDistinctOnlyExecutor(expression, dictionary, limit);
+      } else {
+        // Raw value based
+        DataType dataType = transformOperator.getResultMetadata(expression).getDataType();
+        switch (dataType) {
+          case INT:
+            return new RawIntSingleColumnDistinctOnlyExecutor(expression, limit);
+          case LONG:
+            return new RawLongSingleColumnDistinctOnlyExecutor(expression, limit);
+          case FLOAT:
+            return new RawFloatSingleColumnDistinctOnlyExecutor(expression, limit);
+          case DOUBLE:
+            return new RawDoubleSingleColumnDistinctOnlyExecutor(expression, limit);
+          case STRING:
+            return new RawStringSingleColumnDistinctOnlyExecutor(expression, limit);
+          case BYTES:
+            return new RawBytesSingleColumnDistinctOnlyExecutor(expression, limit);
+          default:
+            throw new IllegalStateException();
+        }
+      }
+    } else {
+      // Multiple columns
+      int numExpressions = expressions.size();
+      List<Dictionary> dictionaries = new ArrayList<>(numExpressions);
+      boolean dictionaryBased = true;
+      for (ExpressionContext expression : expressions) {
+        Dictionary dictionary = transformOperator.getDictionary(expression);
+        if (dictionary != null) {
+          dictionaries.add(dictionary);
+        } else {
+          dictionaryBased = false;
+          break;
+        }
+      }
+      if (dictionaryBased) {
+        // Dictionary based
+        return new DictionaryBasedMultiColumnDistinctOnlyExecutor(expressions, dictionaries, limit);
+      } else {
+        // Raw value based
+        List<DataType> dataTypes = new ArrayList<>(numExpressions);
+        for (ExpressionContext expression : expressions) {
+          dataTypes.add(transformOperator.getResultMetadata(expression).getDataType());
+        }
+        return new RawMultiColumnDistinctExecutor(expressions, dataTypes, null, limit);
+      }
+    }
+  }
+
+  private static DistinctExecutor getDistinctOrderByExecutor(List<ExpressionContext> expressions,
+      List<OrderByExpressionContext> orderByExpressions, int limit, TransformOperator transformOperator) {
+    if (expressions.size() == 1) {
+      // Single column
+      ExpressionContext expression = expressions.get(0);
+      Dictionary dictionary = transformOperator.getDictionary(expression);
+      // Note: Use raw value based when dictionary is not sorted (consuming segments).
+      if (dictionary != null && dictionary.isSorted()) {
+        // Dictionary based
+        assert orderByExpressions.size() == 1;
+        return new DictionaryBasedSingleColumnDistinctOrderByExecutor(expression, dictionary, orderByExpressions.get(0),
+            limit);
+      } else {
+        // Raw value based
+        DataType dataType = transformOperator.getResultMetadata(expression).getDataType();
+        assert orderByExpressions.size() == 1;
+        OrderByExpressionContext orderByExpression = orderByExpressions.get(0);
+        switch (dataType) {
+          case INT:
+            return new RawIntSingleColumnDistinctOrderByExecutor(expression, orderByExpression, limit);
+          case LONG:
+            return new RawLongSingleColumnDistinctOrderByExecutor(expression, orderByExpression, limit);
+          case FLOAT:
+            return new RawFloatSingleColumnDistinctOrderByExecutor(expression, orderByExpression, limit);
+          case DOUBLE:
+            return new RawDoubleSingleColumnDistinctOrderByExecutor(expression, orderByExpression, limit);
+          case STRING:
+            return new RawStringSingleColumnDistinctOrderByExecutor(expression, orderByExpression, limit);
+          case BYTES:
+            return new RawBytesSingleColumnDistinctOrderByExecutor(expression, orderByExpression, limit);
+          default:
+            throw new IllegalStateException();
+        }
+      }
+    } else {
+      // Multiple columns
+      int numExpressions = expressions.size();
+      List<Dictionary> dictionaries = new ArrayList<>(numExpressions);
+      boolean dictionaryBased = true;
+      for (ExpressionContext expression : expressions) {
+        Dictionary dictionary = transformOperator.getDictionary(expression);
+        // Note: Use raw value based when dictionary is not sorted (consuming segments).
+        if (dictionary != null && dictionary.isSorted()) {
+          dictionaries.add(dictionary);
+        } else {
+          dictionaryBased = false;
+          break;
+        }
+      }
+      if (dictionaryBased) {
+        // Dictionary based
+        return new DictionaryBasedMultiColumnDistinctOrderByExecutor(expressions, dictionaries, orderByExpressions,
+            limit);
+      } else {
+        // Raw value based
+        List<DataType> dataTypes = new ArrayList<>(numExpressions);
+        for (ExpressionContext expression : expressions) {
+          dataTypes.add(transformOperator.getResultMetadata(expression).getDataType());
+        }
+        return new RawMultiColumnDistinctExecutor(expressions, dataTypes, orderByExpressions, limit);
+      }
+    }
+  }
+}
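
For intuition, the dispatch above works out as follows for some
representative query shapes (illustrative; assumes a dictionary-encoded
STRING column "country" and a raw-encoded INT column "value"):

    // SELECT DISTINCT country FROM t LIMIT 10
    //   -> DictionaryBasedSingleColumnDistinctOnlyExecutor
    // SELECT DISTINCT value FROM t LIMIT 10
    //   -> RawIntSingleColumnDistinctOnlyExecutor
    // SELECT DISTINCT country FROM t ORDER BY country LIMIT 10
    //   -> DictionaryBasedSingleColumnDistinctOrderByExecutor (sorted dictionary only)
    // SELECT DISTINCT country, value FROM t LIMIT 10
    //   -> RawMultiColumnDistinctExecutor (not all columns are dictionary-encoded)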
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/customobject/DistinctTable.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctTable.java
similarity index 52%
rename from pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/customobject/DistinctTable.java
rename to pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctTable.java
index 72712e0..f637232 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/customobject/DistinctTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/DistinctTable.java
@@ -16,19 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.pinot.core.query.aggregation.function.customobject;
+package org.apache.pinot.core.query.distinct;
 
-import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import com.google.common.annotations.VisibleForTesting;
+import it.unimi.dsi.fastutil.PriorityQueue;
+import it.unimi.dsi.fastutil.objects.ObjectHeapPriorityQueue;
 import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import it.unimi.dsi.fastutil.objects.ObjectSet;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.PriorityQueue;
-import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataTable;
@@ -40,8 +41,7 @@ import org.apache.pinot.spi.utils.ByteArray;
 
 
 /**
- * The {@code DistinctTable} class serves as the intermediate result of
- * {@link org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction}.
+ * The {@code DistinctTable} stores the distinct records for the distinct queries.
  * <p>There are 2 types of DistinctTables:
  * <ul>
  *   <li>
@@ -49,62 +49,55 @@ import org.apache.pinot.spi.utils.ByteArray;
  *     or merge other DistinctTables.
  *   </li>
  *   <li>
- *     Deserialized DistinctTable (Broker-side only): Constructed with ByteBuffer, which only contains the DataSchema
- *     and records from the original main DistinctTable, but no data structure to handle the addition of new records. It
- *     cannot be used to add more records or merge other DistinctTables, but can only be used to be merged into the main
- *     DistinctTable.
+ *     Wrapper DistinctTable: Constructed with DataSchema and a collection of records, and has no data structure to
+ *     handle the addition of new records. It cannot be used to add more records or merge other DistinctTables, but can
+ *     only be used to be merged into the main DistinctTable.
  *   </li>
  * </ul>
  */
 @SuppressWarnings({"rawtypes", "unchecked"})
 public class DistinctTable {
-  private static final int MAX_INITIAL_CAPACITY = 10000;
-
-  // Available in both main and Broker-side de-serialized DistinctTable
+  // Available in both main and wrapper DistinctTable
   private final DataSchema _dataSchema;
+  private final Collection<Record> _records;
+  private final boolean _isMainTable;
 
   // Available in main DistinctTable only
   private final int _limit;
-  private final Set<Record> _uniqueRecords;
-  private final PriorityQueue<Record> _sortedRecords;
-
-  // Available in Broker-side deserialized DistinctTable only
-  private final List<Record> _records;
+  private final ObjectSet<Record> _recordSet;
+  private final PriorityQueue<Record> _priorityQueue;
 
   /**
    * Constructor of the main DistinctTable which can be used to add records and merge other DistinctTables.
    */
   public DistinctTable(DataSchema dataSchema, @Nullable List<OrderByExpressionContext> orderByExpressions, int limit) {
     _dataSchema = dataSchema;
+    _isMainTable = true;
     _limit = limit;
 
-    // TODO: see if 10k is the right max initial capacity to use
     // NOTE: When LIMIT is smaller than or equal to the MAX_INITIAL_CAPACITY, no resize is required.
-    int initialCapacity = Math.min(limit, MAX_INITIAL_CAPACITY);
-    _uniqueRecords = new ObjectOpenHashSet<>(initialCapacity);
+    int initialCapacity = Math.min(limit, DistinctExecutor.MAX_INITIAL_CAPACITY);
+    _recordSet = new ObjectOpenHashSet<>(initialCapacity);
+    _records = _recordSet;
+
     if (orderByExpressions != null) {
-      String[] columns = dataSchema.getColumnNames();
-      int numColumns = columns.length;
-      Object2IntOpenHashMap<String> columnIndexMap = new Object2IntOpenHashMap<>(numColumns);
-      for (int i = 0; i < numColumns; i++) {
-        columnIndexMap.put(columns[i], i);
-      }
-      int numOrderByColumns = orderByExpressions.size();
-      int[] orderByColumnIndexes = new int[numOrderByColumns];
-      boolean[] orderByAsc = new boolean[numOrderByColumns];
-      for (int i = 0; i < numOrderByColumns; i++) {
+      List<String> columnNames = Arrays.asList(dataSchema.getColumnNames());
+      int numOrderByExpressions = orderByExpressions.size();
+      int[] orderByExpressionIndices = new int[numOrderByExpressions];
+      int[] comparisonFactors = new int[numOrderByExpressions];
+      for (int i = 0; i < numOrderByExpressions; i++) {
         OrderByExpressionContext orderByExpression = orderByExpressions.get(i);
-        orderByColumnIndexes[i] = columnIndexMap.getInt(orderByExpression.getExpression().toString());
-        orderByAsc[i] = orderByExpression.isAsc();
+        orderByExpressionIndices[i] = columnNames.indexOf(orderByExpression.getExpression().toString());
+        comparisonFactors[i] = orderByExpression.isAsc() ? -1 : 1;
       }
-      _sortedRecords = new PriorityQueue<>(initialCapacity, (record1, record2) -> {
-        Object[] values1 = record1.getValues();
-        Object[] values2 = record2.getValues();
-        for (int i = 0; i < numOrderByColumns; i++) {
-          Comparable valueToCompare1 = (Comparable) values1[orderByColumnIndexes[i]];
-          Comparable valueToCompare2 = (Comparable) values2[orderByColumnIndexes[i]];
-          int result =
-              orderByAsc[i] ? valueToCompare2.compareTo(valueToCompare1) : valueToCompare1.compareTo(valueToCompare2);
+      _priorityQueue = new ObjectHeapPriorityQueue<>(initialCapacity, (r1, r2) -> {
+        Object[] values1 = r1.getValues();
+        Object[] values2 = r2.getValues();
+        for (int i = 0; i < numOrderByExpressions; i++) {
+          int index = orderByExpressionIndices[i];
+          Comparable value1 = (Comparable) values1[index];
+          Comparable value2 = (Comparable) values2[index];
+          int result = value1.compareTo(value2) * comparisonFactors[i];
           if (result != 0) {
             return result;
           }
@@ -112,9 +105,20 @@ public class DistinctTable {
         return 0;
       });
     } else {
-      _sortedRecords = null;
+      _priorityQueue = null;
     }
-    _records = null;
+  }
+
+  /**
+   * Constructor of the wrapper DistinctTable which can only be merged into the main DistinctTable.
+   */
+  public DistinctTable(DataSchema dataSchema, Collection<Record> records) {
+    _dataSchema = dataSchema;
+    _records = records;
+    _isMainTable = false;
+    _limit = Integer.MIN_VALUE;
+    _recordSet = null;
+    _priorityQueue = null;
   }
 
   /**
@@ -125,99 +129,103 @@ public class DistinctTable {
   }
 
   /**
+   * Returns {@code true} for main DistinctTable, {@code false} for wrapper DistinctTable.
+   */
+  public boolean isMainTable() {
+    return _isMainTable;
+  }
+
+  /**
    * Returns the number of unique records within the DistinctTable.
    */
   public int size() {
-    if (_uniqueRecords != null) {
-      // Main DistinctTable
-      return _uniqueRecords.size();
-    } else {
-      // Deserialized DistinctTable
-      return _records.size();
-    }
+    return _records.size();
+  }
+
+  @VisibleForTesting
+  public Collection<Record> getRecords() {
+    return _records;
   }
 
   /**
    * Returns {@code true} if the main DistinctTable has order-by, {@code false} otherwise.
    */
   public boolean hasOrderBy() {
-    return _sortedRecords != null;
+    assert _isMainTable;
+    return _priorityQueue != null;
   }
 
   /**
-   * Adds a record to the main DistinctTable without order-by and returns whether the DistinctTable is already
-   * satisfied.
-   * <p>NOTE: There should be no more calls to this method after it or {@link #isSatisfied()} returns {@code true}.
+   * Adds a record to the main DistinctTable without order-by and returns {@code true} if the DistinctTable is already
+   * satisfied, {@code false} otherwise.
+   * <p>NOTE: There should be no more calls to this method after it returns {@code true}.
    */
   public boolean addWithoutOrderBy(Record record) {
-    if (_uniqueRecords.add(record)) {
-      return isSatisfied();
-    } else {
-      return false;
-    }
-  }
-
-  /**
-   * Returns {@code true} if the main DistinctTable without order-by is already satisfied, {@code false} otherwise.
-   * <p>The DistinctTable is satisfied when enough unique records have been collected.
-   */
-  public boolean isSatisfied() {
-    return _uniqueRecords.size() == _limit;
+    assert _isMainTable && _priorityQueue == null;
+    _recordSet.add(record);
+    return _recordSet.size() >= _limit;
   }
 
   /**
    * Adds a record to the main DistinctTable with order-by.
    */
   public void addWithOrderBy(Record record) {
-    if (!_uniqueRecords.contains(record)) {
-      if (_sortedRecords.size() < _limit) {
-        _uniqueRecords.add(record);
-        _sortedRecords.offer(record);
+    assert _isMainTable && _priorityQueue != null;
+    if (!_recordSet.contains(record)) {
+      if (_priorityQueue.size() < _limit) {
+        _recordSet.add(record);
+        _priorityQueue.enqueue(record);
       } else {
-        Record leastRecord = _sortedRecords.peek();
-        if (_sortedRecords.comparator().compare(record, leastRecord) > 0) {
-          _uniqueRecords.remove(leastRecord);
-          _uniqueRecords.add(record);
-          _sortedRecords.poll();
-          _sortedRecords.offer(record);
+        Record firstRecord = _priorityQueue.first();
+        if (_priorityQueue.comparator().compare(record, firstRecord) > 0) {
+          _recordSet.remove(firstRecord);
+          _recordSet.add(record);
+          _priorityQueue.dequeue();
+          _priorityQueue.enqueue(record);
         }
       }
     }
   }
 
-  /*
-   * SERVER ONLY METHODS
-   */
-
   /**
-   * Merges another main DistinctTable into the main DistinctTable.
+   * Merges another DistinctTable into the main DistinctTable.
    */
-  public void mergeMainDistinctTable(DistinctTable distinctTable) {
-    mergeRecords(distinctTable._uniqueRecords);
-  }
-
-  /**
-   * Helper method to merge a collection of records into the main DistinctTable.
-   */
-  private void mergeRecords(Collection<Record> records) {
+  public void mergeTable(DistinctTable distinctTable) {
+    assert _isMainTable;
     if (hasOrderBy()) {
-      for (Record record : records) {
+      for (Record record : distinctTable._records) {
         addWithOrderBy(record);
       }
     } else {
-      if (isSatisfied()) {
-        return;
-      }
-      for (Record record : records) {
-        if (addWithoutOrderBy(record)) {
-          return;
+      if (_recordSet.size() < _limit) {
+        for (Record record : distinctTable._records) {
+          if (addWithoutOrderBy(record)) {
+            return;
+          }
         }
       }
     }
   }
 
   /**
-   * Serializes the main DistinctTable into a byte array.
+   * Returns the final result (all unique records, sorted if ordering is required) from the main DistinctTable.
+   */
+  public Iterator<Record> getFinalResult() {
+    assert _isMainTable;
+    if (_priorityQueue != null) {
+      int numRecords = _priorityQueue.size();
+      Record[] sortedRecords = new Record[numRecords];
+      for (int i = numRecords - 1; i >= 0; i--) {
+        sortedRecords[i] = _priorityQueue.dequeue();
+      }
+      return Arrays.asList(sortedRecords).iterator();
+    } else {
+      return _recordSet.iterator();
+    }
+  }
+
+  /**
+   * Serializes the DistinctTable into a byte array.
    */
   public byte[] toBytes()
       throws IOException {
@@ -225,7 +233,7 @@ public class DistinctTable {
     DataTableBuilder dataTableBuilder = new DataTableBuilder(_dataSchema);
     DataSchema.ColumnDataType[] columnDataTypes = _dataSchema.getColumnDataTypes();
     int numColumns = columnDataTypes.length;
-    for (Record record : _uniqueRecords) {
+    for (Record record : _records) {
       dataTableBuilder.startRow();
       Object[] values = record.getValues();
       for (int i = 0; i < numColumns; i++) {
@@ -258,26 +266,18 @@ public class DistinctTable {
     return dataTableBuilder.build().toBytes();
   }
 
-  /*
-   * BROKER ONLY METHODS
-   */
-
   /**
-   * Broker-side constructor to deserialize the DistinctTable from a {@link ByteBuffer}. The DistinctTable constructed
-   * this way cannot be used to add more records or merge other DistinctTables, but can only be used to be merged into
-   * the main DistinctTable because it does not contain the order-by information and limit.
+   * Deserializes the DistinctTable from a {@link ByteBuffer}. The DistinctTable constructed this way is a wrapper
+   * DistinctTable and cannot be used to add more records or merge other DistinctTables.
    */
-  public DistinctTable(ByteBuffer byteBuffer)
+  public static DistinctTable fromByteBuffer(ByteBuffer byteBuffer)
       throws IOException {
     DataTable dataTable = DataTableFactory.getDataTable(byteBuffer);
-    _dataSchema = dataTable.getDataSchema();
-    _limit = Integer.MIN_VALUE;
-    _uniqueRecords = null;
-    _sortedRecords = null;
+    DataSchema dataSchema = dataTable.getDataSchema();
     int numRecords = dataTable.getNumberOfRows();
-    DataSchema.ColumnDataType[] columnDataTypes = _dataSchema.getColumnDataTypes();
+    DataSchema.ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
     int numColumns = columnDataTypes.length;
-    _records = new ArrayList<>(numRecords);
+    List<Record> records = new ArrayList<>(numRecords);
     for (int i = 0; i < numRecords; i++) {
       Object[] values = new Object[numColumns];
       for (int j = 0; j < numColumns; j++) {
@@ -305,30 +305,8 @@ public class DistinctTable {
             throw new IllegalStateException();
         }
       }
-      _records.add(new Record(values));
-    }
-  }
-
-  /**
-   * Merges a deserialized DistinctTable into the main DistinctTable.
-   */
-  public void mergeDeserializedDistinctTable(DistinctTable distinctTable) {
-    mergeRecords(distinctTable._records);
-  }
-
-  /**
-   * Returns the final result (all unique records, sorted if ordering is required) from the main DistinctTable.
-   */
-  public Iterator<Record> getFinalResult() {
-    if (_sortedRecords != null) {
-      int numRecords = _sortedRecords.size();
-      LinkedList<Record> sortedRecords = new LinkedList<>();
-      for (int i = 0; i < numRecords; i++) {
-        sortedRecords.addFirst(_sortedRecords.poll());
-      }
-      return sortedRecords.iterator();
-    } else {
-      return _uniqueRecords.iterator();
+      records.add(new Record(values));
     }
+    return new DistinctTable(dataSchema, records);
   }
 }
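
For orientation, a minimal sketch of the server-to-broker round trip enabled by the reworked serde above. Only toBytes(), fromByteBuffer() and getFinalResult() appear in this hunk; the merge entry point on the main table is not shown here, so mergeTable() below is an assumed name:

    import java.nio.ByteBuffer;
    import java.util.Iterator;
    import org.apache.pinot.core.data.table.Record;
    import org.apache.pinot.core.query.distinct.DistinctTable;

    // Server side: serialize the per-server result for the wire.
    byte[] bytes = serverTable.toBytes();

    // Broker side: deserialize into a read-only wrapper table and merge it
    // into the main table (mergeTable is an assumed method name).
    DistinctTable wrapper = DistinctTable.fromByteBuffer(ByteBuffer.wrap(bytes));
    mainTable.mergeTable(wrapper);

    // Drain the merged result; records come back sorted when the query has
    // an ORDER BY, in hash-set order otherwise.
    Iterator<Record> iterator = mainTable.getFinalResult();
    while (iterator.hasNext()) {
      Object[] values = iterator.next().getValues();
      // ... render values into the broker response
    }
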
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/dictionary/BaseDictionaryBasedMultiColumnDistinctExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/dictionary/BaseDictionaryBasedMultiColumnDistinctExecutor.java
new file mode 100644
index 0000000..ee92d98
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/dictionary/BaseDictionaryBasedMultiColumnDistinctExecutor.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct.dictionary;
+
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import it.unimi.dsi.fastutil.objects.ObjectSet;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.query.distinct.DistinctExecutor;
+import org.apache.pinot.core.query.distinct.DistinctTable;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.ByteArray;
+
+
+/**
+ * Base implementation of {@link DistinctExecutor} for multiple dictionary-encoded columns.
+ */
+abstract class BaseDictionaryBasedMultiColumnDistinctExecutor implements DistinctExecutor {
+  final List<ExpressionContext> _expressions;
+  final List<Dictionary> _dictionaries;
+  final int _limit;
+
+  final ObjectSet<DictIds> _dictIdsSet;
+
+  BaseDictionaryBasedMultiColumnDistinctExecutor(List<ExpressionContext> expressions, List<Dictionary> dictionaries,
+      int limit) {
+    _expressions = expressions;
+    _dictionaries = dictionaries;
+    _limit = limit;
+
+    _dictIdsSet = new ObjectOpenHashSet<>(Math.min(limit, MAX_INITIAL_CAPACITY));
+  }
+
+  @Override
+  public DistinctTable getResult() {
+    int numExpressions = _expressions.size();
+    String[] columnNames = new String[numExpressions];
+    ColumnDataType[] columnDataTypes = new ColumnDataType[numExpressions];
+    for (int i = 0; i < numExpressions; i++) {
+      columnNames[i] = _expressions.get(i).toString();
+      columnDataTypes[i] = ColumnDataType.fromDataTypeSV(_dictionaries.get(i).getValueType());
+    }
+    DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
+    List<Record> records = new ArrayList<>(_dictIdsSet.size());
+    for (DictIds dictIds : _dictIdsSet) {
+      Object[] values = new Object[numExpressions];
+      for (int i = 0; i < numExpressions; i++) {
+        int dictId = dictIds._dictIds[i];
+        Dictionary dictionary = _dictionaries.get(i);
+        values[i] = dictionary.getValueType() == DataType.BYTES ? new ByteArray(dictionary.getBytesValue(dictId))
+            : dictionary.get(dictId);
+      }
+      records.add(new Record(values));
+    }
+    return new DistinctTable(dataSchema, records);
+  }
+
+  static class DictIds {
+    final int[] _dictIds;
+
+    DictIds(int[] dictIds) {
+      _dictIds = dictIds;
+    }
+
+    @SuppressWarnings("EqualsWhichDoesntCheckParameterClass")
+    @Override
+    public boolean equals(Object o) {
+      return Arrays.equals(_dictIds, ((DictIds) o)._dictIds);
+    }
+
+    @Override
+    public int hashCode() {
+      return Arrays.hashCode(_dictIds);
+    }
+  }
+}
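
The DictIds wrapper is needed because Java arrays inherit identity-based equals()/hashCode() from Object, so a bare int[] never deduplicates inside a hash set; the wrapper delegates to Arrays.equals()/Arrays.hashCode() to get content-based semantics. A self-contained JDK-only illustration of the same pattern:

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class ArrayKeyDemo {
      // Same trick as DictIds: content-based equals/hashCode over an int[].
      static final class Key {
        final int[] _ids;

        Key(int[] ids) {
          _ids = ids;
        }

        @Override
        public boolean equals(Object o) {
          return o instanceof Key && Arrays.equals(_ids, ((Key) o)._ids);
        }

        @Override
        public int hashCode() {
          return Arrays.hashCode(_ids);
        }
      }

      public static void main(String[] args) {
        Set<int[]> raw = new HashSet<>();
        raw.add(new int[]{1, 2});
        raw.add(new int[]{1, 2});
        System.out.println(raw.size()); // 2: identity semantics, no dedup

        Set<Key> wrapped = new HashSet<>();
        wrapped.add(new Key(new int[]{1, 2}));
        wrapped.add(new Key(new int[]{1, 2}));
        System.out.println(wrapped.size()); // 1: content-based dedup
      }
    }
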
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/dictionary/BaseDictionaryBasedSingleColumnDistinctExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/dictionary/BaseDictionaryBasedSingleColumnDistinctExecutor.java
new file mode 100644
index 0000000..d6a5ac6
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/dictionary/BaseDictionaryBasedSingleColumnDistinctExecutor.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct.dictionary;
+
+import it.unimi.dsi.fastutil.ints.IntIterator;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.query.distinct.DistinctExecutor;
+import org.apache.pinot.core.query.distinct.DistinctTable;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.utils.ByteArray;
+
+
+/**
+ * Base implementation of {@link DistinctExecutor} for single dictionary-encoded column.
+ */
+abstract class BaseDictionaryBasedSingleColumnDistinctExecutor implements DistinctExecutor {
+  final ExpressionContext _expression;
+  final Dictionary _dictionary;
+  final int _limit;
+
+  final IntSet _dictIdSet;
+
+  BaseDictionaryBasedSingleColumnDistinctExecutor(ExpressionContext expression, Dictionary dictionary, int limit) {
+    _expression = expression;
+    _dictionary = dictionary;
+    _limit = limit;
+
+    _dictIdSet = new IntOpenHashSet(Math.min(limit, MAX_INITIAL_CAPACITY));
+  }
+
+  @Override
+  public DistinctTable getResult() {
+    DataSchema dataSchema = new DataSchema(new String[]{_expression.toString()},
+        new ColumnDataType[]{ColumnDataType.fromDataTypeSV(_dictionary.getValueType())});
+    List<Record> records = new ArrayList<>(_dictIdSet.size());
+    IntIterator dictIdIterator = _dictIdSet.iterator();
+    if (_dictionary.getValueType() == DataType.BYTES) {
+      while (dictIdIterator.hasNext()) {
+        records.add(new Record(new Object[]{new ByteArray(_dictionary.getBytesValue(dictIdIterator.nextInt()))}));
+      }
+    } else {
+      while (dictIdIterator.hasNext()) {
+        records.add(new Record(new Object[]{_dictionary.get(dictIdIterator.nextInt())}));
+      }
+    }
+    return new DistinctTable(dataSchema, records);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/dictionary/DictionaryBasedMultiColumnDistinctOnlyExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/dictionary/DictionaryBasedMultiColumnDistinctOnlyExecutor.java
new file mode 100644
index 0000000..44d495d
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/dictionary/DictionaryBasedMultiColumnDistinctOnlyExecutor.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct.dictionary;
+
+import java.util.List;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.query.distinct.DistinctExecutor;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+
+
+/**
+ * {@link DistinctExecutor} for distinct only queries with multiple dictionary-encoded columns.
+ */
+public class DictionaryBasedMultiColumnDistinctOnlyExecutor extends BaseDictionaryBasedMultiColumnDistinctExecutor {
+
+  public DictionaryBasedMultiColumnDistinctOnlyExecutor(List<ExpressionContext> expressions,
+      List<Dictionary> dictionaries, int limit) {
+    super(expressions, dictionaries, limit);
+  }
+
+  @Override
+  public boolean process(TransformBlock transformBlock) {
+    int numDocs = transformBlock.getNumDocs();
+    int numExpressions = _expressions.size();
+    int[][] dictIdsArray = new int[numDocs][numExpressions];
+    for (int i = 0; i < numExpressions; i++) {
+      BlockValSet blockValueSet = transformBlock.getBlockValueSet(_expressions.get(i));
+      int[] dictIdsForExpression = blockValueSet.getDictionaryIdsSV();
+      for (int j = 0; j < numDocs; j++) {
+        dictIdsArray[j][i] = dictIdsForExpression[j];
+      }
+    }
+    for (int i = 0; i < numDocs; i++) {
+      _dictIdsSet.add(new DictIds(dictIdsArray[i]));
+      if (_dictIdsSet.size() >= _limit) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
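
The boolean returned from process() is the early-termination signal: a distinct-only query has no ordering, so once _limit distinct keys are collected any further input cannot change the result and the scan can stop. A hedged sketch of the driving loop (the real driver is the DistinctOperator added elsewhere in this commit; the loop shape here is an assumption based on the process() contract):

    // Illustrative only: feed blocks until the executor reports saturation.
    TransformBlock block;
    while ((block = transformOperator.nextBlock()) != null) {
      if (distinctExecutor.process(block)) {
        break; // _limit distinct keys collected: stop scanning this segment
      }
    }
    DistinctTable result = distinctExecutor.getResult();
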
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/dictionary/DictionaryBasedMultiColumnDistinctOrderByExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/dictionary/DictionaryBasedMultiColumnDistinctOrderByExecutor.java
new file mode 100644
index 0000000..e527f01
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/dictionary/DictionaryBasedMultiColumnDistinctOrderByExecutor.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct.dictionary;
+
+import it.unimi.dsi.fastutil.PriorityQueue;
+import it.unimi.dsi.fastutil.objects.ObjectHeapPriorityQueue;
+import java.util.List;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.query.distinct.DistinctExecutor;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+
+
+/**
+ * {@link DistinctExecutor} for distinct order-by queries with multiple dictionary-encoded columns.
+ */
+public class DictionaryBasedMultiColumnDistinctOrderByExecutor extends BaseDictionaryBasedMultiColumnDistinctExecutor {
+  private final PriorityQueue<DictIds> _priorityQueue;
+
+  public DictionaryBasedMultiColumnDistinctOrderByExecutor(List<ExpressionContext> expressions,
+      List<Dictionary> dictionaries, List<OrderByExpressionContext> orderByExpressions, int limit) {
+    super(expressions, dictionaries, limit);
+
+    int numOrderByExpressions = orderByExpressions.size();
+    int[] orderByExpressionIndices = new int[numOrderByExpressions];
+    int[] comparisonFactors = new int[numOrderByExpressions];
+    for (int i = 0; i < numOrderByExpressions; i++) {
+      OrderByExpressionContext orderByExpression = orderByExpressions.get(i);
+      orderByExpressionIndices[i] = expressions.indexOf(orderByExpression.getExpression());
+      comparisonFactors[i] = orderByExpression.isAsc() ? -1 : 1;
+    }
+    _priorityQueue = new ObjectHeapPriorityQueue<>(Math.min(limit, MAX_INITIAL_CAPACITY), (o1, o2) -> {
+      int[] dictIds1 = o1._dictIds;
+      int[] dictIds2 = o2._dictIds;
+      for (int i = 0; i < numOrderByExpressions; i++) {
+        int index = orderByExpressionIndices[i];
+        int result = dictIds1[index] - dictIds2[index];
+        if (result != 0) {
+          return result * comparisonFactors[i];
+        }
+      }
+      return 0;
+    });
+  }
+
+  @Override
+  public boolean process(TransformBlock transformBlock) {
+    int numDocs = transformBlock.getNumDocs();
+    int numExpressions = _expressions.size();
+    int[][] dictIdsArray = new int[numDocs][numExpressions];
+    for (int i = 0; i < numExpressions; i++) {
+      BlockValSet blockValueSet = transformBlock.getBlockValueSet(_expressions.get(i));
+      int[] dictIdsForExpression = blockValueSet.getDictionaryIdsSV();
+      for (int j = 0; j < numDocs; j++) {
+        dictIdsArray[j][i] = dictIdsForExpression[j];
+      }
+    }
+    for (int i = 0; i < numDocs; i++) {
+      DictIds dictIds = new DictIds(dictIdsArray[i]);
+      if (!_dictIdsSet.contains(dictIds)) {
+        if (_dictIdsSet.size() < _limit) {
+          _dictIdsSet.add(dictIds);
+          _priorityQueue.enqueue(dictIds);
+        } else {
+          DictIds firstDictIds = _priorityQueue.first();
+          if (_priorityQueue.comparator().compare(dictIds, firstDictIds) > 0) {
+            _dictIdsSet.remove(firstDictIds);
+            _dictIdsSet.add(dictIds);
+            _priorityQueue.dequeue();
+            _priorityQueue.enqueue(dictIds);
+          }
+        }
+      }
+    }
+    return false;
+  }
+}
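
All the order-by executors share one bounded-heap pattern: retain at most _limit distinct keys, order the heap with the inverted comparator (comparisonFactor) so the worst retained key sits at the head, and evict the head whenever a strictly better key arrives. A self-contained JDK-only sketch of the pattern for ORDER BY value DESC:

    import java.util.Comparator;
    import java.util.HashSet;
    import java.util.PriorityQueue;
    import java.util.Set;

    public class BoundedDistinctTopN {
      public static void main(String[] args) {
        int limit = 3;
        // For DESC, a natural-order min-heap keeps the smallest retained
        // value at the head, i.e. the first candidate for eviction.
        PriorityQueue<Integer> heap = new PriorityQueue<>(limit, Comparator.naturalOrder());
        Set<Integer> retained = new HashSet<>();
        for (int value : new int[]{5, 1, 9, 5, 7, 3, 9, 8}) {
          if (retained.contains(value)) {
            continue; // duplicate of a currently retained value
          }
          if (retained.size() < limit) {
            retained.add(value);
            heap.offer(value);
          } else if (value > heap.peek()) { // strictly better than the worst retained
            retained.remove(heap.poll());
            retained.add(value);
            heap.offer(value);
          }
        }
        System.out.println(retained); // contains exactly the three largest distinct values: 7, 8, 9
      }
    }
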
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/dictionary/DictionaryBasedSingleColumnDistinctOnlyExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/dictionary/DictionaryBasedSingleColumnDistinctOnlyExecutor.java
new file mode 100644
index 0000000..5ba37d9
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/dictionary/DictionaryBasedSingleColumnDistinctOnlyExecutor.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct.dictionary;
+
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.query.distinct.DistinctExecutor;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+
+
+/**
+ * {@link DistinctExecutor} for distinct only queries with single dictionary-encoded column.
+ */
+public class DictionaryBasedSingleColumnDistinctOnlyExecutor extends BaseDictionaryBasedSingleColumnDistinctExecutor {
+
+  public DictionaryBasedSingleColumnDistinctOnlyExecutor(ExpressionContext expression, Dictionary dictionary,
+      int limit) {
+    super(expression, dictionary, limit);
+  }
+
+  @Override
+  public boolean process(TransformBlock transformBlock) {
+    BlockValSet blockValueSet = transformBlock.getBlockValueSet(_expression);
+    int[] dictIds = blockValueSet.getDictionaryIdsSV();
+    int numDocs = transformBlock.getNumDocs();
+    for (int i = 0; i < numDocs; i++) {
+      _dictIdSet.add(dictIds[i]);
+      if (_dictIdSet.size() >= _limit) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/dictionary/DictionaryBasedSingleColumnDistinctOrderByExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/dictionary/DictionaryBasedSingleColumnDistinctOrderByExecutor.java
new file mode 100644
index 0000000..8ac5f47
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/dictionary/DictionaryBasedSingleColumnDistinctOrderByExecutor.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct.dictionary;
+
+import it.unimi.dsi.fastutil.ints.IntHeapPriorityQueue;
+import it.unimi.dsi.fastutil.ints.IntPriorityQueue;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.query.distinct.DistinctExecutor;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
+import org.apache.pinot.core.segment.index.readers.Dictionary;
+
+
+/**
+ * {@link DistinctExecutor} for distinct order-by queries with single dictionary-encoded column.
+ */
+public class DictionaryBasedSingleColumnDistinctOrderByExecutor extends BaseDictionaryBasedSingleColumnDistinctExecutor {
+  private final IntPriorityQueue _priorityQueue;
+
+  public DictionaryBasedSingleColumnDistinctOrderByExecutor(ExpressionContext expression, Dictionary dictionary,
+      OrderByExpressionContext orderByExpressionContext, int limit) {
+    super(expression, dictionary, limit);
+
+    assert orderByExpressionContext.getExpression().equals(expression);
+    int comparisonFactor = orderByExpressionContext.isAsc() ? -1 : 1;
+    _priorityQueue =
+        new IntHeapPriorityQueue(Math.min(limit, MAX_INITIAL_CAPACITY), (i1, i2) -> (i1 - i2) * comparisonFactor);
+  }
+
+  @Override
+  public boolean process(TransformBlock transformBlock) {
+    BlockValSet blockValueSet = transformBlock.getBlockValueSet(_expression);
+    int numDocs = transformBlock.getNumDocs();
+    int[] dictIds = blockValueSet.getDictionaryIdsSV();
+    for (int i = 0; i < numDocs; i++) {
+      int dictId = dictIds[i];
+      if (!_dictIdSet.contains(dictId)) {
+        if (_dictIdSet.size() < _limit) {
+          _dictIdSet.add(dictId);
+          _priorityQueue.enqueue(dictId);
+        } else {
+          int firstDictId = _priorityQueue.firstInt();
+          if (_priorityQueue.comparator().compare(dictId, firstDictId) > 0) {
+            _dictIdSet.remove(firstDictId);
+            _dictIdSet.add(dictId);
+            _priorityQueue.dequeueInt();
+            _priorityQueue.enqueue(dictId);
+          }
+        }
+      }
+    }
+    return false;
+  }
+}
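
Here the heap holds dict ids rather than values, and the int comparison stands in for a value comparison. That shortcut is order-preserving only when the dictionary itself is value-ordered; Pinot's immutable-segment dictionaries are sorted, and the factory is expected to route unsorted cases to the raw-value executors instead (an assumption about the factory, which is not shown in this hunk). A tiny illustration of the invariant relied on:

    // Hypothetical sorted dictionary: dictId -> value, ascending by value.
    String[] sortedDictionary = {"apple", "banana", "cherry"};
    int dictId1 = 0;
    int dictId2 = 2;
    // Comparing ids agrees in sign with comparing the values they encode.
    boolean sameOrder = Integer.signum(Integer.compare(dictId1, dictId2))
        == Integer.signum(sortedDictionary[dictId1].compareTo(sortedDictionary[dictId2]));
    System.out.println(sameOrder); // true
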
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawBytesSingleColumnDistinctExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawBytesSingleColumnDistinctExecutor.java
new file mode 100644
index 0000000..be0e90d
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawBytesSingleColumnDistinctExecutor.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct.raw;
+
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import it.unimi.dsi.fastutil.objects.ObjectSet;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.query.distinct.DistinctExecutor;
+import org.apache.pinot.core.query.distinct.DistinctTable;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.spi.utils.ByteArray;
+
+
+/**
+ * Base implementation of {@link DistinctExecutor} for single raw BYTES column.
+ */
+abstract class BaseRawBytesSingleColumnDistinctExecutor implements DistinctExecutor {
+  final ExpressionContext _expression;
+  final int _limit;
+
+  final ObjectSet<ByteArray> _valueSet;
+
+  BaseRawBytesSingleColumnDistinctExecutor(ExpressionContext expression, int limit) {
+    _expression = expression;
+    _limit = limit;
+
+    _valueSet = new ObjectOpenHashSet<>(Math.min(limit, MAX_INITIAL_CAPACITY));
+  }
+
+  @Override
+  public DistinctTable getResult() {
+    DataSchema dataSchema =
+        new DataSchema(new String[]{_expression.toString()}, new ColumnDataType[]{ColumnDataType.BYTES});
+    List<Record> records = new ArrayList<>(_valueSet.size());
+    for (ByteArray value : _valueSet) {
+      records.add(new Record(new Object[]{value}));
+    }
+    return new DistinctTable(dataSchema, records);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawDoubleSingleColumnDistinctExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawDoubleSingleColumnDistinctExecutor.java
new file mode 100644
index 0000000..d56d02f
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawDoubleSingleColumnDistinctExecutor.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct.raw;
+
+import it.unimi.dsi.fastutil.doubles.DoubleIterator;
+import it.unimi.dsi.fastutil.doubles.DoubleOpenHashSet;
+import it.unimi.dsi.fastutil.doubles.DoubleSet;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.query.distinct.DistinctExecutor;
+import org.apache.pinot.core.query.distinct.DistinctTable;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+
+
+/**
+ * Base implementation of {@link DistinctExecutor} for single raw DOUBLE column.
+ */
+abstract class BaseRawDoubleSingleColumnDistinctExecutor implements DistinctExecutor {
+  final ExpressionContext _expression;
+  final int _limit;
+
+  final DoubleSet _valueSet;
+
+  BaseRawDoubleSingleColumnDistinctExecutor(ExpressionContext expression, int limit) {
+    _expression = expression;
+    _limit = limit;
+
+    _valueSet = new DoubleOpenHashSet(Math.min(limit, MAX_INITIAL_CAPACITY));
+  }
+
+  @Override
+  public DistinctTable getResult() {
+    DataSchema dataSchema =
+        new DataSchema(new String[]{_expression.toString()}, new ColumnDataType[]{ColumnDataType.DOUBLE});
+    List<Record> records = new ArrayList<>(_valueSet.size());
+    DoubleIterator valueIterator = _valueSet.iterator();
+    while (valueIterator.hasNext()) {
+      records.add(new Record(new Object[]{valueIterator.nextDouble()}));
+    }
+    return new DistinctTable(dataSchema, records);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawFloatSingleColumnDistinctExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawFloatSingleColumnDistinctExecutor.java
new file mode 100644
index 0000000..d1457f0
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawFloatSingleColumnDistinctExecutor.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct.raw;
+
+import it.unimi.dsi.fastutil.floats.FloatIterator;
+import it.unimi.dsi.fastutil.floats.FloatOpenHashSet;
+import it.unimi.dsi.fastutil.floats.FloatSet;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.query.distinct.DistinctExecutor;
+import org.apache.pinot.core.query.distinct.DistinctTable;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+
+
+/**
+ * Base implementation of {@link DistinctExecutor} for single raw FLOAT column.
+ */
+abstract class BaseRawFloatSingleColumnDistinctExecutor implements DistinctExecutor {
+  final ExpressionContext _expression;
+  final int _limit;
+
+  final FloatSet _valueSet;
+
+  BaseRawFloatSingleColumnDistinctExecutor(ExpressionContext expression, int limit) {
+    _expression = expression;
+    _limit = limit;
+
+    _valueSet = new FloatOpenHashSet(Math.min(limit, MAX_INITIAL_CAPACITY));
+  }
+
+  @Override
+  public DistinctTable getResult() {
+    DataSchema dataSchema =
+        new DataSchema(new String[]{_expression.toString()}, new ColumnDataType[]{ColumnDataType.FLOAT});
+    List<Record> records = new ArrayList<>(_valueSet.size());
+    FloatIterator valueIterator = _valueSet.iterator();
+    while (valueIterator.hasNext()) {
+      records.add(new Record(new Object[]{valueIterator.nextFloat()}));
+    }
+    return new DistinctTable(dataSchema, records);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawIntSingleColumnDistinctExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawIntSingleColumnDistinctExecutor.java
new file mode 100644
index 0000000..963cf80
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawIntSingleColumnDistinctExecutor.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct.raw;
+
+import it.unimi.dsi.fastutil.ints.IntIterator;
+import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
+import it.unimi.dsi.fastutil.ints.IntSet;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.query.distinct.DistinctExecutor;
+import org.apache.pinot.core.query.distinct.DistinctTable;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+
+
+/**
+ * Base implementation of {@link DistinctExecutor} for single raw INT column.
+ */
+abstract class BaseRawIntSingleColumnDistinctExecutor implements DistinctExecutor {
+  final ExpressionContext _expression;
+  final int _limit;
+
+  final IntSet _valueSet;
+
+  BaseRawIntSingleColumnDistinctExecutor(ExpressionContext expression, int limit) {
+    _expression = expression;
+    _limit = limit;
+
+    _valueSet = new IntOpenHashSet(Math.min(limit, MAX_INITIAL_CAPACITY));
+  }
+
+  @Override
+  public DistinctTable getResult() {
+    DataSchema dataSchema =
+        new DataSchema(new String[]{_expression.toString()}, new ColumnDataType[]{ColumnDataType.INT});
+    List<Record> records = new ArrayList<>(_valueSet.size());
+    IntIterator valueIterator = _valueSet.iterator();
+    while (valueIterator.hasNext()) {
+      records.add(new Record(new Object[]{valueIterator.nextInt()}));
+    }
+    return new DistinctTable(dataSchema, records);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawLongSingleColumnDistinctExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawLongSingleColumnDistinctExecutor.java
new file mode 100644
index 0000000..44d26a3
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawLongSingleColumnDistinctExecutor.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct.raw;
+
+import it.unimi.dsi.fastutil.longs.LongIterator;
+import it.unimi.dsi.fastutil.longs.LongOpenHashSet;
+import it.unimi.dsi.fastutil.longs.LongSet;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.query.distinct.DistinctExecutor;
+import org.apache.pinot.core.query.distinct.DistinctTable;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+
+
+/**
+ * Base implementation of {@link DistinctExecutor} for single raw LONG column.
+ */
+abstract class BaseRawLongSingleColumnDistinctExecutor implements DistinctExecutor {
+  final ExpressionContext _expression;
+  final int _limit;
+
+  final LongSet _valueSet;
+
+  BaseRawLongSingleColumnDistinctExecutor(ExpressionContext expression, int limit) {
+    _expression = expression;
+    _limit = limit;
+
+    _valueSet = new LongOpenHashSet(Math.min(limit, MAX_INITIAL_CAPACITY));
+  }
+
+  @Override
+  public DistinctTable getResult() {
+    DataSchema dataSchema =
+        new DataSchema(new String[]{_expression.toString()}, new ColumnDataType[]{ColumnDataType.LONG});
+    List<Record> records = new ArrayList<>(_valueSet.size());
+    LongIterator valueIterator = _valueSet.iterator();
+    while (valueIterator.hasNext()) {
+      records.add(new Record(new Object[]{valueIterator.nextLong()}));
+    }
+    return new DistinctTable(dataSchema, records);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawStringSingleColumnDistinctExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawStringSingleColumnDistinctExecutor.java
new file mode 100644
index 0000000..98dfec6
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/BaseRawStringSingleColumnDistinctExecutor.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct.raw;
+
+import it.unimi.dsi.fastutil.objects.ObjectOpenHashSet;
+import it.unimi.dsi.fastutil.objects.ObjectSet;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.query.distinct.DistinctExecutor;
+import org.apache.pinot.core.query.distinct.DistinctTable;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+
+
+/**
+ * Base implementation of {@link DistinctExecutor} for single raw STRING column.
+ */
+abstract class BaseRawStringSingleColumnDistinctExecutor implements DistinctExecutor {
+  final ExpressionContext _expression;
+  final int _limit;
+
+  final ObjectSet<String> _valueSet;
+
+  BaseRawStringSingleColumnDistinctExecutor(ExpressionContext expression, int limit) {
+    _expression = expression;
+    _limit = limit;
+
+    _valueSet = new ObjectOpenHashSet<>(Math.min(limit, MAX_INITIAL_CAPACITY));
+  }
+
+  @Override
+  public DistinctTable getResult() {
+    DataSchema dataSchema =
+        new DataSchema(new String[]{_expression.toString()}, new ColumnDataType[]{ColumnDataType.STRING});
+    List<Record> records = new ArrayList<>(_valueSet.size());
+    for (String value : _valueSet) {
+      records.add(new Record(new Object[]{value}));
+    }
+    return new DistinctTable(dataSchema, records);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawBytesSingleColumnDistinctOnlyExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawBytesSingleColumnDistinctOnlyExecutor.java
new file mode 100644
index 0000000..3a17f52
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawBytesSingleColumnDistinctOnlyExecutor.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct.raw;
+
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.query.distinct.DistinctExecutor;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.spi.utils.ByteArray;
+
+
+/**
+ * {@link DistinctExecutor} for distinct only queries with single raw BYTES column.
+ */
+public class RawBytesSingleColumnDistinctOnlyExecutor extends BaseRawBytesSingleColumnDistinctExecutor {
+
+  public RawBytesSingleColumnDistinctOnlyExecutor(ExpressionContext expression, int limit) {
+    super(expression, limit);
+  }
+
+  @Override
+  public boolean process(TransformBlock transformBlock) {
+    BlockValSet blockValueSet = transformBlock.getBlockValueSet(_expression);
+    byte[][] values = blockValueSet.getBytesValuesSV();
+    int numDocs = transformBlock.getNumDocs();
+    for (int i = 0; i < numDocs; i++) {
+      _valueSet.add(new ByteArray(values[i]));
+      if (_valueSet.size() >= _limit) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawBytesSingleColumnDistinctOrderByExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawBytesSingleColumnDistinctOrderByExecutor.java
new file mode 100644
index 0000000..8ffd40d
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawBytesSingleColumnDistinctOrderByExecutor.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct.raw;
+
+import it.unimi.dsi.fastutil.PriorityQueue;
+import it.unimi.dsi.fastutil.objects.ObjectHeapPriorityQueue;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.query.distinct.DistinctExecutor;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
+import org.apache.pinot.spi.utils.ByteArray;
+
+
+/**
+ * {@link DistinctExecutor} for distinct order-by queries with single raw BYTES column.
+ */
+public class RawBytesSingleColumnDistinctOrderByExecutor extends BaseRawBytesSingleColumnDistinctExecutor {
+  private final PriorityQueue<ByteArray> _priorityQueue;
+
+  public RawBytesSingleColumnDistinctOrderByExecutor(ExpressionContext expression,
+      OrderByExpressionContext orderByExpression, int limit) {
+    super(expression, limit);
+
+    assert orderByExpression.getExpression().equals(expression);
+    int comparisonFactor = orderByExpression.isAsc() ? -1 : 1;
+    _priorityQueue = new ObjectHeapPriorityQueue<>(Math.min(limit, MAX_INITIAL_CAPACITY),
+        (b1, b2) -> b1.compareTo(b2) * comparisonFactor);
+  }
+
+  @Override
+  public boolean process(TransformBlock transformBlock) {
+    BlockValSet blockValueSet = transformBlock.getBlockValueSet(_expression);
+    byte[][] values = blockValueSet.getBytesValuesSV();
+    int numDocs = transformBlock.getNumDocs();
+    for (int i = 0; i < numDocs; i++) {
+      ByteArray value = new ByteArray(values[i]);
+      if (!_valueSet.contains(value)) {
+        if (_valueSet.size() < _limit) {
+          _valueSet.add(value);
+          _priorityQueue.enqueue(value);
+        } else {
+          ByteArray firstValue = _priorityQueue.first();
+          if (_priorityQueue.comparator().compare(value, firstValue) > 0) {
+            _valueSet.remove(firstValue);
+            _valueSet.add(value);
+            _priorityQueue.dequeue();
+            _priorityQueue.enqueue(value);
+          }
+        }
+      }
+    }
+    return false;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawDoubleSingleColumnDistinctOnlyExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawDoubleSingleColumnDistinctOnlyExecutor.java
new file mode 100644
index 0000000..43b0b97
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawDoubleSingleColumnDistinctOnlyExecutor.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct.raw;
+
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.query.distinct.DistinctExecutor;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+
+
+/**
+ * {@link DistinctExecutor} for distinct only queries with single raw DOUBLE column.
+ */
+public class RawDoubleSingleColumnDistinctOnlyExecutor extends BaseRawDoubleSingleColumnDistinctExecutor {
+
+  public RawDoubleSingleColumnDistinctOnlyExecutor(ExpressionContext expression, int limit) {
+    super(expression, limit);
+  }
+
+  @Override
+  public boolean process(TransformBlock transformBlock) {
+    BlockValSet blockValueSet = transformBlock.getBlockValueSet(_expression);
+    double[] values = blockValueSet.getDoubleValuesSV();
+    int numDocs = transformBlock.getNumDocs();
+    for (int i = 0; i < numDocs; i++) {
+      _valueSet.add(values[i]);
+      if (_valueSet.size() >= _limit) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawDoubleSingleColumnDistinctOrderByExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawDoubleSingleColumnDistinctOrderByExecutor.java
new file mode 100644
index 0000000..6db1b8f
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawDoubleSingleColumnDistinctOrderByExecutor.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct.raw;
+
+import it.unimi.dsi.fastutil.doubles.DoubleHeapPriorityQueue;
+import it.unimi.dsi.fastutil.doubles.DoublePriorityQueue;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.query.distinct.DistinctExecutor;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
+
+
+/**
+ * {@link DistinctExecutor} for distinct order-by queries with single raw DOUBLE column.
+ */
+public class RawDoubleSingleColumnDistinctOrderByExecutor extends BaseRawDoubleSingleColumnDistinctExecutor {
+  private final DoublePriorityQueue _priorityQueue;
+
+  public RawDoubleSingleColumnDistinctOrderByExecutor(ExpressionContext expression,
+      OrderByExpressionContext orderByExpression, int limit) {
+    super(expression, limit);
+
+    assert orderByExpression.getExpression().equals(expression);
+    int comparisonFactor = orderByExpression.isAsc() ? -1 : 1;
+    _priorityQueue = new DoubleHeapPriorityQueue(Math.min(limit, MAX_INITIAL_CAPACITY),
+        (d1, d2) -> Double.compare(d1, d2) * comparisonFactor);
+  }
+
+  @Override
+  public boolean process(TransformBlock transformBlock) {
+    BlockValSet blockValueSet = transformBlock.getBlockValueSet(_expression);
+    double[] values = blockValueSet.getDoubleValuesSV();
+    int numDocs = transformBlock.getNumDocs();
+    for (int i = 0; i < numDocs; i++) {
+      double value = values[i];
+      if (!_valueSet.contains(value)) {
+        if (_valueSet.size() < _limit) {
+          _valueSet.add(value);
+          _priorityQueue.enqueue(value);
+        } else {
+          double firstValue = _priorityQueue.firstDouble();
+          if (_priorityQueue.comparator().compare(value, firstValue) > 0) {
+            _valueSet.remove(firstValue);
+            _valueSet.add(value);
+            _priorityQueue.dequeueDouble();
+            _priorityQueue.enqueue(value);
+          }
+        }
+      }
+    }
+    return false;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawFloatSingleColumnDistinctOnlyExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawFloatSingleColumnDistinctOnlyExecutor.java
new file mode 100644
index 0000000..c33146b
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawFloatSingleColumnDistinctOnlyExecutor.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct.raw;
+
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.query.distinct.DistinctExecutor;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+
+
+/**
+ * {@link DistinctExecutor} for distinct only queries with single raw FLOAT column.
+ */
+public class RawFloatSingleColumnDistinctOnlyExecutor extends BaseRawFloatSingleColumnDistinctExecutor {
+
+  public RawFloatSingleColumnDistinctOnlyExecutor(ExpressionContext expression, int limit) {
+    super(expression, limit);
+  }
+
+  @Override
+  public boolean process(TransformBlock transformBlock) {
+    BlockValSet blockValueSet = transformBlock.getBlockValueSet(_expression);
+    float[] values = blockValueSet.getFloatValuesSV();
+    int numDocs = transformBlock.getNumDocs();
+    for (int i = 0; i < numDocs; i++) {
+      _valueSet.add(values[i]);
+      if (_valueSet.size() >= _limit) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawFloatSingleColumnDistinctOrderByExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawFloatSingleColumnDistinctOrderByExecutor.java
new file mode 100644
index 0000000..25d09b5
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawFloatSingleColumnDistinctOrderByExecutor.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct.raw;
+
+import it.unimi.dsi.fastutil.floats.FloatHeapPriorityQueue;
+import it.unimi.dsi.fastutil.floats.FloatPriorityQueue;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.query.distinct.DistinctExecutor;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
+
+
+/**
+ * {@link DistinctExecutor} for distinct order-by queries with single raw FLOAT column.
+ */
+public class RawFloatSingleColumnDistinctOrderByExecutor extends BaseRawFloatSingleColumnDistinctExecutor {
+  private final FloatPriorityQueue _priorityQueue;
+
+  public RawFloatSingleColumnDistinctOrderByExecutor(ExpressionContext expression,
+      OrderByExpressionContext orderByExpression, int limit) {
+    super(expression, limit);
+
+    assert orderByExpression.getExpression().equals(expression);
+    int comparisonFactor = orderByExpression.isAsc() ? -1 : 1;
+    _priorityQueue = new FloatHeapPriorityQueue(Math.min(limit, MAX_INITIAL_CAPACITY),
+        (f1, f2) -> Float.compare(f1, f2) * comparisonFactor);
+  }
+
+  @Override
+  public boolean process(TransformBlock transformBlock) {
+    BlockValSet blockValueSet = transformBlock.getBlockValueSet(_expression);
+    float[] values = blockValueSet.getFloatValuesSV();
+    int numDocs = transformBlock.getNumDocs();
+    for (int i = 0; i < numDocs; i++) {
+      float value = values[i];
+      if (!_valueSet.contains(value)) {
+        if (_valueSet.size() < _limit) {
+          _valueSet.add(value);
+          _priorityQueue.enqueue(value);
+        } else {
+          float firstValue = _priorityQueue.firstFloat();
+          if (_priorityQueue.comparator().compare(value, firstValue) > 0) {
+            _valueSet.remove(firstValue);
+            _valueSet.add(value);
+            _priorityQueue.dequeueFloat();
+            _priorityQueue.enqueue(value);
+          }
+        }
+      }
+    }
+    return false;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawIntSingleColumnDistinctOnlyExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawIntSingleColumnDistinctOnlyExecutor.java
new file mode 100644
index 0000000..d227958
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawIntSingleColumnDistinctOnlyExecutor.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct.raw;
+
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.query.distinct.DistinctExecutor;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+
+
+/**
+ * {@link DistinctExecutor} for distinct-only queries on a single raw INT column.
+ */
+public class RawIntSingleColumnDistinctOnlyExecutor extends BaseRawIntSingleColumnDistinctExecutor {
+
+  public RawIntSingleColumnDistinctOnlyExecutor(ExpressionContext expression, int limit) {
+    super(expression, limit);
+  }
+
+  @Override
+  public boolean process(TransformBlock transformBlock) {
+    BlockValSet blockValueSet = transformBlock.getBlockValueSet(_expression);
+    int[] values = blockValueSet.getIntValuesSV();
+    int numDocs = transformBlock.getNumDocs();
+    for (int i = 0; i < numDocs; i++) {
+      _valueSet.add(values[i]);
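+      // Once the limit is reached, return true so the caller stops feeding blocks.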
+      if (_valueSet.size() >= _limit) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
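
Every distinct-only executor follows the same contract: process(...) returns true as
soon as enough values are collected, letting the operator skip all remaining blocks.
A plain-Java sketch of that contract (the process stand-in below is hypothetical, not
Pinot API):

    import java.util.HashSet;
    import java.util.Set;

    public class DistinctOnlySketch {
      // Stand-in for process(TransformBlock): returns true once 'limit' values are held.
      static boolean process(int[] blockValues, Set<Integer> valueSet, int limit) {
        for (int value : blockValues) {
          valueSet.add(value);
          if (valueSet.size() >= limit) {
            return true;  // signal the caller to stop feeding blocks
          }
        }
        return false;
      }

      public static void main(String[] args) {
        Set<Integer> valueSet = new HashSet<>();
        int limit = 4;
        int[][] blocks = {{1, 2, 2}, {3, 3, 4}, {5, 6}};
        for (int[] block : blocks) {
          if (process(block, valueSet, limit)) {
            break;  // the third block is never scanned
          }
        }
        System.out.println(valueSet);  // [1, 2, 3, 4]
      }
    }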
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawIntSingleColumnDistinctOrderByExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawIntSingleColumnDistinctOrderByExecutor.java
new file mode 100644
index 0000000..4aac4a7
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawIntSingleColumnDistinctOrderByExecutor.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct.raw;
+
+import it.unimi.dsi.fastutil.ints.IntHeapPriorityQueue;
+import it.unimi.dsi.fastutil.ints.IntPriorityQueue;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.query.distinct.DistinctExecutor;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
+
+
+/**
+ * {@link DistinctExecutor} for distinct order-by queries on a single raw INT column.
+ */
+public class RawIntSingleColumnDistinctOrderByExecutor extends BaseRawIntSingleColumnDistinctExecutor {
+  private final IntPriorityQueue _priorityQueue;
+
+  public RawIntSingleColumnDistinctOrderByExecutor(ExpressionContext expression,
+      OrderByExpressionContext orderByExpression, int limit) {
+    super(expression, limit);
+
+    assert orderByExpression.getExpression().equals(expression);
+    int comparisonFactor = orderByExpression.isAsc() ? -1 : 1;
+    _priorityQueue = new IntHeapPriorityQueue(Math.min(limit, MAX_INITIAL_CAPACITY),
+        (i1, i2) -> Integer.compare(i1, i2) * comparisonFactor);
+  }
+
+  @Override
+  public boolean process(TransformBlock transformBlock) {
+    BlockValSet blockValueSet = transformBlock.getBlockValueSet(_expression);
+    int[] values = blockValueSet.getIntValuesSV();
+    int numDocs = transformBlock.getNumDocs();
+    for (int i = 0; i < numDocs; i++) {
+      int value = values[i];
+      if (!_valueSet.contains(value)) {
+        if (_valueSet.size() < _limit) {
+          _valueSet.add(value);
+          _priorityQueue.enqueue(value);
+        } else {
+          int firstValue = _priorityQueue.firstInt();
+          if (_priorityQueue.comparator().compare(value, firstValue) > 0) {
+            _valueSet.remove(firstValue);
+            _valueSet.add(value);
+            _priorityQueue.dequeueInt();
+            _priorityQueue.enqueue(value);
+          }
+        }
+      }
+    }
+    return false;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawLongSingleColumnDistinctOnlyExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawLongSingleColumnDistinctOnlyExecutor.java
new file mode 100644
index 0000000..342baa6
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawLongSingleColumnDistinctOnlyExecutor.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct.raw;
+
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.query.distinct.DistinctExecutor;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+
+
+/**
+ * {@link DistinctExecutor} for distinct-only queries on a single raw LONG column.
+ */
+public class RawLongSingleColumnDistinctOnlyExecutor extends BaseRawLongSingleColumnDistinctExecutor {
+
+  public RawLongSingleColumnDistinctOnlyExecutor(ExpressionContext expression, int limit) {
+    super(expression, limit);
+  }
+
+  @Override
+  public boolean process(TransformBlock transformBlock) {
+    BlockValSet blockValueSet = transformBlock.getBlockValueSet(_expression);
+    long[] values = blockValueSet.getLongValuesSV();
+    int numDocs = transformBlock.getNumDocs();
+    for (int i = 0; i < numDocs; i++) {
+      _valueSet.add(values[i]);
+      if (_valueSet.size() >= _limit) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawLongSingleColumnDistinctOrderByExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawLongSingleColumnDistinctOrderByExecutor.java
new file mode 100644
index 0000000..8f83040
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawLongSingleColumnDistinctOrderByExecutor.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct.raw;
+
+import it.unimi.dsi.fastutil.longs.LongHeapPriorityQueue;
+import it.unimi.dsi.fastutil.longs.LongPriorityQueue;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.query.distinct.DistinctExecutor;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
+
+
+/**
+ * {@link DistinctExecutor} for distinct order-by queries on a single raw LONG column.
+ */
+public class RawLongSingleColumnDistinctOrderByExecutor extends BaseRawLongSingleColumnDistinctExecutor {
+  private final LongPriorityQueue _priorityQueue;
+
+  public RawLongSingleColumnDistinctOrderByExecutor(ExpressionContext expression,
+      OrderByExpressionContext orderByExpression, int limit) {
+    super(expression, limit);
+
+    assert orderByExpression.getExpression().equals(expression);
+    int comparisonFactor = orderByExpression.isAsc() ? -1 : 1;
+    _priorityQueue = new LongHeapPriorityQueue(Math.min(limit, MAX_INITIAL_CAPACITY),
+        (l1, l2) -> Long.compare(l1, l2) * comparisonFactor);
+  }
+
+  @Override
+  public boolean process(TransformBlock transformBlock) {
+    BlockValSet blockValueSet = transformBlock.getBlockValueSet(_expression);
+    long[] values = blockValueSet.getLongValuesSV();
+    int numDocs = transformBlock.getNumDocs();
+    for (int i = 0; i < numDocs; i++) {
+      long value = values[i];
+      if (!_valueSet.contains(value)) {
+        if (_valueSet.size() < _limit) {
+          _valueSet.add(value);
+          _priorityQueue.enqueue(value);
+        } else {
+          long firstValue = _priorityQueue.firstLong();
+          if (_priorityQueue.comparator().compare(value, firstValue) > 0) {
+            _valueSet.remove(firstValue);
+            _valueSet.add(value);
+            _priorityQueue.dequeueLong();
+            _priorityQueue.enqueue(value);
+          }
+        }
+      }
+    }
+    return false;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawMultiColumnDistinctExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawMultiColumnDistinctExecutor.java
new file mode 100644
index 0000000..1d74887
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawMultiColumnDistinctExecutor.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct.raw;
+
+import java.util.List;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.common.RowBasedBlockValueFetcher;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.query.distinct.DistinctExecutor;
+import org.apache.pinot.core.query.distinct.DistinctTable;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * {@link DistinctExecutor} for multi-column queries where at least one column is raw (non-dictionary-encoded).
+ */
+public class RawMultiColumnDistinctExecutor implements DistinctExecutor {
+  private final List<ExpressionContext> _expressions;
+  private final DistinctTable _distinctTable;
+
+  public RawMultiColumnDistinctExecutor(List<ExpressionContext> expressions, List<DataType> dataTypes,
+      @Nullable List<OrderByExpressionContext> orderByExpressions, int limit) {
+    _expressions = expressions;
+
+    int numExpressions = expressions.size();
+    String[] columnNames = new String[numExpressions];
+    DataSchema.ColumnDataType[] columnDataTypes = new DataSchema.ColumnDataType[numExpressions];
+    for (int i = 0; i < numExpressions; i++) {
+      columnNames[i] = expressions.get(i).toString();
+      columnDataTypes[i] = DataSchema.ColumnDataType.fromDataTypeSV(dataTypes.get(i));
+    }
+    DataSchema dataSchema = new DataSchema(columnNames, columnDataTypes);
+    _distinctTable = new DistinctTable(dataSchema, orderByExpressions, limit);
+  }
+
+  @Override
+  public boolean process(TransformBlock transformBlock) {
+    int numExpressions = _expressions.size();
+    BlockValSet[] blockValSets = new BlockValSet[numExpressions];
+    for (int i = 0; i < numExpressions; i++) {
+      blockValSets[i] = transformBlock.getBlockValueSet(_expressions.get(i));
+    }
+    RowBasedBlockValueFetcher valueFetcher = new RowBasedBlockValueFetcher(blockValSets);
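+    // Fetch values row by row; wrapping each row in a Record gives value-based
+    // equality, which is what the DistinctTable uses for deduplication.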
+    int numDocs = transformBlock.getNumDocs();
+    if (_distinctTable.hasOrderBy()) {
+      for (int i = 0; i < numDocs; i++) {
+        Record record = new Record(valueFetcher.getRow(i));
+        _distinctTable.addWithOrderBy(record);
+      }
+    } else {
+      for (int i = 0; i < numDocs; i++) {
+        Record record = new Record(valueFetcher.getRow(i));
+        if (_distinctTable.addWithoutOrderBy(record)) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public DistinctTable getResult() {
+    return _distinctTable;
+  }
+}
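
Conceptually the multi-column path reduces to set semantics over whole rows. A
self-contained analogue using plain collections (List standing in for Record,
LinkedHashSet for DistinctTable; the names are illustrative only):

    import java.util.Arrays;
    import java.util.LinkedHashSet;
    import java.util.List;
    import java.util.Set;

    public class MultiColumnDistinctSketch {
      public static void main(String[] args) {
        int limit = 3;
        // Rows of (int, String) values standing in for the fetched column values.
        Object[][] rows = {{1, "a"}, {1, "a"}, {2, "b"}, {3, "c"}, {4, "d"}};
        Set<List<Object>> distinct = new LinkedHashSet<>();
        for (Object[] row : rows) {
          distinct.add(Arrays.asList(row));  // List equality mirrors Record equality
          if (distinct.size() >= limit) {
            break;  // early termination for the no-order-by case
          }
        }
        System.out.println(distinct);  // [[1, a], [2, b], [3, c]]
      }
    }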
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawStringSingleColumnDistinctOnlyExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawStringSingleColumnDistinctOnlyExecutor.java
new file mode 100644
index 0000000..20bf649
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawStringSingleColumnDistinctOnlyExecutor.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct.raw;
+
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.query.distinct.DistinctExecutor;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+
+
+/**
+ * {@link DistinctExecutor} for distinct-only queries on a single raw STRING column.
+ */
+public class RawStringSingleColumnDistinctOnlyExecutor extends BaseRawStringSingleColumnDistinctExecutor {
+
+  public RawStringSingleColumnDistinctOnlyExecutor(ExpressionContext expression, int limit) {
+    super(expression, limit);
+  }
+
+  @Override
+  public boolean process(TransformBlock transformBlock) {
+    BlockValSet blockValueSet = transformBlock.getBlockValueSet(_expression);
+    String[] values = blockValueSet.getStringValuesSV();
+    int numDocs = transformBlock.getNumDocs();
+    for (int i = 0; i < numDocs; i++) {
+      _valueSet.add(values[i]);
+      if (_valueSet.size() >= _limit) {
+        return true;
+      }
+    }
+    return false;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawStringSingleColumnDistinctOrderByExecutor.java b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawStringSingleColumnDistinctOrderByExecutor.java
new file mode 100644
index 0000000..b1feeb6
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/distinct/raw/RawStringSingleColumnDistinctOrderByExecutor.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.distinct.raw;
+
+import it.unimi.dsi.fastutil.PriorityQueue;
+import it.unimi.dsi.fastutil.objects.ObjectHeapPriorityQueue;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.operator.blocks.TransformBlock;
+import org.apache.pinot.core.query.distinct.DistinctExecutor;
+import org.apache.pinot.core.query.request.context.ExpressionContext;
+import org.apache.pinot.core.query.request.context.OrderByExpressionContext;
+
+
+/**
+ * {@link DistinctExecutor} for distinct order-by queries on a single raw STRING column.
+ */
+public class RawStringSingleColumnDistinctOrderByExecutor extends BaseRawStringSingleColumnDistinctExecutor {
+  private final PriorityQueue<String> _priorityQueue;
+
+  public RawStringSingleColumnDistinctOrderByExecutor(ExpressionContext expression,
+      OrderByExpressionContext orderByExpression, int limit) {
+    super(expression, limit);
+
+    assert orderByExpression.getExpression().equals(expression);
+    int comparisonFactor = orderByExpression.isAsc() ? -1 : 1;
+    _priorityQueue = new ObjectHeapPriorityQueue<>(Math.min(limit, MAX_INITIAL_CAPACITY),
+        (s1, s2) -> s1.compareTo(s2) * comparisonFactor);
+  }
+
+  @Override
+  public boolean process(TransformBlock transformBlock) {
+    BlockValSet blockValueSet = transformBlock.getBlockValueSet(_expression);
+    String[] values = blockValueSet.getStringValuesSV();
+    int numDocs = transformBlock.getNumDocs();
+    for (int i = 0; i < numDocs; i++) {
+      String value = values[i];
+      if (!_valueSet.contains(value)) {
+        if (_valueSet.size() < _limit) {
+          _valueSet.add(value);
+          _priorityQueue.enqueue(value);
+        } else {
+          String firstValue = _priorityQueue.first();
+          if (_priorityQueue.comparator().compare(value, firstValue) > 0) {
+            _valueSet.remove(firstValue);
+            _valueSet.add(value);
+            _priorityQueue.dequeue();
+            _priorityQueue.enqueue(value);
+          }
+        }
+      }
+    }
+    return false;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java
index c3dfc50..7bb4a9c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/pruner/SelectionQuerySegmentPruner.java
@@ -56,8 +56,8 @@ public class SelectionQuerySegmentPruner implements SegmentPruner {
       return segments;
     }
 
-    // Do not prune aggregation queries
-    if (QueryContextUtils.isAggregationQuery(query)) {
+    // Only prune selection queries
+    if (!QueryContextUtils.isSelectionQuery(query)) {
       return segments;
     }
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
index 6a41847..ad3020a 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
@@ -33,7 +33,7 @@ import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
-import org.apache.pinot.core.query.aggregation.function.customobject.DistinctTable;
+import org.apache.pinot.core.query.distinct.DistinctTable;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
@@ -98,7 +98,7 @@ public class DistinctDataTableReducer implements DataTableReducer {
       DistinctTable mainDistinctTable = new DistinctTable(nonEmptyDistinctTables.get(0).getDataSchema(),
           _distinctAggregationFunction.getOrderByExpressions(), _distinctAggregationFunction.getLimit());
       for (DistinctTable distinctTable : nonEmptyDistinctTables) {
-        mainDistinctTable.mergeDeserializedDistinctTable(distinctTable);
+        mainDistinctTable.mergeTable(distinctTable);
       }
 
       // Up until now, we have treated DISTINCT similar to another aggregation function even in terms
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/QueryContextUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/QueryContextUtils.java
index f3ff7ec..3ba8ada 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/QueryContextUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/utils/QueryContextUtils.java
@@ -18,17 +18,38 @@
  */
 package org.apache.pinot.core.query.request.context.utils;
 
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
 import org.apache.pinot.core.query.request.context.QueryContext;
 
 
+@SuppressWarnings("rawtypes")
 public class QueryContextUtils {
   private QueryContextUtils() {
   }
 
   /**
+   * Returns {@code true} if the given query is a selection query, {@code false} otherwise.
+   */
+  public static boolean isSelectionQuery(QueryContext query) {
+    return query.getAggregationFunctions() == null;
+  }
+
+  /**
    * Returns {@code true} if the given query is an aggregation query, {@code false} otherwise.
    */
   public static boolean isAggregationQuery(QueryContext query) {
-    return query.getAggregationFunctions() != null;
+    AggregationFunction[] aggregationFunctions = query.getAggregationFunctions();
+    return aggregationFunctions != null && (aggregationFunctions.length != 1
+        || !(aggregationFunctions[0] instanceof DistinctAggregationFunction));
+  }
+
+  /**
+   * Returns {@code true} if the given query is a distinct query, {@code false} otherwise.
+   */
+  public static boolean isDistinctQuery(QueryContext query) {
+    AggregationFunction[] aggregationFunctions = query.getAggregationFunctions();
+    return aggregationFunctions != null && aggregationFunctions.length == 1
+        && aggregationFunctions[0] instanceof DistinctAggregationFunction;
   }
 }
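
With this change the three predicates partition every query into exactly one of
selection / aggregation / distinct. A toy model of the classification (String names
standing in for the AggregationFunction array; purely illustrative):

    public class QueryClassificationSketch {
      enum Kind { SELECTION, AGGREGATION, DISTINCT }

      // Mirrors QueryContextUtils: no aggregation functions -> selection; exactly one
      // function that is DISTINCT -> distinct; anything else -> aggregation.
      static Kind classify(String[] aggregationFunctions) {
        if (aggregationFunctions == null) {
          return Kind.SELECTION;
        }
        if (aggregationFunctions.length == 1 && "DISTINCT".equals(aggregationFunctions[0])) {
          return Kind.DISTINCT;
        }
        return Kind.AGGREGATION;
      }

      public static void main(String[] args) {
        System.out.println(classify(null));                        // SELECTION
        System.out.println(classify(new String[]{"DISTINCT"}));    // DISTINCT
        System.out.println(classify(new String[]{"SUM", "MAX"}));  // AGGREGATION
      }
    }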
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverterTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverterTest.java
index f3d1e17..eac04ce 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverterTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/request/context/utils/BrokerRequestToQueryContextConverterTest.java
@@ -66,7 +66,9 @@ public class BrokerRequestToQueryContextConverterTest {
         assertEquals(queryContext.getLimit(), 10);
         assertEquals(queryContext.getOffset(), 0);
         assertTrue(queryContext.getColumns().isEmpty());
+        assertTrue(QueryContextUtils.isSelectionQuery(queryContext));
         assertFalse(QueryContextUtils.isAggregationQuery(queryContext));
+        assertFalse(QueryContextUtils.isDistinctQuery(queryContext));
       }
     }
 
@@ -90,7 +92,9 @@ public class BrokerRequestToQueryContextConverterTest {
         assertEquals(queryContext.getLimit(), 10);
         assertEquals(queryContext.getOffset(), 0);
         assertTrue(queryContext.getColumns().isEmpty());
+        assertFalse(QueryContextUtils.isSelectionQuery(queryContext));
         assertTrue(QueryContextUtils.isAggregationQuery(queryContext));
+        assertFalse(QueryContextUtils.isDistinctQuery(queryContext));
       }
     }
 
@@ -119,7 +123,9 @@ public class BrokerRequestToQueryContextConverterTest {
         assertEquals(queryContext.getLimit(), 100);
         assertEquals(queryContext.getOffset(), 50);
         assertEquals(queryContext.getColumns(), new HashSet<>(Arrays.asList("foo", "bar")));
+        assertTrue(QueryContextUtils.isSelectionQuery(queryContext));
         assertFalse(QueryContextUtils.isAggregationQuery(queryContext));
+        assertFalse(QueryContextUtils.isDistinctQuery(queryContext));
       }
     }
 
@@ -153,7 +159,9 @@ public class BrokerRequestToQueryContextConverterTest {
         assertEquals(queryContext.getLimit(), 15);
         assertEquals(queryContext.getOffset(), 0);
         assertEquals(queryContext.getColumns(), new HashSet<>(Arrays.asList("foo", "bar", "foobar")));
-        assertTrue(QueryContextUtils.isAggregationQuery(queryContext));
+        assertFalse(QueryContextUtils.isSelectionQuery(queryContext));
+        assertFalse(QueryContextUtils.isAggregationQuery(queryContext));
+        assertTrue(QueryContextUtils.isDistinctQuery(queryContext));
       }
     }
 
@@ -190,7 +198,9 @@ public class BrokerRequestToQueryContextConverterTest {
         assertEquals(queryContext.getLimit(), 20);
         assertEquals(queryContext.getOffset(), 30);
         assertEquals(queryContext.getColumns(), new HashSet<>(Arrays.asList("foo", "bar", "foobar")));
+        assertTrue(QueryContextUtils.isSelectionQuery(queryContext));
         assertFalse(QueryContextUtils.isAggregationQuery(queryContext));
+        assertFalse(QueryContextUtils.isDistinctQuery(queryContext));
       }
     }
 
@@ -249,7 +259,9 @@ public class BrokerRequestToQueryContextConverterTest {
         assertEquals(queryContext.getLimit(), 20);
         assertEquals(queryContext.getOffset(), 0);
         assertEquals(queryContext.getColumns(), new HashSet<>(Arrays.asList("foo", "bar")));
+        assertFalse(QueryContextUtils.isSelectionQuery(queryContext));
         assertTrue(QueryContextUtils.isAggregationQuery(queryContext));
+        assertFalse(QueryContextUtils.isDistinctQuery(queryContext));
       }
     }
 
@@ -289,7 +301,9 @@ public class BrokerRequestToQueryContextConverterTest {
         assertEquals(queryContext.getLimit(), 10);
         assertEquals(queryContext.getOffset(), 0);
         assertEquals(queryContext.getColumns(), new HashSet<>(Arrays.asList("foo", "bar", "foobar")));
+        assertTrue(QueryContextUtils.isSelectionQuery(queryContext));
         assertFalse(QueryContextUtils.isAggregationQuery(queryContext));
+        assertFalse(QueryContextUtils.isDistinctQuery(queryContext));
       }
     }
 
@@ -335,7 +349,9 @@ public class BrokerRequestToQueryContextConverterTest {
       assertEquals(queryContext.getLimit(), 10);
       assertEquals(queryContext.getOffset(), 0);
       assertEquals(queryContext.getColumns(), new HashSet<>(Arrays.asList("foo", "bar")));
+      assertFalse(QueryContextUtils.isSelectionQuery(queryContext));
       assertTrue(QueryContextUtils.isAggregationQuery(queryContext));
+      assertFalse(QueryContextUtils.isDistinctQuery(queryContext));
     }
 
     // Having (only supported in SQL format)
@@ -368,7 +384,9 @@ public class BrokerRequestToQueryContextConverterTest {
       assertEquals(queryContext.getLimit(), 10);
       assertEquals(queryContext.getOffset(), 0);
       assertEquals(queryContext.getColumns(), new HashSet<>(Arrays.asList("foo", "bar")));
+      assertFalse(QueryContextUtils.isSelectionQuery(queryContext));
       assertTrue(QueryContextUtils.isAggregationQuery(queryContext));
+      assertFalse(QueryContextUtils.isDistinctQuery(queryContext));
     }
 
     // Post-aggregation (only supported in SQL format)
@@ -436,7 +454,9 @@ public class BrokerRequestToQueryContextConverterTest {
               Collections.singletonList(ExpressionContext.forIdentifier("col4")))));
 
       assertEquals(queryContext.getColumns(), new HashSet<>(Arrays.asList("col1", "col2", "col3", "col4")));
+      assertFalse(QueryContextUtils.isSelectionQuery(queryContext));
       assertTrue(QueryContextUtils.isAggregationQuery(queryContext));
+      assertFalse(QueryContextUtils.isDistinctQuery(queryContext));
 
       // Expected: SUM(col1), MAX(col2), MIN(col2), SUM(col4), MAX(col4), MAX(col1)
       //noinspection rawtypes
@@ -482,7 +502,9 @@ public class BrokerRequestToQueryContextConverterTest {
         assertEquals(arguments.get(3), ExpressionContext.forLiteral("bar='b'"));
         assertEquals(arguments.get(4), ExpressionContext.forLiteral("SET_INTERSECT($1, $2)"));
         assertEquals(queryContext.getColumns(), new HashSet<>(Arrays.asList("foo", "bar")));
+        assertFalse(QueryContextUtils.isSelectionQuery(queryContext));
         assertTrue(QueryContextUtils.isAggregationQuery(queryContext));
+        assertFalse(QueryContextUtils.isDistinctQuery(queryContext));
       }
     }
 
@@ -513,7 +535,9 @@ public class BrokerRequestToQueryContextConverterTest {
           new FunctionContext(FunctionContext.Type.AGGREGATION, "sum",
               Collections.singletonList(ExpressionContext.forIdentifier("bar")))));
       assertEquals(selectExpressions.get(1).toString(), "sum(bar)");
+      assertFalse(QueryContextUtils.isSelectionQuery(queryContext));
       assertTrue(QueryContextUtils.isAggregationQuery(queryContext));
+      assertFalse(QueryContextUtils.isDistinctQuery(queryContext));
     }
     // Use string literal as identifier for aggregation
     {
@@ -523,7 +547,9 @@ public class BrokerRequestToQueryContextConverterTest {
           new FunctionContext(FunctionContext.Type.AGGREGATION, "sum",
               Collections.singletonList(ExpressionContext.forIdentifier("foo")))));
       assertEquals(queryContext.getSelectExpressions().get(0).toString(), "sum(foo)");
+      assertFalse(QueryContextUtils.isSelectionQuery(queryContext));
       assertTrue(QueryContextUtils.isAggregationQuery(queryContext));
+      assertFalse(QueryContextUtils.isDistinctQuery(queryContext));
     }
   }
 
@@ -554,7 +580,9 @@ public class BrokerRequestToQueryContextConverterTest {
     assertEquals(queryContext.getLimit(), 100);
     assertEquals(queryContext.getOffset(), 50);
     assertEquals(queryContext.getColumns(), new HashSet<>(Arrays.asList("foo", "bar")));
+    assertTrue(QueryContextUtils.isSelectionQuery(queryContext));
     assertFalse(QueryContextUtils.isAggregationQuery(queryContext));
+    assertFalse(QueryContextUtils.isDistinctQuery(queryContext));
   }
 
   @Test
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
index d3006ab..6a9edfc 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/DistinctQueriesTest.java
@@ -30,6 +30,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
 import org.apache.pinot.common.response.broker.ResultTable;
@@ -47,8 +48,8 @@ import org.apache.pinot.core.indexsegment.IndexSegment;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegment;
 import org.apache.pinot.core.indexsegment.immutable.ImmutableSegmentLoader;
-import org.apache.pinot.core.operator.query.AggregationOperator;
-import org.apache.pinot.core.query.aggregation.function.customobject.DistinctTable;
+import org.apache.pinot.core.operator.query.DistinctOperator;
+import org.apache.pinot.core.query.distinct.DistinctTable;
 import org.apache.pinot.core.query.reduce.BrokerReduceService;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.query.request.context.utils.BrokerRequestToQueryContextConverter;
@@ -69,6 +70,7 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 
@@ -90,12 +92,24 @@ public class DistinctQueriesTest extends BaseQueriesTest {
   private static final String DOUBLE_COLUMN = "doubleColumn";
   private static final String STRING_COLUMN = "stringColumn";
   private static final String BYTES_COLUMN = "bytesColumn";
+  private static final String RAW_INT_COLUMN = "rawIntColumn";
+  private static final String RAW_LONG_COLUMN = "rawLongColumn";
+  private static final String RAW_FLOAT_COLUMN = "rawFloatColumn";
+  private static final String RAW_DOUBLE_COLUMN = "rawDoubleColumn";
+  private static final String RAW_STRING_COLUMN = "rawStringColumn";
+  private static final String RAW_BYTES_COLUMN = "rawBytesColumn";
   private static final Schema SCHEMA = new Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN, DataType.INT)
       .addSingleValueDimension(LONG_COLUMN, DataType.LONG).addSingleValueDimension(FLOAT_COLUMN, DataType.FLOAT)
       .addSingleValueDimension(DOUBLE_COLUMN, DataType.DOUBLE).addSingleValueDimension(STRING_COLUMN, DataType.STRING)
-      .addSingleValueDimension(BYTES_COLUMN, DataType.BYTES).build();
-  private static final TableConfig TABLE =
-      new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
+      .addSingleValueDimension(BYTES_COLUMN, DataType.BYTES).addSingleValueDimension(RAW_INT_COLUMN, DataType.INT)
+      .addSingleValueDimension(RAW_LONG_COLUMN, DataType.LONG).addSingleValueDimension(RAW_FLOAT_COLUMN, DataType.FLOAT)
+      .addSingleValueDimension(RAW_DOUBLE_COLUMN, DataType.DOUBLE)
+      .addSingleValueDimension(RAW_STRING_COLUMN, DataType.STRING)
+      .addSingleValueDimension(RAW_BYTES_COLUMN, DataType.BYTES).build();
+  private static final TableConfig TABLE = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+      .setNoDictionaryColumns(Arrays
+          .asList(RAW_INT_COLUMN, RAW_LONG_COLUMN, RAW_FLOAT_COLUMN, RAW_DOUBLE_COLUMN, RAW_STRING_COLUMN,
+              RAW_BYTES_COLUMN)).build();
 
   private IndexSegment _indexSegment;
   private List<IndexSegment> _indexSegments;
@@ -116,12 +130,22 @@ public class DistinctQueriesTest extends BaseQueriesTest {
   }
 
   @BeforeClass
-  public void setUp() {
+  public void setUp()
+      throws Exception {
     FileUtils.deleteQuietly(INDEX_DIR);
+
+    ImmutableSegment segment0 = createSegment(0, generateRecords(0));
+    ImmutableSegment segment1 = createSegment(1, generateRecords(1000));
+    _indexSegment = segment0;
+    _indexSegments = Arrays.asList(segment0, segment1);
   }
 
   @AfterClass
   public void tearDown() {
+    for (IndexSegment indexSegment : _indexSegments) {
+      indexSegment.destroy();
+    }
+
     FileUtils.deleteQuietly(INDEX_DIR);
   }
 
@@ -142,8 +166,13 @@ public class DistinctQueriesTest extends BaseQueriesTest {
       record.putValue(DOUBLE_COLUMN, (double) value);
       String stringValue = Integer.toString(value);
       record.putValue(STRING_COLUMN, stringValue);
-      byte[] bytesValue = StringUtil.encodeUtf8(stringValue);
-      record.putValue(BYTES_COLUMN, bytesValue);
+      record.putValue(BYTES_COLUMN, StringUtil.encodeUtf8(StringUtils.leftPad(stringValue, 4)));
+      record.putValue(RAW_INT_COLUMN, value);
+      record.putValue(RAW_LONG_COLUMN, (long) value);
+      record.putValue(RAW_FLOAT_COLUMN, (float) value);
+      record.putValue(RAW_DOUBLE_COLUMN, (double) value);
+      record.putValue(RAW_STRING_COLUMN, stringValue);
+      record.putValue(RAW_BYTES_COLUMN, StringUtil.encodeUtf8(stringValue));
       uniqueRecords.add(record);
     }
 
@@ -170,140 +199,341 @@ public class DistinctQueriesTest extends BaseQueriesTest {
     return ImmutableSegmentLoader.load(new File(INDEX_DIR, segmentName), ReadMode.mmap);
   }
 
-  /**
-   * Test DISTINCT query within a single segment.
-   * <p>The following query types are tested:
-   * <ul>
-   *   <li>Selecting all columns</li>
-   *   <li>Selecting some columns with filter</li>
-   *   <li>Selecting some columns order by BYTES column</li>
-   *   <li>Selecting some columns transform, filter, order-by and limit</li>
-   *   <li>Selecting some columns with filter that does not match any record</li>
-   * </ul>
-   */
-  private void testDistinctInnerSegmentHelper(String[] queries, boolean isPql)
-      throws Exception {
-    _indexSegment = createSegment(0, generateRecords(0));
-    try {
-      {
-        // Test selecting all columns
-
-        // Check data schema
-        DistinctTable distinctTable = getDistinctTableInnerSegment(queries[0], isPql);
-        DataSchema dataSchema = distinctTable.getDataSchema();
-        assertEquals(dataSchema.getColumnNames(),
-            new String[]{"intColumn", "longColumn", "floatColumn", "doubleColumn", "stringColumn", "bytesColumn"});
-        assertEquals(dataSchema.getColumnDataTypes(),
-            new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.FLOAT, ColumnDataType.DOUBLE, ColumnDataType.STRING, ColumnDataType.BYTES});
-
-        // Check values, where all 100 unique values should be returned
-        assertEquals(distinctTable.size(), NUM_UNIQUE_RECORDS_PER_SEGMENT);
-        Set<Integer> expectedValues = new HashSet<>();
-        for (int i = 0; i < NUM_UNIQUE_RECORDS_PER_SEGMENT; i++) {
-          expectedValues.add(i);
+  @Test
+  public void testSingleColumnDistinctOnlyInnerSegment() {
+    {
+      // Numeric columns
+      //@formatter:off
+      List<String> queries = Arrays.asList(
+          "SELECT DISTINCT(intColumn) FROM testTable",
+          "SELECT DISTINCT(longColumn) FROM testTable",
+          "SELECT DISTINCT(floatColumn) FROM testTable",
+          "SELECT DISTINCT(doubleColumn) FROM testTable",
+          "SELECT DISTINCT(rawIntColumn) FROM testTable",
+          "SELECT DISTINCT(rawLongColumn) FROM testTable",
+          "SELECT DISTINCT(rawFloatColumn) FROM testTable",
+          "SELECT DISTINCT(rawDoubleColumn) FROM testTable",
+          "SELECT DISTINCT(intColumn) FROM testTable ORDER BY intColumn",
+          "SELECT DISTINCT(longColumn) FROM testTable ORDER BY longColumn",
+          "SELECT DISTINCT(floatColumn) FROM testTable ORDER BY floatColumn",
+          "SELECT DISTINCT(doubleColumn) FROM testTable ORDER BY doubleColumn",
+          "SELECT DISTINCT(rawIntColumn) FROM testTable ORDER BY rawIntColumn",
+          "SELECT DISTINCT(rawLongColumn) FROM testTable ORDER BY rawLongColumn",
+          "SELECT DISTINCT(rawFloatColumn) FROM testTable ORDER BY rawFloatColumn",
+          "SELECT DISTINCT(rawDoubleColumn) FROM testTable ORDER BY rawDoubleColumn"
+      );
+      //@formatter:on
+      Set<Integer> expectedValues = new HashSet<>();
+      for (int i = 0; i < 10; i++) {
+        expectedValues.add(i);
+      }
+      for (String query : queries) {
+        DistinctTable pqlDistinctTable = getDistinctTableInnerSegment(query, true);
+        DistinctTable sqlDistinctTable = getDistinctTableInnerSegment(query, false);
+        for (DistinctTable distinctTable : Arrays.asList(pqlDistinctTable, sqlDistinctTable)) {
+          assertEquals(distinctTable.size(), 10);
+          Set<Integer> actualValues = new HashSet<>();
+          for (Record record : distinctTable.getRecords()) {
+            Object[] values = record.getValues();
+            assertEquals(values.length, 1);
+            assertTrue(values[0] instanceof Number);
+            actualValues.add(((Number) values[0]).intValue());
+          }
+          assertEquals(actualValues, expectedValues);
         }
-        Set<Integer> actualValues = new HashSet<>();
-        Iterator<Record> iterator = distinctTable.getFinalResult();
-        while (iterator.hasNext()) {
-          Record record = iterator.next();
-          Object[] values = record.getValues();
-          int intValue = (int) values[0];
-          assertEquals(((Long) values[1]).intValue(), intValue);
-          assertEquals(((Float) values[2]).intValue(), intValue);
-          assertEquals(((Double) values[3]).intValue(), intValue);
-          assertEquals(Integer.parseInt((String) values[4]), intValue);
-          assertEquals(StringUtil.decodeUtf8(((ByteArray) values[5]).getBytes()), values[4]);
-          actualValues.add(intValue);
+      }
+    }
+    {
+      // String columns
+      //@formatter:off
+      List<String> queries = Arrays.asList(
+          "SELECT DISTINCT(stringColumn) FROM testTable",
+          "SELECT DISTINCT(rawStringColumn) FROM testTable"
+      );
+      //@formatter:on
+      Set<Integer> expectedValues = new HashSet<>();
+      for (int i = 0; i < 10; i++) {
+        expectedValues.add(i);
+      }
+      for (String query : queries) {
+        DistinctTable pqlDistinctTable = getDistinctTableInnerSegment(query, true);
+        DistinctTable sqlDistinctTable = getDistinctTableInnerSegment(query, false);
+        for (DistinctTable distinctTable : Arrays.asList(pqlDistinctTable, sqlDistinctTable)) {
+          assertEquals(distinctTable.size(), 10);
+          Set<Integer> actualValues = new HashSet<>();
+          for (Record record : distinctTable.getRecords()) {
+            Object[] values = record.getValues();
+            assertEquals(values.length, 1);
+            assertTrue(values[0] instanceof String);
+            actualValues.add(Integer.parseInt((String) values[0]));
+          }
+          assertEquals(actualValues, expectedValues);
         }
-        assertEquals(actualValues, expectedValues);
       }
-      {
-        // Test selecting some columns with filter
-
-        // Check data schema
-        DistinctTable distinctTable = getDistinctTableInnerSegment(queries[1], isPql);
-        DataSchema dataSchema = distinctTable.getDataSchema();
-        assertEquals(dataSchema.getColumnNames(), new String[]{"stringColumn", "bytesColumn", "floatColumn"});
-        assertEquals(dataSchema.getColumnDataTypes(),
-            new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.BYTES, ColumnDataType.FLOAT});
+    }
+    {
+      // Bytes columns
+      //@formatter:off
+      List<String> queries = Arrays.asList(
+          "SELECT DISTINCT(bytesColumn) FROM testTable",
+          "SELECT DISTINCT(rawBytesColumn) FROM testTable"
+      );
+      //@formatter:on
+      Set<Integer> expectedValues = new HashSet<>();
+      for (int i = 0; i < 10; i++) {
+        expectedValues.add(i);
+      }
+      for (String query : queries) {
+        DistinctTable pqlDistinctTable = getDistinctTableInnerSegment(query, true);
+        DistinctTable sqlDistinctTable = getDistinctTableInnerSegment(query, false);
+        for (DistinctTable distinctTable : Arrays.asList(pqlDistinctTable, sqlDistinctTable)) {
+          assertEquals(distinctTable.size(), 10);
+          Set<Integer> actualValues = new HashSet<>();
+          for (Record record : distinctTable.getRecords()) {
+            Object[] values = record.getValues();
+            assertEquals(values.length, 1);
+            assertTrue(values[0] instanceof ByteArray);
+            actualValues.add(Integer.parseInt(
+                org.apache.pinot.spi.utils.StringUtils.decodeUtf8(((ByteArray) values[0]).getBytes()).trim()));
+          }
+          assertEquals(actualValues, expectedValues);
+        }
+      }
+    }
+  }
 
-        // Check values, where 40 matched values should be returned
-        assertEquals(distinctTable.size(), NUM_UNIQUE_RECORDS_PER_SEGMENT - 60);
-        Set<Integer> expectedValues = new HashSet<>();
-        for (int i = 60; i < NUM_UNIQUE_RECORDS_PER_SEGMENT; i++) {
-          expectedValues.add(i);
+  @Test
+  public void testSingleColumnDistinctOrderByInnerSegment() {
+    {
+      // Numeric columns
+      //@formatter:off
+      List<String> queries = Arrays.asList(
+          "SELECT DISTINCT(intColumn) FROM testTable ORDER BY intColumn DESC",
+          "SELECT DISTINCT(longColumn) FROM testTable ORDER BY longColumn DESC",
+          "SELECT DISTINCT(floatColumn) FROM testTable ORDER BY floatColumn DESC",
+          "SELECT DISTINCT(doubleColumn) FROM testTable ORDER BY doubleColumn DESC",
+          "SELECT DISTINCT(rawIntColumn) FROM testTable ORDER BY rawIntColumn DESC",
+          "SELECT DISTINCT(rawLongColumn) FROM testTable ORDER BY rawLongColumn DESC",
+          "SELECT DISTINCT(rawFloatColumn) FROM testTable ORDER BY rawFloatColumn DESC",
+          "SELECT DISTINCT(rawDoubleColumn) FROM testTable ORDER BY rawDoubleColumn DESC"
+      );
+      //@formatter:on
+      Set<Integer> expectedValues = new HashSet<>();
+      for (int i = NUM_UNIQUE_RECORDS_PER_SEGMENT - 10; i < NUM_UNIQUE_RECORDS_PER_SEGMENT; i++) {
+        expectedValues.add(i);
+      }
+      for (String query : queries) {
+        DistinctTable pqlDistinctTable = getDistinctTableInnerSegment(query, true);
+        DistinctTable sqlDistinctTable = getDistinctTableInnerSegment(query, false);
+        for (DistinctTable distinctTable : Arrays.asList(pqlDistinctTable, sqlDistinctTable)) {
+          assertEquals(distinctTable.size(), 10);
+          Set<Integer> actualValues = new HashSet<>();
+          for (Record record : distinctTable.getRecords()) {
+            Object[] values = record.getValues();
+            assertEquals(values.length, 1);
+            assertTrue(values[0] instanceof Number);
+            actualValues.add(((Number) values[0]).intValue());
+          }
+          assertEquals(actualValues, expectedValues);
         }
+      }
+    }
+    {
+      // String columns
+      //@formatter:off
+      List<String> queries = Arrays.asList(
+          "SELECT DISTINCT(stringColumn) FROM testTable ORDER BY stringColumn",
+          "SELECT DISTINCT(rawStringColumn) FROM testTable ORDER BY rawStringColumn"
+      );
+      //@formatter:on
+      Set<String> expectedValues =
+          new HashSet<>(Arrays.asList("0", "1", "10", "11", "12", "13", "14", "15", "16", "17"));
+      for (String query : queries) {
+        DistinctTable pqlDistinctTable = getDistinctTableInnerSegment(query, true);
+        DistinctTable sqlDistinctTable = getDistinctTableInnerSegment(query, false);
+        for (DistinctTable distinctTable : Arrays.asList(pqlDistinctTable, sqlDistinctTable)) {
+          assertEquals(distinctTable.size(), 10);
+          Set<String> actualValues = new HashSet<>();
+          for (Record record : distinctTable.getRecords()) {
+            Object[] values = record.getValues();
+            assertEquals(values.length, 1);
+            assertTrue(values[0] instanceof String);
+            actualValues.add((String) values[0]);
+          }
+          assertEquals(actualValues, expectedValues);
+        }
+      }
+    }
+    {
+      // Dictionary-encoded bytes column (values are left-padded to the same length)
+      String query = "SELECT DISTINCT(bytesColumn) FROM testTable ORDER BY bytesColumn";
+      Set<Integer> expectedValues = new HashSet<>();
+      for (int i = 0; i < 10; i++) {
+        expectedValues.add(i);
+      }
+      DistinctTable pqlDistinctTable = getDistinctTableInnerSegment(query, true);
+      DistinctTable sqlDistinctTable = getDistinctTableInnerSegment(query, false);
+      for (DistinctTable distinctTable : Arrays.asList(pqlDistinctTable, sqlDistinctTable)) {
+        assertEquals(distinctTable.size(), 10);
         Set<Integer> actualValues = new HashSet<>();
-        Iterator<Record> iterator = distinctTable.getFinalResult();
-        while (iterator.hasNext()) {
-          Record record = iterator.next();
+        for (Record record : distinctTable.getRecords()) {
           Object[] values = record.getValues();
-          int intValue = Integer.parseInt((String) values[0]);
-          assertEquals(StringUtil.decodeUtf8(((ByteArray) values[1]).getBytes()), values[0]);
-          assertEquals(((Float) values[2]).intValue(), intValue);
-          actualValues.add(intValue);
+          assertEquals(values.length, 1);
+          assertTrue(values[0] instanceof ByteArray);
+          actualValues.add(Integer
+              .parseInt(org.apache.pinot.spi.utils.StringUtils.decodeUtf8(((ByteArray) values[0]).getBytes()).trim()));
         }
         assertEquals(actualValues, expectedValues);
       }
-      {
-        // Test selecting some columns order by BYTES column
-
-        // Check data schema
-        DistinctTable distinctTable = getDistinctTableInnerSegment(queries[2], isPql);
-        DataSchema dataSchema = distinctTable.getDataSchema();
-        assertEquals(dataSchema.getColumnNames(), new String[]{"intColumn", "bytesColumn"});
-        assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.BYTES});
-
-        // Check values, where only 5 top values sorted in ByteArray format ascending order should be returned
-        assertEquals(distinctTable.size(), 5);
-        // ByteArray of "30", "31", "3130", "3131", "3132" (same as String order because all digits can be encoded with
-        // a single byte)
-        int[] expectedValues = new int[]{0, 1, 10, 11, 12};
-        Iterator<Record> iterator = distinctTable.getFinalResult();
-        for (int i = 0; i < 5; i++) {
-          Object[] values = iterator.next().getValues();
-          int intValue = (int) values[0];
-          assertEquals(intValue, expectedValues[i]);
-          assertEquals(Integer.parseInt(StringUtil.decodeUtf8(((ByteArray) values[1]).getBytes())), intValue);
-        }
-      }
-      {
-        // Test selecting some columns with transform, filter, order-by and limit. Spaces in 'add' are intentional
-        // to ensure that AggregationFunction arguments are standardized (to remove spaces).
-
-        // Check data schema
-        DistinctTable distinctTable = getDistinctTableInnerSegment(queries[3], isPql);
-        DataSchema dataSchema = distinctTable.getDataSchema();
-        assertEquals(dataSchema.getColumnNames(), new String[]{"add(intColumn,floatColumn)", "stringColumn"});
-        assertEquals(dataSchema.getColumnDataTypes(),
-            new ColumnDataType[]{ColumnDataType.DOUBLE, ColumnDataType.STRING});
-
-        // Check values, where only 10 top values sorted in string format descending order should be returned
+    }
+    {
+      // Raw bytes column
+      String query = "SELECT DISTINCT(rawBytesColumn) FROM testTable ORDER BY rawBytesColumn";
+      Set<String> expectedValues =
+          new HashSet<>(Arrays.asList("0", "1", "10", "11", "12", "13", "14", "15", "16", "17"));
+      DistinctTable pqlDistinctTable = getDistinctTableInnerSegment(query, true);
+      DistinctTable sqlDistinctTable = getDistinctTableInnerSegment(query, false);
+      for (DistinctTable distinctTable : Arrays.asList(pqlDistinctTable, sqlDistinctTable)) {
         assertEquals(distinctTable.size(), 10);
-        int[] expectedValues = new int[]{9, 8, 7, 6, 59, 58, 57, 56, 55, 54};
-        Iterator<Record> iterator = distinctTable.getFinalResult();
-        for (int i = 0; i < 10; i++) {
-          Object[] values = iterator.next().getValues();
-          int intValue = ((Double) values[0]).intValue() / 2;
-          assertEquals(intValue, expectedValues[i]);
-          assertEquals(Integer.parseInt((String) values[1]), intValue);
+        Set<String> actualValues = new HashSet<>();
+        for (Record record : distinctTable.getRecords()) {
+          Object[] values = record.getValues();
+          assertEquals(values.length, 1);
+          assertTrue(values[0] instanceof ByteArray);
+          actualValues.add(org.apache.pinot.spi.utils.StringUtils.decodeUtf8(((ByteArray) values[0]).getBytes()));
         }
+        assertEquals(actualValues, expectedValues);
       }
-      {
-        // Test selecting some columns with filter that does not match any record
-
-        // Check data schema, where data type should be STRING for all columns
-        DistinctTable distinctTable = getDistinctTableInnerSegment(queries[4], isPql);
-        DataSchema dataSchema = distinctTable.getDataSchema();
-        assertEquals(dataSchema.getColumnNames(), new String[]{"floatColumn", "longColumn"});
-        assertEquals(dataSchema.getColumnDataTypes(),
-            new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.STRING});
+    }
+  }
 
-        // Check values, where no record should be returned
-        assertEquals(distinctTable.size(), 0);
+  /**
+   * Test DISTINCT query within a single segment.
+   * <p>The following query types are tested:
+   * <ul>
+   *   <li>Selecting all dictionary-encoded columns</li>
+   *   <li>Selecting some columns with filter</li>
+   *   <li>Selecting some columns ordered by the raw BYTES column</li>
+   *   <li>Selecting some columns with transform, filter, order-by and limit</li>
+   *   <li>Selecting some columns with filter that does not match any record</li>
+   * </ul>
+   */
+  private void testDistinctInnerSegmentHelper(String[] queries, boolean isPql) {
+    {
+      // Test selecting all dictionary-encoded columns
+
+      // Check data schema
+      DistinctTable distinctTable = getDistinctTableInnerSegment(queries[0], isPql);
+      DataSchema dataSchema = distinctTable.getDataSchema();
+      assertEquals(dataSchema.getColumnNames(),
+          new String[]{"intColumn", "longColumn", "floatColumn", "doubleColumn", "stringColumn", "bytesColumn"});
+      assertEquals(dataSchema.getColumnDataTypes(),
+          new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.FLOAT, ColumnDataType.DOUBLE, ColumnDataType.STRING, ColumnDataType.BYTES});
+
+      // Check values, where all 100 unique values should be returned
+      assertEquals(distinctTable.size(), NUM_UNIQUE_RECORDS_PER_SEGMENT);
+      assertFalse(distinctTable.isMainTable());
+      Set<Integer> expectedValues = new HashSet<>();
+      for (int i = 0; i < NUM_UNIQUE_RECORDS_PER_SEGMENT; i++) {
+        expectedValues.add(i);
       }
-    } finally {
-      _indexSegment.destroy();
+      Set<Integer> actualValues = new HashSet<>();
+      for (Record record : distinctTable.getRecords()) {
+        Object[] values = record.getValues();
+        int intValue = (int) values[0];
+        assertEquals(((Long) values[1]).intValue(), intValue);
+        assertEquals(((Float) values[2]).intValue(), intValue);
+        assertEquals(((Double) values[3]).intValue(), intValue);
+        assertEquals(Integer.parseInt((String) values[4]), intValue);
+        assertEquals(StringUtil.decodeUtf8(((ByteArray) values[5]).getBytes()).trim(), values[4]);
+        actualValues.add(intValue);
+      }
+      assertEquals(actualValues, expectedValues);
+    }
+    {
+      // Test selecting some columns with filter
+
+      // Check data schema
+      DistinctTable distinctTable = getDistinctTableInnerSegment(queries[1], isPql);
+      DataSchema dataSchema = distinctTable.getDataSchema();
+      assertEquals(dataSchema.getColumnNames(), new String[]{"stringColumn", "bytesColumn", "floatColumn"});
+      assertEquals(dataSchema.getColumnDataTypes(),
+          new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.BYTES, ColumnDataType.FLOAT});
+
+      // Check values, where 40 matched values should be returned
+      assertEquals(distinctTable.size(), NUM_UNIQUE_RECORDS_PER_SEGMENT - 60);
+      assertFalse(distinctTable.isMainTable());
+      Set<Integer> expectedValues = new HashSet<>();
+      for (int i = 60; i < NUM_UNIQUE_RECORDS_PER_SEGMENT; i++) {
+        expectedValues.add(i);
+      }
+      Set<Integer> actualValues = new HashSet<>();
+      for (Record record : distinctTable.getRecords()) {
+        Object[] values = record.getValues();
+        int intValue = Integer.parseInt((String) values[0]);
+        assertEquals(StringUtil.decodeUtf8(((ByteArray) values[1]).getBytes()).trim(), values[0]);
+        assertEquals(((Float) values[2]).intValue(), intValue);
+        actualValues.add(intValue);
+      }
+      assertEquals(actualValues, expectedValues);
+    }
+    {
+      // Test selecting some columns order by BYTES column
+
+      // Check data schema
+      DistinctTable distinctTable = getDistinctTableInnerSegment(queries[2], isPql);
+      DataSchema dataSchema = distinctTable.getDataSchema();
+      assertEquals(dataSchema.getColumnNames(), new String[]{"intColumn", "rawBytesColumn"});
+      assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.BYTES});
+
+      // Check values, where only 5 top values sorted in ByteArray format ascending order should be returned
+      assertEquals(distinctTable.size(), 5);
+      assertTrue(distinctTable.isMainTable());
+      // ByteArray of "30", "31", "3130", "3131", "3132" (same as String order because all digits can be encoded with
+      // a single byte)
+      int[] expectedValues = new int[]{0, 1, 10, 11, 12};
+      Iterator<Record> iterator = distinctTable.getFinalResult();
+      for (int i = 0; i < 5; i++) {
+        Object[] values = iterator.next().getValues();
+        int intValue = (int) values[0];
+        assertEquals(intValue, expectedValues[i]);
+        assertEquals(Integer.parseInt(StringUtil.decodeUtf8(((ByteArray) values[1]).getBytes())), intValue);
+      }
+    }
+    {
+      // Test selecting some columns with transform, filter, order-by and limit
+
+      // Check data schema
+      DistinctTable distinctTable = getDistinctTableInnerSegment(queries[3], isPql);
+      DataSchema dataSchema = distinctTable.getDataSchema();
+      assertEquals(dataSchema.getColumnNames(), new String[]{"add(intColumn,floatColumn)", "stringColumn"});
+      assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.DOUBLE, ColumnDataType.STRING});
+
+      // Check values, where only 10 top values sorted in string format descending order should be returned
+      assertEquals(distinctTable.size(), 10);
+      assertTrue(distinctTable.isMainTable());
+      int[] expectedValues = new int[]{9, 8, 7, 6, 59, 58, 57, 56, 55, 54};
+      Iterator<Record> iterator = distinctTable.getFinalResult();
+      for (int i = 0; i < 10; i++) {
+        Object[] values = iterator.next().getValues();
+        int intValue = ((Double) values[0]).intValue() / 2;
+        assertEquals(intValue, expectedValues[i]);
+        assertEquals(Integer.parseInt((String) values[1]), intValue);
+      }
+    }
+    {
+      // Test selecting some columns with filter that does not match any record
+
+      // Check data schema, where data type should be STRING for all columns
+      DistinctTable distinctTable = getDistinctTableInnerSegment(queries[4], isPql);
+      DataSchema dataSchema = distinctTable.getDataSchema();
+      assertEquals(dataSchema.getColumnNames(), new String[]{"floatColumn", "longColumn"});
+      assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.FLOAT, ColumnDataType.LONG});
+
+      // Check values, where no record should be returned
+      assertEquals(distinctTable.size(), 0);
+      assertFalse(distinctTable.isMainTable());
     }
   }
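
The byte-order reasoning in the comments above generalizes: every ASCII digit occupies a single UTF-8 byte, so unsigned byte-wise comparison of the encoded strings coincides with String comparison. A minimal standalone sketch (not part of this commit) that prints the hex forms cited in those comments:

    import java.nio.charset.StandardCharsets;

    public class DigitByteOrderSketch {
      public static void main(String[] args) {
        // "0", "1", "10", "11", "12" encode to hex 30, 31, 3130, 3131, 3132, so
        // ascending ByteArray order matches ascending String order for these values.
        for (String value : new String[]{"0", "1", "10", "11", "12"}) {
          StringBuilder hex = new StringBuilder();
          for (byte b : value.getBytes(StandardCharsets.UTF_8)) {
            hex.append(String.format("%02x", b));
          }
          System.out.println(value + " -> " + hex);
        }
      }
    }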
 
@@ -311,22 +541,21 @@ public class DistinctQueriesTest extends BaseQueriesTest {
    * Test DISTINCT query within a single segment.
    * <p>The following query types are tested:
    * <ul>
-   *   <li>Selecting all columns</li>
+   *   <li>Selecting all dictionary-encoded columns</li>
    *   <li>Selecting some columns with filter</li>
-   *   <li>Selecting some columns order by BYTES column</li>
+   *   <li>Selecting some columns order by raw BYTES column</li>
   *   <li>Selecting some columns with transform, filter, order-by and limit</li>
    *   <li>Selecting some columns with filter that does not match any record</li>
    * </ul>
    */
   @Test
-  public void testDistinctInnerSegment()
-      throws Exception {
+  public void testDistinctInnerSegment() {
     //@formatter:off
     testDistinctInnerSegmentHelper(new String[]{
         "SELECT DISTINCT(intColumn, longColumn, floatColumn, doubleColumn, stringColumn, bytesColumn) FROM testTable LIMIT 10000",
         "SELECT DISTINCT(stringColumn, bytesColumn, floatColumn) FROM testTable WHERE intColumn >= 60 LIMIT 10000",
-        "SELECT DISTINCT(intColumn, bytesColumn) FROM testTable ORDER BY bytesColumn LIMIT 5",
-        "SELECT DISTINCT(ADD ( intColumn,  floatColumn  ), stringColumn) FROM testTable WHERE longColumn < 60 ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10",
+        "SELECT DISTINCT(intColumn, rawBytesColumn) FROM testTable ORDER BY rawBytesColumn LIMIT 5",
+        "SELECT DISTINCT(ADD(intColumn, floatColumn), stringColumn) FROM testTable WHERE longColumn < 60 ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10",
         "SELECT DISTINCT(floatColumn, longColumn) FROM testTable WHERE stringColumn = 'a' ORDER BY longColumn LIMIT 10"
     }, true);
     //@formatter:on
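
The next test exercises the same expectations because a non-aggregation GROUP BY over exactly the selected columns is rewritten into a DISTINCT query. For example, these two queries (both taken verbatim from this file) are expected to produce identical results:

    String distinctQuery =
        "SELECT DISTINCT(intColumn, rawBytesColumn) FROM testTable ORDER BY rawBytesColumn LIMIT 5";
    String groupByEquivalent =
        "SELECT intColumn, rawBytesColumn FROM testTable GROUP BY intColumn, rawBytesColumn ORDER BY rawBytesColumn LIMIT 5";
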
@@ -336,22 +565,21 @@ public class DistinctQueriesTest extends BaseQueriesTest {
    * Test Non-Aggregation GroupBy query rewrite to Distinct query within a single segment.
    * <p>The following query types are tested:
    * <ul>
-   *   <li>Selecting all columns</li>
+   *   <li>Selecting all dictionary-encoded columns</li>
    *   <li>Selecting some columns with filter</li>
-   *   <li>Selecting some columns order by BYTES column</li>
+   *   <li>Selecting some columns order by raw BYTES column</li>
   *   <li>Selecting some columns with transform, filter, order-by and limit</li>
    *   <li>Selecting some columns with filter that does not match any record</li>
    * </ul>
    */
   @Test
-  public void testNonAggGroupByRewriteToDistinctInnerSegment()
-      throws Exception {
+  public void testNonAggGroupByRewriteToDistinctInnerSegment() {
     //@formatter:off
     testDistinctInnerSegmentHelper(new String[]{
         "SELECT intColumn, longColumn, floatColumn, doubleColumn, stringColumn, bytesColumn FROM testTable GROUP BY intColumn, longColumn, floatColumn, doubleColumn, stringColumn, bytesColumn LIMIT 10000",
         "SELECT stringColumn, bytesColumn, floatColumn FROM testTable WHERE intColumn >= 60 GROUP BY stringColumn, bytesColumn, floatColumn LIMIT 10000",
-        "SELECT intColumn, bytesColumn FROM testTable GROUP BY intColumn, bytesColumn ORDER BY bytesColumn LIMIT 5",
-        "SELECT ADD ( intColumn,  floatColumn  ), stringColumn FROM testTable WHERE longColumn < 60 GROUP BY ADD ( intColumn,  floatColumn  ), stringColumn ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10",
+        "SELECT intColumn, rawBytesColumn FROM testTable GROUP BY intColumn, rawBytesColumn ORDER BY rawBytesColumn LIMIT 5",
+        "SELECT ADD(intColumn, floatColumn), stringColumn FROM testTable WHERE longColumn < 60 GROUP BY ADD(intColumn, floatColumn), stringColumn ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10",
         "SELECT floatColumn, longColumn FROM testTable WHERE stringColumn = 'a' GROUP BY floatColumn, longColumn ORDER BY longColumn LIMIT 10"
     }, false);
     //@formatter:on
@@ -361,17 +589,12 @@ public class DistinctQueriesTest extends BaseQueriesTest {
    * Helper method to get the DistinctTable result for one single segment for the given query.
    */
   private DistinctTable getDistinctTableInnerSegment(String query, boolean isPql) {
-    AggregationOperator aggregationOperator;
-    if (isPql) {
-      aggregationOperator = getOperatorForPqlQuery(query);
-    } else {
-      aggregationOperator = getOperatorForSqlQuery(query);
-    }
-    List<Object> aggregationResult = aggregationOperator.nextBlock().getAggregationResult();
-    assertNotNull(aggregationResult);
-    assertEquals(aggregationResult.size(), 1);
-    assertTrue(aggregationResult.get(0) instanceof DistinctTable);
-    return (DistinctTable) aggregationResult.get(0);
+    DistinctOperator distinctOperator = isPql ? getOperatorForPqlQuery(query) : getOperatorForSqlQuery(query);
+    List<Object> operatorResult = distinctOperator.nextBlock().getAggregationResult();
+    assertNotNull(operatorResult);
+    assertEquals(operatorResult.size(), 1);
+    assertTrue(operatorResult.get(0) instanceof DistinctTable);
+    return (DistinctTable) operatorResult.get(0);
   }
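
A hypothetical use of this helper (the query string here is illustrative only; all other names come from this test class):

    DistinctTable distinctTable =
        getDistinctTableInnerSegment("SELECT DISTINCT(intColumn) FROM testTable LIMIT 10", /* isPql */ true);
    assertEquals(distinctTable.getDataSchema().getColumnNames(), new String[]{"intColumn"});
    assertTrue(distinctTable.size() <= 10);  // at most LIMIT distinct records are returned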
 
   /**
@@ -379,9 +602,9 @@ public class DistinctQueriesTest extends BaseQueriesTest {
    * <p>Both PQL and SQL format are tested.
    * <p>The following query types are tested:
    * <ul>
-   *   <li>Selecting all columns</li>
+   *   <li>Selecting all dictionary-encoded columns</li>
    *   <li>Selecting some columns with filter</li>
-   *   <li>Selecting some columns order by BYTES column</li>
+   *   <li>Selecting some columns order by raw BYTES column</li>
   *   <li>Selecting some columns with transform, filter, order-by and limit</li>
    *   <li>Selecting some columns with filter that does not match any record</li>
    *   <li>
@@ -394,285 +617,273 @@ public class DistinctQueriesTest extends BaseQueriesTest {
    *   </li>
    * </ul>
    */
-  private void testDistinctInterSegmentHelper(String[] pqlQueries, String[] sqlQueries)
-      throws Exception {
-    ImmutableSegment segment0 = createSegment(0, generateRecords(0));
-    ImmutableSegment segment1 = createSegment(1, generateRecords(1000));
-    _indexSegments = Arrays.asList(segment0, segment1);
-    try {
-      {
-        // Test selecting all columns
-        String pqlQuery = pqlQueries[0];
-        String sqlQuery = sqlQueries[0];
-
-        // Check data schema
-        BrokerResponseNative pqlResponse = getBrokerResponseForPqlQuery(pqlQuery);
-        SelectionResults selectionResults = pqlResponse.getSelectionResults();
-        assertNotNull(selectionResults);
-        assertEquals(selectionResults.getColumns(),
-            Arrays.asList("intColumn", "longColumn", "floatColumn", "doubleColumn", "stringColumn", "bytesColumn"));
-        BrokerResponseNative sqlResponse = getBrokerResponseForSqlQuery(sqlQuery);
-        ResultTable resultTable = sqlResponse.getResultTable();
-        assertNotNull(resultTable);
-        DataSchema dataSchema = resultTable.getDataSchema();
-        assertEquals(dataSchema.getColumnNames(),
-            new String[]{"intColumn", "longColumn", "floatColumn", "doubleColumn", "stringColumn", "bytesColumn"});
-        assertEquals(dataSchema.getColumnDataTypes(),
-            new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.FLOAT, ColumnDataType.DOUBLE, ColumnDataType.STRING, ColumnDataType.BYTES});
-
-        // Check values, where all 200 unique values should be returned
-        List<Serializable[]> pqlRows = selectionResults.getRows();
-        assertEquals(pqlRows.size(), 2 * NUM_UNIQUE_RECORDS_PER_SEGMENT);
-        List<Object[]> sqlRows = resultTable.getRows();
-        assertEquals(sqlRows.size(), 2 * NUM_UNIQUE_RECORDS_PER_SEGMENT);
-        Set<Integer> expectedValues = new HashSet<>();
-        for (int i = 0; i < NUM_UNIQUE_RECORDS_PER_SEGMENT; i++) {
+  private void testDistinctInterSegmentHelper(String[] pqlQueries, String[] sqlQueries) {
+    {
+      // Test selecting all columns
+      String pqlQuery = pqlQueries[0];
+      String sqlQuery = sqlQueries[0];
+
+      // Check data schema
+      BrokerResponseNative pqlResponse = getBrokerResponseForPqlQuery(pqlQuery);
+      SelectionResults selectionResults = pqlResponse.getSelectionResults();
+      assertNotNull(selectionResults);
+      assertEquals(selectionResults.getColumns(),
+          Arrays.asList("intColumn", "longColumn", "floatColumn", "doubleColumn", "stringColumn", "bytesColumn"));
+      BrokerResponseNative sqlResponse = getBrokerResponseForSqlQuery(sqlQuery);
+      ResultTable resultTable = sqlResponse.getResultTable();
+      assertNotNull(resultTable);
+      DataSchema dataSchema = resultTable.getDataSchema();
+      assertEquals(dataSchema.getColumnNames(),
+          new String[]{"intColumn", "longColumn", "floatColumn", "doubleColumn", "stringColumn", "bytesColumn"});
+      assertEquals(dataSchema.getColumnDataTypes(),
+          new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.LONG, ColumnDataType.FLOAT, ColumnDataType.DOUBLE, ColumnDataType.STRING, ColumnDataType.BYTES});
+
+      // Check values, where all 200 unique values should be returned
+      List<Serializable[]> pqlRows = selectionResults.getRows();
+      assertEquals(pqlRows.size(), 2 * NUM_UNIQUE_RECORDS_PER_SEGMENT);
+      List<Object[]> sqlRows = resultTable.getRows();
+      assertEquals(sqlRows.size(), 2 * NUM_UNIQUE_RECORDS_PER_SEGMENT);
+      Set<Integer> expectedValues = new HashSet<>();
+      for (int i = 0; i < NUM_UNIQUE_RECORDS_PER_SEGMENT; i++) {
+        expectedValues.add(i);
+        expectedValues.add(1000 + i);
+      }
+      Set<Integer> pqlValues = new HashSet<>();
+      for (Serializable[] row : pqlRows) {
+        int intValue = (int) row[0];
+        assertEquals(((Long) row[1]).intValue(), intValue);
+        assertEquals(((Float) row[2]).intValue(), intValue);
+        assertEquals(((Double) row[3]).intValue(), intValue);
+        assertEquals(Integer.parseInt((String) row[4]), intValue);
+        assertEquals(StringUtil.decodeUtf8(BytesUtils.toBytes((String) row[5])).trim(), row[4]);
+        pqlValues.add(intValue);
+      }
+      assertEquals(pqlValues, expectedValues);
+      Set<Integer> sqlValues = new HashSet<>();
+      for (Object[] row : sqlRows) {
+        int intValue = (int) row[0];
+        assertEquals(((Long) row[1]).intValue(), intValue);
+        assertEquals(((Float) row[2]).intValue(), intValue);
+        assertEquals(((Double) row[3]).intValue(), intValue);
+        assertEquals(Integer.parseInt((String) row[4]), intValue);
+        assertEquals(StringUtil.decodeUtf8(BytesUtils.toBytes((String) row[5])).trim(), row[4]);
+        sqlValues.add(intValue);
+      }
+      assertEquals(sqlValues, expectedValues);
+    }
+    {
+      // Test selecting some columns with filter
+      String pqlQuery = pqlQueries[1];
+      String sqlQuery = sqlQueries[1];
+
+      // Check data schema
+      BrokerResponseNative pqlResponse = getBrokerResponseForPqlQuery(pqlQuery);
+      SelectionResults selectionResults = pqlResponse.getSelectionResults();
+      assertNotNull(selectionResults);
+      assertEquals(selectionResults.getColumns(), Arrays.asList("stringColumn", "bytesColumn", "floatColumn"));
+      BrokerResponseNative sqlResponse = getBrokerResponseForSqlQuery(sqlQuery);
+      ResultTable resultTable = sqlResponse.getResultTable();
+      assertNotNull(resultTable);
+      DataSchema dataSchema = resultTable.getDataSchema();
+      assertEquals(dataSchema.getColumnNames(), new String[]{"stringColumn", "bytesColumn", "floatColumn"});
+      assertEquals(dataSchema.getColumnDataTypes(),
+          new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.BYTES, ColumnDataType.FLOAT});
+
+      // Check values, where 140 matched values should be returned
+      List<Serializable[]> pqlRows = selectionResults.getRows();
+      assertEquals(pqlRows.size(), 2 * NUM_UNIQUE_RECORDS_PER_SEGMENT - 60);
+      List<Object[]> sqlRows = resultTable.getRows();
+      assertEquals(sqlRows.size(), 2 * NUM_UNIQUE_RECORDS_PER_SEGMENT - 60);
+      Set<Integer> expectedValues = new HashSet<>();
+      for (int i = 0; i < NUM_UNIQUE_RECORDS_PER_SEGMENT; i++) {
+        if (i >= 60) {
           expectedValues.add(i);
-          expectedValues.add(1000 + i);
-        }
-        Set<Integer> pqlValues = new HashSet<>();
-        for (Serializable[] row : pqlRows) {
-          int intValue = (int) row[0];
-          assertEquals(((Long) row[1]).intValue(), intValue);
-          assertEquals(((Float) row[2]).intValue(), intValue);
-          assertEquals(((Double) row[3]).intValue(), intValue);
-          assertEquals(Integer.parseInt((String) row[4]), intValue);
-          assertEquals(StringUtil.decodeUtf8(BytesUtils.toBytes((String) row[5])), row[4]);
-          pqlValues.add(intValue);
-        }
-        assertEquals(pqlValues, expectedValues);
-        Set<Integer> sqlValues = new HashSet<>();
-        for (Object[] row : sqlRows) {
-          int intValue = (int) row[0];
-          assertEquals(((Long) row[1]).intValue(), intValue);
-          assertEquals(((Float) row[2]).intValue(), intValue);
-          assertEquals(((Double) row[3]).intValue(), intValue);
-          assertEquals(Integer.parseInt((String) row[4]), intValue);
-          assertEquals(StringUtil.decodeUtf8(BytesUtils.toBytes((String) row[5])), row[4]);
-          sqlValues.add(intValue);
-        }
-        assertEquals(sqlValues, expectedValues);
-      }
-      {
-        // Test selecting some columns with filter
-        String pqlQuery = pqlQueries[1];
-        String sqlQuery = sqlQueries[1];
-
-        // Check data schema
-        BrokerResponseNative pqlResponse = getBrokerResponseForPqlQuery(pqlQuery);
-        SelectionResults selectionResults = pqlResponse.getSelectionResults();
-        assertNotNull(selectionResults);
-        assertEquals(selectionResults.getColumns(), Arrays.asList("stringColumn", "bytesColumn", "floatColumn"));
-        BrokerResponseNative sqlResponse = getBrokerResponseForSqlQuery(sqlQuery);
-        ResultTable resultTable = sqlResponse.getResultTable();
-        assertNotNull(resultTable);
-        DataSchema dataSchema = resultTable.getDataSchema();
-        assertEquals(dataSchema.getColumnNames(), new String[]{"stringColumn", "bytesColumn", "floatColumn"});
-        assertEquals(dataSchema.getColumnDataTypes(),
-            new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.BYTES, ColumnDataType.FLOAT});
-
-        // Check values, where 140 matched values should be returned
-        List<Serializable[]> pqlRows = selectionResults.getRows();
-        assertEquals(pqlRows.size(), 2 * NUM_UNIQUE_RECORDS_PER_SEGMENT - 60);
-        List<Object[]> sqlRows = resultTable.getRows();
-        assertEquals(sqlRows.size(), 2 * NUM_UNIQUE_RECORDS_PER_SEGMENT - 60);
-        Set<Integer> expectedValues = new HashSet<>();
-        for (int i = 0; i < NUM_UNIQUE_RECORDS_PER_SEGMENT; i++) {
-          if (i >= 60) {
-            expectedValues.add(i);
-          }
-          expectedValues.add(1000 + i);
-        }
-        Set<Integer> pqlValues = new HashSet<>();
-        for (Serializable[] row : pqlRows) {
-          int intValue = Integer.parseInt((String) row[0]);
-          assertEquals(StringUtil.decodeUtf8(BytesUtils.toBytes((String) row[1])), row[0]);
-          assertEquals(((Float) row[2]).intValue(), intValue);
-          pqlValues.add(intValue);
-        }
-        assertEquals(pqlValues, expectedValues);
-        Set<Integer> sqlValues = new HashSet<>();
-        for (Object[] row : sqlRows) {
-          int intValue = Integer.parseInt((String) row[0]);
-          assertEquals(StringUtil.decodeUtf8(BytesUtils.toBytes((String) row[1])), row[0]);
-          assertEquals(((Float) row[2]).intValue(), intValue);
-          sqlValues.add(intValue);
-        }
-        assertEquals(sqlValues, expectedValues);
-      }
-      {
-        // Test selecting some columns order by BYTES column
-        String pqlQuery = pqlQueries[2];
-        String sqlQuery = sqlQueries[2];
-
-        // Check data schema
-        BrokerResponseNative pqlResponse = getBrokerResponseForPqlQuery(pqlQuery);
-        SelectionResults selectionResults = pqlResponse.getSelectionResults();
-        assertNotNull(selectionResults);
-        assertEquals(selectionResults.getColumns(), Arrays.asList("intColumn", "bytesColumn"));
-        BrokerResponseNative sqlResponse = getBrokerResponseForSqlQuery(sqlQuery);
-        ResultTable resultTable = sqlResponse.getResultTable();
-        assertNotNull(resultTable);
-        DataSchema dataSchema = resultTable.getDataSchema();
-        assertEquals(dataSchema.getColumnNames(), new String[]{"intColumn", "bytesColumn"});
-        assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.BYTES});
-
-        // Check values, where only 5 top values sorted in ByteArray format ascending order should be returned
-        List<Serializable[]> pqlRows = selectionResults.getRows();
-        assertEquals(pqlRows.size(), 5);
-        List<Object[]> sqlRows = resultTable.getRows();
-        assertEquals(sqlRows.size(), 5);
-        // ByteArray of "30", "31", "3130", "31303030", "31303031" (same as String order because all digits can be
-        // encoded with a single byte)
-        int[] expectedValues = new int[]{0, 1, 10, 1000, 1001};
-        for (int i = 0; i < 5; i++) {
-          Serializable[] row = pqlRows.get(i);
-          int intValue = (int) row[0];
-          assertEquals(intValue, expectedValues[i]);
-          assertEquals(Integer.parseInt(StringUtil.decodeUtf8(BytesUtils.toBytes((String) row[1]))), intValue);
-        }
-        for (int i = 0; i < 5; i++) {
-          Object[] row = sqlRows.get(i);
-          int intValue = (int) row[0];
-          assertEquals(intValue, expectedValues[i]);
-          assertEquals(Integer.parseInt(StringUtil.decodeUtf8(BytesUtils.toBytes((String) row[1]))), intValue);
         }
+        expectedValues.add(1000 + i);
       }
-      {
-        // Test selecting some columns with transform, filter, order-by and limit
-        String pqlQuery = pqlQueries[3];
-        String sqlQuery = sqlQueries[3];
-
-        // Check data schema
-        BrokerResponseNative pqlResponse = getBrokerResponseForPqlQuery(pqlQuery);
-        SelectionResults selectionResults = pqlResponse.getSelectionResults();
-        assertNotNull(selectionResults);
-        assertEquals(selectionResults.getColumns(), Arrays.asList("add(intColumn,floatColumn)", "stringColumn"));
-        BrokerResponseNative sqlResponse = getBrokerResponseForSqlQuery(sqlQuery);
-        ResultTable resultTable = sqlResponse.getResultTable();
-        assertNotNull(resultTable);
-        DataSchema dataSchema = resultTable.getDataSchema();
-        assertEquals(dataSchema.getColumnNames(), new String[]{"add(intColumn,floatColumn)", "stringColumn"});
-        assertEquals(dataSchema.getColumnDataTypes(),
-            new ColumnDataType[]{ColumnDataType.DOUBLE, ColumnDataType.STRING});
-
-        // Check values, where only 10 top values sorted in string format descending order should be returned
-        List<Serializable[]> pqlRows = selectionResults.getRows();
-        assertEquals(pqlRows.size(), 10);
-        List<Object[]> sqlRows = resultTable.getRows();
-        assertEquals(sqlRows.size(), 10);
-        int[] expectedValues = new int[]{9, 8, 7, 6, 59, 58, 57, 56, 55, 54};
-        for (int i = 0; i < 10; i++) {
-          Serializable[] row = pqlRows.get(i);
-          int intValue = ((Double) row[0]).intValue() / 2;
-          assertEquals(intValue, expectedValues[i]);
-          assertEquals(Integer.parseInt((String) row[1]), intValue);
-        }
-        for (int i = 0; i < 10; i++) {
-          Object[] row = sqlRows.get(i);
-          int intValue = ((Double) row[0]).intValue() / 2;
-          assertEquals(intValue, expectedValues[i]);
-          assertEquals(Integer.parseInt((String) row[1]), intValue);
-        }
+      Set<Integer> pqlValues = new HashSet<>();
+      for (Serializable[] row : pqlRows) {
+        int intValue = Integer.parseInt((String) row[0]);
+        assertEquals(StringUtil.decodeUtf8(BytesUtils.toBytes((String) row[1])).trim(), row[0]);
+        assertEquals(((Float) row[2]).intValue(), intValue);
+        pqlValues.add(intValue);
       }
-      {
-        // Test selecting some columns with filter that does not match any record
-        String pqlQuery = pqlQueries[4];
-        String sqlQuery = sqlQueries[4];
-
-        // Check data schema, where data type should be STRING for all columns
-        BrokerResponseNative pqlResponse = getBrokerResponseForPqlQuery(pqlQuery);
-        SelectionResults selectionResults = pqlResponse.getSelectionResults();
-        assertNotNull(selectionResults);
-        assertEquals(selectionResults.getColumns(), Arrays.asList("floatColumn", "longColumn"));
-        BrokerResponseNative sqlResponse = getBrokerResponseForSqlQuery(sqlQuery);
-        ResultTable resultTable = sqlResponse.getResultTable();
-        assertNotNull(resultTable);
-        DataSchema dataSchema = resultTable.getDataSchema();
-        assertEquals(dataSchema.getColumnNames(), new String[]{"floatColumn", "longColumn"});
-        assertEquals(dataSchema.getColumnDataTypes(),
-            new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.STRING});
-
-        // Check values, where no record should be returned
-        List<Serializable[]> pqlRows = selectionResults.getRows();
-        assertTrue(pqlRows.isEmpty());
-        List<Object[]> sqlRows = resultTable.getRows();
-        assertTrue(sqlRows.isEmpty());
-      }
-      {
-        // Test selecting some columns with filter that does not match any record in one segment but matches some
-        // records in the other segment
-        String pqlQuery = pqlQueries[5];
-        String sqlQuery = sqlQueries[5];
-
-        // Check data schema
-        BrokerResponseNative pqlResponse = getBrokerResponseForPqlQuery(pqlQuery);
-        SelectionResults selectionResults = pqlResponse.getSelectionResults();
-        assertNotNull(selectionResults);
-        assertEquals(selectionResults.getColumns(), Collections.singletonList("intColumn"));
-        BrokerResponseNative sqlResponse = getBrokerResponseForSqlQuery(sqlQuery);
-        ResultTable resultTable = sqlResponse.getResultTable();
-        assertNotNull(resultTable);
-        DataSchema dataSchema = resultTable.getDataSchema();
-        assertEquals(dataSchema.getColumnNames(), new String[]{"intColumn"});
-        assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.INT});
-
-        // Check values, where only 5 top values sorted in int format ascending order should be returned
-        List<Serializable[]> pqlRows = selectionResults.getRows();
-        assertEquals(pqlRows.size(), 5);
-        List<Object[]> sqlRows = resultTable.getRows();
-        assertEquals(sqlRows.size(), 5);
-        int[] expectedValues = new int[]{1000, 1001, 1002, 1003, 1004};
-        for (int i = 0; i < 5; i++) {
-          Serializable[] row = pqlRows.get(i);
-          assertEquals((int) row[0], expectedValues[i]);
-        }
-        for (int i = 0; i < 5; i++) {
-          Object[] row = sqlRows.get(i);
-          assertEquals((int) row[0], expectedValues[i]);
-        }
+      assertEquals(pqlValues, expectedValues);
+      Set<Integer> sqlValues = new HashSet<>();
+      for (Object[] row : sqlRows) {
+        int intValue = Integer.parseInt((String) row[0]);
+        assertEquals(StringUtil.decodeUtf8(BytesUtils.toBytes((String) row[1])).trim(), row[0]);
+        assertEquals(((Float) row[2]).intValue(), intValue);
+        sqlValues.add(intValue);
       }
-      {
-        // Test electing some columns with filter that does not match any record in one server but matches some records
-        // in the other server
-        String pqlQuery = pqlQueries[6];
-        String sqlQuery = sqlQueries[6];
-
-        QueryContext pqlQueryContext = QueryContextConverterUtils.getQueryContextFromPQL(pqlQuery);
-        BrokerResponseNative pqlResponse = queryServersWithDifferentSegments(pqlQueryContext, segment0, segment1);
-        BrokerRequest sqlBrokerRequest = SQL_COMPILER.compileToBrokerRequest(sqlQuery);
-        sqlBrokerRequest.setQueryOptions(Collections.singletonMap("responseFormat", "sql"));
-        QueryContext sqlQueryContext = BrokerRequestToQueryContextConverter.convert(sqlBrokerRequest);
-        BrokerResponseNative sqlResponse = queryServersWithDifferentSegments(sqlQueryContext, segment0, segment1);
-
-        // Check data schema
-        SelectionResults selectionResults = pqlResponse.getSelectionResults();
-        assertNotNull(selectionResults);
-        assertEquals(selectionResults.getColumns(), Collections.singletonList("longColumn"));
-        ResultTable resultTable = sqlResponse.getResultTable();
-        assertNotNull(resultTable);
-        DataSchema dataSchema = resultTable.getDataSchema();
-        assertEquals(dataSchema.getColumnNames(), new String[]{"longColumn"});
-        assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.LONG});
-
-        // Check values, where only 5 top values sorted in long format descending order should be returned
-        List<Serializable[]> pqlRows = selectionResults.getRows();
-        assertEquals(pqlRows.size(), 5);
-        List<Object[]> sqlRows = resultTable.getRows();
-        assertEquals(sqlRows.size(), 5);
-        int[] expectedValues = new int[]{99, 98, 97, 96, 95};
-        for (int i = 0; i < 5; i++) {
-          Serializable[] row = pqlRows.get(i);
-          assertEquals(((Long) row[0]).intValue(), expectedValues[i]);
-        }
-        for (int i = 0; i < 5; i++) {
-          Object[] row = sqlRows.get(i);
-          assertEquals(((Long) row[0]).intValue(), expectedValues[i]);
-        }
+      assertEquals(sqlValues, expectedValues);
+    }
+    {
+      // Test selecting some columns order by BYTES column
+      String pqlQuery = pqlQueries[2];
+      String sqlQuery = sqlQueries[2];
+
+      // Check data schema
+      BrokerResponseNative pqlResponse = getBrokerResponseForPqlQuery(pqlQuery);
+      SelectionResults selectionResults = pqlResponse.getSelectionResults();
+      assertNotNull(selectionResults);
+      assertEquals(selectionResults.getColumns(), Arrays.asList("intColumn", "rawBytesColumn"));
+      BrokerResponseNative sqlResponse = getBrokerResponseForSqlQuery(sqlQuery);
+      ResultTable resultTable = sqlResponse.getResultTable();
+      assertNotNull(resultTable);
+      DataSchema dataSchema = resultTable.getDataSchema();
+      assertEquals(dataSchema.getColumnNames(), new String[]{"intColumn", "rawBytesColumn"});
+      assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.INT, ColumnDataType.BYTES});
+
+      // Check values, where only 5 top values sorted in ByteArray format ascending order should be returned
+      List<Serializable[]> pqlRows = selectionResults.getRows();
+      assertEquals(pqlRows.size(), 5);
+      List<Object[]> sqlRows = resultTable.getRows();
+      assertEquals(sqlRows.size(), 5);
+      // ByteArray of "30", "31", "3130", "31303030", "31303031" (same as String order because all digits can be
+      // encoded with a single byte)
+      int[] expectedValues = new int[]{0, 1, 10, 1000, 1001};
+      for (int i = 0; i < 5; i++) {
+        Serializable[] row = pqlRows.get(i);
+        int intValue = (int) row[0];
+        assertEquals(intValue, expectedValues[i]);
+        assertEquals(Integer.parseInt(StringUtil.decodeUtf8(BytesUtils.toBytes((String) row[1]))), intValue);
+      }
+      for (int i = 0; i < 5; i++) {
+        Object[] row = sqlRows.get(i);
+        int intValue = (int) row[0];
+        assertEquals(intValue, expectedValues[i]);
+        assertEquals(Integer.parseInt(StringUtil.decodeUtf8(BytesUtils.toBytes((String) row[1]))), intValue);
+      }
+    }
+    {
+      // Test selecting some columns with transform, filter, order-by and limit
+      String pqlQuery = pqlQueries[3];
+      String sqlQuery = sqlQueries[3];
+
+      // Check data schema
+      BrokerResponseNative pqlResponse = getBrokerResponseForPqlQuery(pqlQuery);
+      SelectionResults selectionResults = pqlResponse.getSelectionResults();
+      assertNotNull(selectionResults);
+      assertEquals(selectionResults.getColumns(), Arrays.asList("add(intColumn,floatColumn)", "stringColumn"));
+      BrokerResponseNative sqlResponse = getBrokerResponseForSqlQuery(sqlQuery);
+      ResultTable resultTable = sqlResponse.getResultTable();
+      assertNotNull(resultTable);
+      DataSchema dataSchema = resultTable.getDataSchema();
+      assertEquals(dataSchema.getColumnNames(), new String[]{"add(intColumn,floatColumn)", "stringColumn"});
+      assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.DOUBLE, ColumnDataType.STRING});
+
+      // Check values, where only 10 top values sorted in string format descending order should be returned
+      List<Serializable[]> pqlRows = selectionResults.getRows();
+      assertEquals(pqlRows.size(), 10);
+      List<Object[]> sqlRows = resultTable.getRows();
+      assertEquals(sqlRows.size(), 10);
+      int[] expectedValues = new int[]{9, 8, 7, 6, 59, 58, 57, 56, 55, 54};
+      for (int i = 0; i < 10; i++) {
+        Serializable[] row = pqlRows.get(i);
+        int intValue = ((Double) row[0]).intValue() / 2;
+        assertEquals(intValue, expectedValues[i]);
+        assertEquals(Integer.parseInt((String) row[1]), intValue);
+      }
+      for (int i = 0; i < 10; i++) {
+        Object[] row = sqlRows.get(i);
+        int intValue = ((Double) row[0]).intValue() / 2;
+        assertEquals(intValue, expectedValues[i]);
+        assertEquals(Integer.parseInt((String) row[1]), intValue);
+      }
+    }
+    {
+      // Test selecting some columns with filter that does not match any record
+      String pqlQuery = pqlQueries[4];
+      String sqlQuery = sqlQueries[4];
+
+      // Check data schema, where data type should be STRING for all columns
+      BrokerResponseNative pqlResponse = getBrokerResponseForPqlQuery(pqlQuery);
+      SelectionResults selectionResults = pqlResponse.getSelectionResults();
+      assertNotNull(selectionResults);
+      assertEquals(selectionResults.getColumns(), Arrays.asList("floatColumn", "longColumn"));
+      BrokerResponseNative sqlResponse = getBrokerResponseForSqlQuery(sqlQuery);
+      ResultTable resultTable = sqlResponse.getResultTable();
+      assertNotNull(resultTable);
+      DataSchema dataSchema = resultTable.getDataSchema();
+      assertEquals(dataSchema.getColumnNames(), new String[]{"floatColumn", "longColumn"});
+      assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.STRING, ColumnDataType.STRING});
+
+      // Check values, where no record should be returned
+      List<Serializable[]> pqlRows = selectionResults.getRows();
+      assertTrue(pqlRows.isEmpty());
+      List<Object[]> sqlRows = resultTable.getRows();
+      assertTrue(sqlRows.isEmpty());
+    }
+    {
+      // Test selecting some columns with filter that does not match any record in one segment but matches some
+      // records in the other segment
+      String pqlQuery = pqlQueries[5];
+      String sqlQuery = sqlQueries[5];
+
+      // Check data schema
+      BrokerResponseNative pqlResponse = getBrokerResponseForPqlQuery(pqlQuery);
+      SelectionResults selectionResults = pqlResponse.getSelectionResults();
+      assertNotNull(selectionResults);
+      assertEquals(selectionResults.getColumns(), Collections.singletonList("intColumn"));
+      BrokerResponseNative sqlResponse = getBrokerResponseForSqlQuery(sqlQuery);
+      ResultTable resultTable = sqlResponse.getResultTable();
+      assertNotNull(resultTable);
+      DataSchema dataSchema = resultTable.getDataSchema();
+      assertEquals(dataSchema.getColumnNames(), new String[]{"intColumn"});
+      assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.INT});
+
+      // Check values, where only 5 top values sorted in int format ascending order should be returned
+      List<Serializable[]> pqlRows = selectionResults.getRows();
+      assertEquals(pqlRows.size(), 5);
+      List<Object[]> sqlRows = resultTable.getRows();
+      assertEquals(sqlRows.size(), 5);
+      int[] expectedValues = new int[]{1000, 1001, 1002, 1003, 1004};
+      for (int i = 0; i < 5; i++) {
+        Serializable[] row = pqlRows.get(i);
+        assertEquals((int) row[0], expectedValues[i]);
+      }
+      for (int i = 0; i < 5; i++) {
+        Object[] row = sqlRows.get(i);
+        assertEquals((int) row[0], expectedValues[i]);
+      }
+    }
+    {
+      // Test selecting some columns with filter that does not match any record in one server but matches some records
+      // in the other server
+      String pqlQuery = pqlQueries[6];
+      String sqlQuery = sqlQueries[6];
+
+      QueryContext pqlQueryContext = QueryContextConverterUtils.getQueryContextFromPQL(pqlQuery);
+      BrokerResponseNative pqlResponse = queryServersWithDifferentSegments(pqlQueryContext);
+      BrokerRequest sqlBrokerRequest = SQL_COMPILER.compileToBrokerRequest(sqlQuery);
+      sqlBrokerRequest.setQueryOptions(Collections.singletonMap("responseFormat", "sql"));
+      QueryContext sqlQueryContext = BrokerRequestToQueryContextConverter.convert(sqlBrokerRequest);
+      BrokerResponseNative sqlResponse = queryServersWithDifferentSegments(sqlQueryContext);
+
+      // Check data schema
+      SelectionResults selectionResults = pqlResponse.getSelectionResults();
+      assertNotNull(selectionResults);
+      assertEquals(selectionResults.getColumns(), Collections.singletonList("longColumn"));
+      ResultTable resultTable = sqlResponse.getResultTable();
+      assertNotNull(resultTable);
+      DataSchema dataSchema = resultTable.getDataSchema();
+      assertEquals(dataSchema.getColumnNames(), new String[]{"longColumn"});
+      assertEquals(dataSchema.getColumnDataTypes(), new ColumnDataType[]{ColumnDataType.LONG});
+
+      // Check values, where only 5 top values sorted in long format descending order should be returned
+      List<Serializable[]> pqlRows = selectionResults.getRows();
+      assertEquals(pqlRows.size(), 5);
+      List<Object[]> sqlRows = resultTable.getRows();
+      assertEquals(sqlRows.size(), 5);
+      int[] expectedValues = new int[]{99, 98, 97, 96, 95};
+      for (int i = 0; i < 5; i++) {
+        Serializable[] row = pqlRows.get(i);
+        assertEquals(((Long) row[0]).intValue(), expectedValues[i]);
       }
-    } finally {
-      for (IndexSegment indexSegment : _indexSegments) {
-        indexSegment.destroy();
+      for (int i = 0; i < 5; i++) {
+        Object[] row = sqlRows.get(i);
+        assertEquals(((Long) row[0]).intValue(), expectedValues[i]);
       }
     }
   }
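
The expected values in the transform and order-by checks above follow from the generated data: for a record with key i, intColumn == i and floatColumn == i (per the equality assertions earlier in this helper), so ADD(intColumn, floatColumn) evaluates to 2 * i and halving the returned double recovers the key. Sorting stringColumn descending over the matched keys 0..59 then yields "9", "8", "7", "6", "59", "58", ..., which is exactly the expectedValues array. A sketch of the arithmetic (this invariant is inferred from the test, not a documented guarantee):

    int i = 59;                        // an example record key
    double added = i + (float) i;      // ADD(intColumn, floatColumn) == 2.0 * i
    int recovered = (int) added / 2;   // == 59, hence the "/ 2" in the value checks
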
@@ -682,9 +893,9 @@ public class DistinctQueriesTest extends BaseQueriesTest {
    * <p>Both PQL and SQL format are tested.
    * <p>The following query types are tested:
    * <ul>
-   *   <li>Selecting all columns</li>
+   *   <li>Selecting all dictionary-encoded columns</li>
    *   <li>Selecting some columns with filter</li>
-   *   <li>Selecting some columns order by BYTES column</li>
+   *   <li>Selecting some columns order by raw BYTES column</li>
   *   <li>Selecting some columns with transform, filter, order-by and limit</li>
    *   <li>Selecting some columns with filter that does not match any record</li>
    *   <li>
@@ -698,13 +909,12 @@ public class DistinctQueriesTest extends BaseQueriesTest {
    * </ul>
    */
   @Test
-  public void testDistinctInterSegment()
-      throws Exception {
+  public void testDistinctInterSegment() {
     //@formatter:off
     String[] pqlQueries = new String[] {
         "SELECT DISTINCT(intColumn, longColumn, floatColumn, doubleColumn, stringColumn, bytesColumn) FROM testTable LIMIT 10000",
         "SELECT DISTINCT(stringColumn, bytesColumn, floatColumn) FROM testTable WHERE intColumn >= 60 LIMIT 10000",
-        "SELECT DISTINCT(intColumn, bytesColumn) FROM testTable ORDER BY bytesColumn LIMIT 5",
+        "SELECT DISTINCT(intColumn, rawBytesColumn) FROM testTable ORDER BY rawBytesColumn LIMIT 5",
         "SELECT DISTINCT(ADD(intColumn, floatColumn), stringColumn) FROM testTable WHERE longColumn < 60 ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10",
         "SELECT DISTINCT(floatColumn, longColumn) FROM testTable WHERE stringColumn = 'a' ORDER BY longColumn LIMIT 10",
         "SELECT DISTINCT(intColumn) FROM testTable WHERE floatColumn > 200 ORDER BY intColumn ASC LIMIT 5",
@@ -713,7 +923,7 @@ public class DistinctQueriesTest extends BaseQueriesTest {
     String[] sqlQueries = new String[] {
         "SELECT DISTINCT intColumn, longColumn, floatColumn, doubleColumn, stringColumn, bytesColumn FROM testTable LIMIT 10000",
         "SELECT DISTINCT stringColumn, bytesColumn, floatColumn FROM testTable WHERE intColumn >= 60 LIMIT 10000",
-        "SELECT DISTINCT intColumn, bytesColumn FROM testTable ORDER BY bytesColumn LIMIT 5",
+        "SELECT DISTINCT intColumn, rawBytesColumn FROM testTable ORDER BY rawBytesColumn LIMIT 5",
         "SELECT DISTINCT ADD(intColumn, floatColumn), stringColumn FROM testTable WHERE longColumn < 60 ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10",
         "SELECT DISTINCT floatColumn, longColumn FROM testTable WHERE stringColumn = 'a' ORDER BY longColumn LIMIT 10",
         "SELECT DISTINCT intColumn FROM testTable WHERE floatColumn > 200 ORDER BY intColumn ASC LIMIT 5",
@@ -728,9 +938,9 @@ public class DistinctQueriesTest extends BaseQueriesTest {
    * <p>Only SQL format are tested.
    * <p>The following query types are tested:
    * <ul>
-   *   <li>Selecting all columns</li>
+   *   <li>Selecting all dictionary-encoded columns</li>
    *   <li>Selecting some columns with filter</li>
-   *   <li>Selecting some columns order by BYTES column</li>
+   *   <li>Selecting some columns order by raw BYTES column</li>
   *   <li>Selecting some columns with transform, filter, order-by and limit</li>
    *   <li>Selecting some columns with filter that does not match any record</li>
    *   <li>
@@ -744,13 +954,12 @@ public class DistinctQueriesTest extends BaseQueriesTest {
    * </ul>
    */
   @Test
-  public void testNonAggGroupByRewriteToDistinctInterSegment()
-      throws Exception {
+  public void testNonAggGroupByRewriteToDistinctInterSegment() {
     //@formatter:off
     String[] pqlQueries = new String[] {
         "SELECT DISTINCT(intColumn, longColumn, floatColumn, doubleColumn, stringColumn, bytesColumn) FROM testTable LIMIT 10000",
         "SELECT DISTINCT(stringColumn, bytesColumn, floatColumn) FROM testTable WHERE intColumn >= 60 LIMIT 10000",
-        "SELECT DISTINCT(intColumn, bytesColumn) FROM testTable ORDER BY bytesColumn LIMIT 5",
+        "SELECT DISTINCT(intColumn, rawBytesColumn) FROM testTable ORDER BY rawBytesColumn LIMIT 5",
         "SELECT DISTINCT(ADD(intColumn, floatColumn), stringColumn) FROM testTable WHERE longColumn < 60 ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10",
         "SELECT DISTINCT(floatColumn, longColumn) FROM testTable WHERE stringColumn = 'a' ORDER BY longColumn LIMIT 10",
         "SELECT DISTINCT(intColumn) FROM testTable WHERE floatColumn > 200 ORDER BY intColumn ASC LIMIT 5",
@@ -759,7 +968,7 @@ public class DistinctQueriesTest extends BaseQueriesTest {
     String[] sqlQueries = new String[] {
         "SELECT intColumn, longColumn, floatColumn, doubleColumn, stringColumn, bytesColumn FROM testTable GROUP BY intColumn, longColumn, floatColumn, doubleColumn, stringColumn, bytesColumn LIMIT 10000",
         "SELECT stringColumn, bytesColumn, floatColumn FROM testTable WHERE intColumn >= 60 GROUP BY stringColumn, bytesColumn, floatColumn LIMIT 10000",
-        "SELECT intColumn, bytesColumn FROM testTable GROUP BY intColumn, bytesColumn ORDER BY bytesColumn LIMIT 5",
+        "SELECT intColumn, rawBytesColumn FROM testTable GROUP BY intColumn, rawBytesColumn ORDER BY rawBytesColumn LIMIT 5",
         "SELECT ADD(intColumn, floatColumn), stringColumn FROM testTable WHERE longColumn < 60 GROUP BY ADD(intColumn, floatColumn), stringColumn ORDER BY stringColumn DESC, ADD(intColumn, floatColumn) ASC LIMIT 10",
         "SELECT floatColumn, longColumn FROM testTable WHERE stringColumn = 'a' GROUP BY floatColumn, longColumn ORDER BY longColumn LIMIT 10",
         "SELECT intColumn FROM testTable WHERE floatColumn > 200 GROUP BY intColumn ORDER BY intColumn ASC LIMIT 5",
@@ -773,8 +982,10 @@ public class DistinctQueriesTest extends BaseQueriesTest {
    * Helper method to query 2 servers with different segments. Server0 will have 2 copies of segment0; Server1 will have
    * 2 copies of segment1.
    */
-  private BrokerResponseNative queryServersWithDifferentSegments(QueryContext queryContext, ImmutableSegment segment0,
-      ImmutableSegment segment1) {
+  private BrokerResponseNative queryServersWithDifferentSegments(QueryContext queryContext) {
+    IndexSegment segment0 = _indexSegments.get(0);
+    IndexSegment segment1 = _indexSegments.get(1);
+
     // Server side
     DataTable instanceResponse0 = PLAN_MAKER
         .makeInstancePlan(Arrays.asList(segment0, segment0), queryContext, EXECUTOR_SERVICE,
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationSingleValueQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationSingleValueQueriesTest.java
index 0d6a11e..ab8ea1e 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationSingleValueQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/InnerSegmentAggregationSingleValueQueriesTest.java
@@ -18,17 +18,20 @@
  */
 package org.apache.pinot.queries;
 
-import java.util.Iterator;
 import java.util.List;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.core.data.table.Record;
 import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
 import org.apache.pinot.core.operator.query.AggregationGroupByOperator;
 import org.apache.pinot.core.operator.query.AggregationOperator;
-import org.apache.pinot.core.query.aggregation.function.customobject.DistinctTable;
+import org.apache.pinot.core.operator.query.DistinctOperator;
+import org.apache.pinot.core.query.distinct.DistinctTable;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
 
 @SuppressWarnings("ConstantConditions")
 public class InnerSegmentAggregationSingleValueQueriesTest extends BaseSingleValueQueriesTest {
@@ -175,26 +178,23 @@ public class InnerSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
   @Test
   public void testSingleColumnDistinct() {
     String query = "SELECT DISTINCT(column1) FROM testTable LIMIT 1000000";
-    AggregationOperator aggregationOperator = getOperatorForPqlQuery(query);
-    IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
+    DistinctOperator distinctOperator = getOperatorForPqlQuery(query);
+    IntermediateResultsBlock resultsBlock = distinctOperator.nextBlock();
     List<Object> operatorResult = resultsBlock.getAggregationResult();
 
-    Assert.assertEquals(operatorResult.size(), 1);
-    Assert.assertTrue(operatorResult.get(0) instanceof DistinctTable);
+    assertEquals(operatorResult.size(), 1);
+    assertTrue(operatorResult.get(0) instanceof DistinctTable);
 
     DistinctTable distinctTable = (DistinctTable) operatorResult.get(0);
-    Assert.assertEquals(distinctTable.size(), 6582);
+    assertEquals(distinctTable.size(), 6582);
 
     DataSchema dataSchema = distinctTable.getDataSchema();
-    Assert.assertEquals(dataSchema.getColumnNames(), new String[]{"column1"});
-    Assert
-        .assertEquals(dataSchema.getColumnDataTypes(), new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT});
+    assertEquals(dataSchema.getColumnNames(), new String[]{"column1"});
+    assertEquals(dataSchema.getColumnDataTypes(), new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT});
 
-    Iterator<Record> iterator = distinctTable.getFinalResult();
-    while (iterator.hasNext()) {
-      Record record = iterator.next();
+    for (Record record : distinctTable.getRecords()) {
       Assert.assertNotNull(record);
-      Assert.assertEquals(record.getValues().length, 1);
+      assertEquals(record.getValues().length, 1);
     }
   }
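
A sketch of extracting the typed values from this single-column result (hypothetical code; the INT values are assumed to arrive boxed as Integer, matching the casts in DistinctQueriesTest):

    Set<Integer> values = new HashSet<>();
    for (Record record : distinctTable.getRecords()) {
      values.add((Integer) record.getValues()[0]);  // one INT column per the data schema
    }
    assertEquals(values.size(), 6582);              // every collected record is distinct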
 
@@ -207,26 +207,24 @@ public class InnerSegmentAggregationSingleValueQueriesTest extends BaseSingleVal
   @Test
   public void testMultiColumnDistinct() {
     String query = "SELECT DISTINCT(column1, column3) FROM testTable LIMIT 1000000";
-    AggregationOperator aggregationOperator = getOperatorForPqlQuery(query);
-    IntermediateResultsBlock resultsBlock = aggregationOperator.nextBlock();
+    DistinctOperator distinctOperator = getOperatorForPqlQuery(query);
+    IntermediateResultsBlock resultsBlock = distinctOperator.nextBlock();
     List<Object> operatorResult = resultsBlock.getAggregationResult();
 
-    Assert.assertEquals(operatorResult.size(), 1);
-    Assert.assertTrue(operatorResult.get(0) instanceof DistinctTable);
+    assertEquals(operatorResult.size(), 1);
+    assertTrue(operatorResult.get(0) instanceof DistinctTable);
 
     DistinctTable distinctTable = (DistinctTable) operatorResult.get(0);
-    Assert.assertEquals(distinctTable.size(), 21968);
+    assertEquals(distinctTable.size(), 21968);
 
     DataSchema dataSchema = distinctTable.getDataSchema();
-    Assert.assertEquals(dataSchema.getColumnNames(), new String[]{"column1", "column3"});
-    Assert.assertEquals(dataSchema.getColumnDataTypes(),
+    assertEquals(dataSchema.getColumnNames(), new String[]{"column1", "column3"});
+    assertEquals(dataSchema.getColumnDataTypes(),
         new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.INT});
 
-    Iterator<Record> iterator = distinctTable.getFinalResult();
-    while (iterator.hasNext()) {
-      Record record = iterator.next();
+    for (Record record : distinctTable.getRecords()) {
       Assert.assertNotNull(record);
-      Assert.assertEquals(record.getValues().length, 2);
+      assertEquals(record.getValues().length, 2);
     }
   }
 }
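
As the assertions across these tests suggest (an inference from this change, not documented API behavior), getRecords() exposes the collected distinct records in no particular order, while getFinalResult() iterates the sorted records when isMainTable() is true for order-by queries. A consumption sketch under that assumption:

    if (distinctTable.isMainTable()) {
      // Order-by case: iterate records sorted per the ORDER BY clause.
      Iterator<Record> iterator = distinctTable.getFinalResult();
      while (iterator.hasNext()) {
        Record record = iterator.next();
        // consume record in sorted order
      }
    } else {
      // Distinct-only case: iteration order is unspecified.
      for (Record record : distinctTable.getRecords()) {
        // consume record
      }
    }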


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org