Posted to commits@pinot.apache.org by ne...@apache.org on 2019/11/26 16:49:41 UTC

[incubator-pinot] branch master updated: Split BrokerReduceService code into several DataTableReducers (#4851)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5c02b14  Split BrokerReduceService code into several DataTableReducers (#4851)
5c02b14 is described below

commit 5c02b14c5a1188ca0e72e3b636231dcb5ff831d1
Author: Neha Pawar <np...@linkedin.com>
AuthorDate: Tue Nov 26 08:49:32 2019 -0800

    Split BrokerReduceService code into several DataTableReducers (#4851)
    
    Splits the code in BrokerReduceService that sets results into BrokerResponseNative into multiple DataTableReducers. The work in this PR is mostly moving methods from BrokerReduceService into the appropriate reducer, with no logic changes. The only logic added is the handling of an empty dataTableMap in each reducer; previously this was done upfront, before the code path was selected based on the query.
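    
    For illustration, the dispatch in the new ResultReducerFactory (added in this
    commit but not quoted in the diff below) likely looks roughly like the sketch
    that follows. The SelectionDataTableReducer constructor shown here is an
    assumption; the other constructors match the added files:
    
        package org.apache.pinot.core.query.reduce;
        
        import org.apache.pinot.common.function.AggregationFunctionType;
        import org.apache.pinot.common.request.BrokerRequest;
        import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
        import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
        import org.apache.pinot.core.util.QueryOptions;
        
        public final class ResultReducerFactory {
          public static DataTableReducer getResultReducer(BrokerRequest brokerRequest) {
            QueryOptions queryOptions = new QueryOptions(brokerRequest.getQueryOptions());
            if (brokerRequest.isSetSelections()) {
              // Selection query
              return new SelectionDataTableReducer(brokerRequest, queryOptions); // assumed signature
            }
            // Aggregation query: DISTINCT runs as a single aggregation function in the engine
            AggregationFunction[] aggregationFunctions =
                AggregationFunctionUtils.getAggregationFunctions(brokerRequest);
            if (aggregationFunctions.length == 1
                && aggregationFunctions[0].getType() == AggregationFunctionType.DISTINCT) {
              return new DistinctDataTableReducer(brokerRequest, aggregationFunctions[0], queryOptions);
            }
            if (brokerRequest.isSetGroupBy()) {
              // Aggregation group-by query
              return new GroupByDataTableReducer(brokerRequest, aggregationFunctions, queryOptions);
            }
            // Aggregation-only query
            return new AggregationDataTableReducer(brokerRequest, aggregationFunctions, queryOptions);
          }
        }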
---
 .../query/reduce/AggregationDataTableReducer.java  | 109 ++++
 .../core/query/reduce/BrokerReduceService.java     | 633 +--------------------
 .../pinot/core/query/reduce/DataTableReducer.java  |  45 ++
 .../query/reduce/DistinctDataTableReducer.java     | 132 +++++
 .../core/query/reduce/GroupByDataTableReducer.java | 438 ++++++++++++++
 .../core/query/reduce/ResultReducerFactory.java    |  60 ++
 .../query/reduce/SelectionDataTableReducer.java    | 136 +++++
 7 files changed, 922 insertions(+), 631 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
new file mode 100644
index 0000000..18d3eed
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.response.broker.AggregationResult;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.core.util.QueryOptions;
+
+
+/**
+ * Helper class to reduce and set Aggregation results into the BrokerResponseNative
+ */
+public class AggregationDataTableReducer implements DataTableReducer {
+
+  private final AggregationFunction[] _aggregationFunctions;
+  private final boolean _preserveType;
+
+  AggregationDataTableReducer(BrokerRequest brokerRequest, AggregationFunction[] aggregationFunctions,
+      QueryOptions queryOptions) {
+    _aggregationFunctions = aggregationFunctions;
+    _preserveType = queryOptions.isPreserveType();
+  }
+
+  /**
+   * Reduces data tables and sets aggregation results into BrokerResponseNative::AggregationResults
+   */
+  @Override
+  public void reduceAndSetResults(String tableName, DataSchema dataSchema, Map<ServerRoutingInstance, DataTable> dataTableMap,
+      BrokerResponseNative brokerResponseNative, BrokerMetrics brokerMetrics) {
+    if (dataTableMap.isEmpty()) {
+      return;
+    }
+
+    assert dataSchema != null;
+
+    int numAggregationFunctions = _aggregationFunctions.length;
+    Collection<DataTable> dataTables = dataTableMap.values();
+
+    // Merge results from all data tables.
+    Object[] intermediateResults = new Object[numAggregationFunctions];
+    for (DataTable dataTable : dataTables) {
+      for (int i = 0; i < numAggregationFunctions; i++) {
+        Object intermediateResultToMerge;
+        DataSchema.ColumnDataType columnDataType = dataSchema.getColumnDataType(i);
+        switch (columnDataType) {
+          case LONG:
+            intermediateResultToMerge = dataTable.getLong(0, i);
+            break;
+          case DOUBLE:
+            intermediateResultToMerge = dataTable.getDouble(0, i);
+            break;
+          case OBJECT:
+            intermediateResultToMerge = dataTable.getObject(0, i);
+            break;
+          default:
+            throw new IllegalStateException("Illegal column data type in aggregation results: " + columnDataType);
+        }
+        Object mergedIntermediateResult = intermediateResults[i];
+        if (mergedIntermediateResult == null) {
+          intermediateResults[i] = intermediateResultToMerge;
+        } else {
+          intermediateResults[i] = _aggregationFunctions[i].merge(mergedIntermediateResult, intermediateResultToMerge);
+        }
+      }
+    }
+
+    // Extract final results and set them into the broker response.
+    List<AggregationResult> reducedAggregationResults = new ArrayList<>(numAggregationFunctions);
+    for (int i = 0; i < numAggregationFunctions; i++) {
+      Serializable resultValue = AggregationFunctionUtils
+          .getSerializableValue(_aggregationFunctions[i].extractFinalResult(intermediateResults[i]));
+
+      // Format the value into string if required
+      if (!_preserveType) {
+        resultValue = AggregationFunctionUtils.formatValue(resultValue);
+      }
+      reducedAggregationResults.add(new AggregationResult(dataSchema.getColumnName(i), resultValue));
+    }
+    brokerResponseNative.setAggregationResults(reducedAggregationResults);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index 98160a7..ff6ff0d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -18,59 +18,22 @@
  */
 package org.apache.pinot.core.query.reduce;
 
-import com.google.common.base.Preconditions;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
 import java.util.concurrent.TimeUnit;
-import java.util.function.BiFunction;
 import javax.annotation.Nullable;
 import javax.annotation.concurrent.ThreadSafe;
 import org.apache.pinot.common.config.TableNameBuilder;
-import org.apache.pinot.common.exception.QueryException;
-import org.apache.pinot.common.function.AggregationFunctionType;
 import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.metrics.BrokerTimer;
-import org.apache.pinot.common.request.AggregationInfo;
 import org.apache.pinot.common.request.BrokerRequest;
-import org.apache.pinot.common.request.GroupBy;
-import org.apache.pinot.common.request.HavingFilterQuery;
-import org.apache.pinot.common.request.HavingFilterQueryMap;
-import org.apache.pinot.common.request.Selection;
-import org.apache.pinot.common.request.SelectionSort;
-import org.apache.pinot.common.response.broker.AggregationResult;
 import org.apache.pinot.common.response.broker.BrokerResponseNative;
-import org.apache.pinot.common.response.broker.GroupByResult;
 import org.apache.pinot.common.response.broker.QueryProcessingException;
-import org.apache.pinot.common.response.broker.ResultTable;
-import org.apache.pinot.common.response.broker.SelectionResults;
-import org.apache.pinot.common.utils.BytesUtils;
 import org.apache.pinot.common.utils.DataSchema;
-import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.common.utils.DataTable;
-import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
-import org.apache.pinot.core.data.table.IndexedTable;
-import org.apache.pinot.core.data.table.Record;
-import org.apache.pinot.core.query.aggregation.DistinctTable;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
-import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
-import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByTrimmingService;
-import org.apache.pinot.core.query.selection.SelectionOperatorService;
-import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
-import org.apache.pinot.core.util.GroupByUtils;
-import org.apache.pinot.core.util.QueryOptions;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 
 /**
@@ -79,7 +42,6 @@ import org.slf4j.LoggerFactory;
  */
 @ThreadSafe
 public class BrokerReduceService {
-  private static final Logger LOGGER = LoggerFactory.getLogger(BrokerReduceService.class);
 
   public BrokerResponseNative reduceOnDataTable(BrokerRequest brokerRequest,
       Map<ServerRoutingInstance, DataTable> dataTableMap, @Nullable BrokerMetrics brokerMetrics) {
@@ -215,599 +177,8 @@ public class BrokerReduceService {
       }
     }
 
-    // Parse the option from request whether to preserve the type
-    QueryOptions queryOptions = new QueryOptions(brokerRequest.getQueryOptions());
-    Selection selection = brokerRequest.getSelections();
-    if (dataTableMap.isEmpty()) {
-      // For empty data table map, construct empty result using the cached data schema for selection query if exists
-      if (cachedDataSchema != null) {
-        if (brokerRequest.isSetSelections()) {
-          List<String> selectionColumns = SelectionOperatorUtils
-              .getSelectionColumns(brokerRequest.getSelections().getSelectionColumns(), cachedDataSchema);
-          brokerResponseNative.setSelectionResults(new SelectionResults(selectionColumns, new ArrayList<>(0)));
-        } else if (brokerRequest.isSetGroupBy() && queryOptions.isGroupByModeSQL() && queryOptions
-            .isResponseFormatSQL()) {
-          setSQLGroupByOrderByResults(brokerResponseNative, cachedDataSchema, brokerRequest.getAggregationsInfo(),
-              brokerRequest.getGroupBy(), brokerRequest.getOrderBy(), Collections.emptyList());
-        }
-      }
-    } else {
-      // Reduce server responses data and set query results into the broker response
-      assert cachedDataSchema != null;
-
-      if (selection != null) {
-        // Selection query
-
-        // For data table map with more than one data tables, remove conflicting data tables
-        if (dataTableMap.size() > 1) {
-          List<ServerRoutingInstance> droppedServers = removeConflictingResponses(cachedDataSchema, dataTableMap);
-          if (!droppedServers.isEmpty()) {
-            String errorMessage =
-                QueryException.MERGE_RESPONSE_ERROR.getMessage() + ": responses for table: " + tableName
-                    + " from servers: " + droppedServers + " got dropped due to data schema inconsistency.";
-            LOGGER.warn(errorMessage);
-            if (brokerMetrics != null) {
-              brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.RESPONSE_MERGE_EXCEPTIONS, 1L);
-            }
-            brokerResponseNative
-                .addToExceptions(new QueryProcessingException(QueryException.MERGE_RESPONSE_ERROR_CODE, errorMessage));
-          }
-        }
-
-        setSelectionResults(brokerResponseNative, selection, dataTableMap.values(), cachedDataSchema,
-            queryOptions.isPreserveType());
-      } else {
-        // Aggregation query
-
-        AggregationFunction[] aggregationFunctions = AggregationFunctionUtils.getAggregationFunctions(brokerRequest);
-        if (!brokerRequest.isSetGroupBy()) {
-          // Aggregation only query.
-          setAggregationResults(brokerRequest, brokerResponseNative, aggregationFunctions, dataTableMap.values(), cachedDataSchema,
-              queryOptions.isPreserveType());
-        } else {
-          // Aggregation group-by query.
-          // read results as records if  GROUP_BY_MODE is explicitly set to SQL
-
-          if (queryOptions.isGroupByModeSQL()) {
-            // sql + order by
-
-            int resultSize = 0;
-
-            // if RESPONSE_FORMAT is SQL, return results in {@link ResultTable}
-            if (queryOptions.isResponseFormatSQL()) {
-              setSQLGroupByOrderByResults(brokerResponseNative, cachedDataSchema, brokerRequest.getAggregationsInfo(),
-                  brokerRequest.getGroupBy(), brokerRequest.getOrderBy(), dataTableMap.values());
-              resultSize = brokerResponseNative.getResultTable().getRows().size();
-            } else {
-              setPQLGroupByOrderByResults(brokerResponseNative, cachedDataSchema, brokerRequest.getAggregationsInfo(),
-                  brokerRequest.getGroupBy(), brokerRequest.getOrderBy(), dataTableMap.values(),
-                  queryOptions.isPreserveType());
-              if (!brokerResponseNative.getAggregationResults().isEmpty()) {
-                resultSize = brokerResponseNative.getAggregationResults().get(0).getGroupByResult().size();
-              }
-            }
-            if (brokerMetrics != null && resultSize > 0) {
-              brokerMetrics.addMeteredQueryValue(brokerRequest, BrokerMeter.GROUP_BY_SIZE, resultSize);
-            }
-          } else {
-
-            boolean[] aggregationFunctionSelectStatus =
-                AggregationFunctionUtils.getAggregationFunctionsSelectStatus(brokerRequest.getAggregationsInfo());
-            setGroupByHavingResults(brokerResponseNative, aggregationFunctions, aggregationFunctionSelectStatus,
-                brokerRequest.getGroupBy(), dataTableMap.values(), brokerRequest.getHavingFilterQuery(),
-                brokerRequest.getHavingFilterSubQueryMap(), queryOptions.isPreserveType());
-            if (brokerMetrics != null && (!brokerResponseNative.getAggregationResults().isEmpty())) {
-              // We emit the group by size when the result isn't empty. All the sizes among group-by results should be the same.
-              // Thus, we can just emit the one from the 1st result.
-              brokerMetrics.addMeteredQueryValue(brokerRequest, BrokerMeter.GROUP_BY_SIZE,
-                  brokerResponseNative.getAggregationResults().get(0).getGroupByResult().size());
-            }
-          }
-        }
-      }
-    }
+    DataTableReducer dataTableReducer = ResultReducerFactory.getResultReducer(brokerRequest);
+    dataTableReducer.reduceAndSetResults(tableName, cachedDataSchema, dataTableMap, brokerResponseNative, brokerMetrics);
     return brokerResponseNative;
   }
-
-  /**
-   * Given a data schema, remove data tables that are not compatible with this data schema.
-   * <p>Upgrade the data schema passed in to cover all remaining data schemas.
-   *
-   * @param dataSchema data schema.
-   * @param dataTableMap map from server to data table.
-   * @return list of server names where the data table got removed.
-   */
-  private List<ServerRoutingInstance> removeConflictingResponses(DataSchema dataSchema,
-      Map<ServerRoutingInstance, DataTable> dataTableMap) {
-    List<ServerRoutingInstance> droppedServers = new ArrayList<>();
-    Iterator<Map.Entry<ServerRoutingInstance, DataTable>> iterator = dataTableMap.entrySet().iterator();
-    while (iterator.hasNext()) {
-      Map.Entry<ServerRoutingInstance, DataTable> entry = iterator.next();
-      DataSchema dataSchemaToCompare = entry.getValue().getDataSchema();
-      assert dataSchemaToCompare != null;
-      if (!dataSchema.isTypeCompatibleWith(dataSchemaToCompare)) {
-        droppedServers.add(entry.getKey());
-        iterator.remove();
-      } else {
-        dataSchema.upgradeToCover(dataSchemaToCompare);
-      }
-    }
-    return droppedServers;
-  }
-
-  /**
-   * Reduce selection results from multiple servers and set them into BrokerResponseNative passed in.
-   *
-   * @param brokerResponseNative broker response.
-   * @param selection selection information.
-   * @param dataTables Collection of data tables
-   * @param dataSchema data schema.
-   */
-  private void setSelectionResults(BrokerResponseNative brokerResponseNative, Selection selection,
-      Collection<DataTable> dataTables, DataSchema dataSchema, boolean preserveType) {
-    int selectionSize = selection.getSize();
-    if (selectionSize > 0 && selection.isSetSelectionSortSequence()) {
-      // Selection order-by
-      SelectionOperatorService selectionService = new SelectionOperatorService(selection, dataSchema);
-      selectionService.reduceWithOrdering(dataTables);
-      brokerResponseNative.setSelectionResults(selectionService.renderSelectionResultsWithOrdering(preserveType));
-    } else {
-      // Selection only
-      List<String> selectionColumns =
-          SelectionOperatorUtils.getSelectionColumns(selection.getSelectionColumns(), dataSchema);
-      brokerResponseNative.setSelectionResults(SelectionOperatorUtils.renderSelectionResultsWithoutOrdering(
-          SelectionOperatorUtils.reduceWithoutOrdering(dataTables, selectionSize), dataSchema, selectionColumns,
-          preserveType));
-    }
-  }
-
-  private boolean isDistinct(final AggregationFunction[] aggregationFunctions) {
-    return aggregationFunctions.length == 1 && aggregationFunctions[0].getType() == AggregationFunctionType.DISTINCT;
-  }
-
-  /**
-   * Reduce aggregation results from multiple servers and set them into BrokerResponseNative passed in.
-   *
-   * @param brokerResponseNative broker response.
-   * @param aggregationFunctions array of aggregation functions.
-   * @param dataTables Collection of data tables
-   * @param dataSchema data schema.
-   */
-  @SuppressWarnings("unchecked")
-  private void setAggregationResults(BrokerRequest brokerRequest, BrokerResponseNative brokerResponseNative,
-      AggregationFunction[] aggregationFunctions, Collection<DataTable> dataTables, DataSchema dataSchema,
-      boolean preserveType) {
-    if (isDistinct(aggregationFunctions)) {
-      // Special handling for DISTINCT aggregation function
-      setDistinctQueryResults(brokerRequest, brokerResponseNative, dataTables, dataSchema, aggregationFunctions[0]);
-    } else {
-      // handle all other aggregation functions
-      int numAggregationFunctions = aggregationFunctions.length;
-      // Merge results from all data tables.
-      Object[] intermediateResults = new Object[numAggregationFunctions];
-      for (DataTable dataTable : dataTables) {
-        for (int i = 0; i < numAggregationFunctions; i++) {
-          Object intermediateResultToMerge;
-          ColumnDataType columnDataType = dataSchema.getColumnDataType(i);
-          switch (columnDataType) {
-            case LONG:
-              intermediateResultToMerge = dataTable.getLong(0, i);
-              break;
-            case DOUBLE:
-              intermediateResultToMerge = dataTable.getDouble(0, i);
-              break;
-            case OBJECT:
-              intermediateResultToMerge = dataTable.getObject(0, i);
-              break;
-            default:
-              throw new IllegalStateException("Illegal column data type in aggregation results: " + columnDataType);
-          }
-          Object mergedIntermediateResult = intermediateResults[i];
-          if (mergedIntermediateResult == null) {
-            intermediateResults[i] = intermediateResultToMerge;
-          } else {
-            intermediateResults[i] = aggregationFunctions[i].merge(mergedIntermediateResult, intermediateResultToMerge);
-          }
-        }
-      }
-
-      // Extract final results and set them into the broker response.
-      List<AggregationResult> reducedAggregationResults = new ArrayList<>(numAggregationFunctions);
-      for (int i = 0; i < numAggregationFunctions; i++) {
-        Serializable resultValue = AggregationFunctionUtils
-            .getSerializableValue(aggregationFunctions[i].extractFinalResult(intermediateResults[i]));
-
-        // Format the value into string if required
-        if (!preserveType) {
-          resultValue = AggregationFunctionUtils.formatValue(resultValue);
-        }
-        reducedAggregationResults.add(new AggregationResult(dataSchema.getColumnName(i), resultValue));
-      }
-      brokerResponseNative.setAggregationResults(reducedAggregationResults);
-    }
-  }
-
-  private void setDistinctQueryResults(BrokerRequest brokerRequest, BrokerResponseNative brokerResponseNative,
-      Collection<DataTable> dataTables, DataSchema dataSchema, AggregationFunction aggregationFunction) {
-    // DISTINCT is implemented as an aggregation function in the execution engine. Just like
-    // other aggregation functions, DISTINCT returns its result as a single object
-    // (of type DistinctTable) serialized by the server into the DataTable and deserialized
-    // by the broker from the DataTable. So there should be exactly 1 row and 1 column and that
-    // column value should be the serialized DistinctTable -- so essentially it is a DataTable
-    // inside a DataTable
-    Preconditions.checkState(dataSchema.size() == 1,
-        "DataTable from server for DISTINCT should have exactly one row");
-    Preconditions.checkState(dataSchema.getColumnDataType(0) == ColumnDataType.OBJECT,
-        "DistinctAggregationFunction should return result of type OBJECT");
-    Object mergedIntermediateResult = null;
-    // go over all the data tables from servers
-    for (DataTable dataTable : dataTables) {
-      Preconditions.checkState(dataTable.getNumberOfRows() == 1);
-      // deserialize the DistinctTable
-      Object intermediateResultToMerge = dataTable.getObject(0, 0);
-      Preconditions.checkState(intermediateResultToMerge instanceof DistinctTable);
-      DistinctTable distinctTable = (DistinctTable)intermediateResultToMerge;
-      // since DistinctTable uses the Table interface and during deserialization, we didn't
-      // have all the necessary information w.r.t ORDER BY, limit etc, we set it now
-      // before merging so that resize/trimming/sorting happens correctly
-      distinctTable.addLimitAndOrderByInfo(brokerRequest);
-      if (mergedIntermediateResult == null) {
-        mergedIntermediateResult = intermediateResultToMerge;
-      } else {
-        aggregationFunction.merge(mergedIntermediateResult, intermediateResultToMerge);
-      }
-    }
-
-    DistinctTable distinctTable =  (DistinctTable)mergedIntermediateResult;
-    // finish the merging, sort (if ORDER BY), get iterator
-    distinctTable.finish(true);
-
-    List<Serializable[]> resultSet = new ArrayList<>(distinctTable.size());
-    String[] columnNames = distinctTable.getDataSchema().getColumnNames();
-    Iterator<Record> iterator = distinctTable.iterator();
-    while (iterator.hasNext()) {
-      Record record = iterator.next();
-      Object[] columns = record.getValues();
-      Serializable[] distinctRow = new Serializable[columns.length];
-      for (int col = 0; col < columns.length; col++) {
-        final Serializable columnValue = AggregationFunctionUtils.getSerializableValue(columns[col]);
-        distinctRow[col] = columnValue;
-      }
-      resultSet.add(distinctRow);
-    }
-
-    // Up until now, we have treated DISTINCT similar to another aggregation function even in terms
-    // of the result from function and merging results.
-    // However, the DISTINCT query is just another SELECTION style query from the user's point
-    // of view and will return one or more records in the result table for the column(s) selected and so
-    // for that reason, response from broker should be a selection query result.
-    brokerResponseNative.setSelectionResults((new SelectionResults(Arrays.asList(columnNames), resultSet)));
-  }
-
-  /**
-   * Extract group by order by results and set into {@link ResultTable}
-   * @param brokerResponseNative broker response
-   * @param dataSchema data schema
-   * @param aggregationInfos aggregations info
-   * @param groupBy group by info
-   * @param orderBy order by info
-   * @param dataTables Collection of data tables
-   */
-  private void setSQLGroupByOrderByResults(BrokerResponseNative brokerResponseNative, DataSchema dataSchema,
-      List<AggregationInfo> aggregationInfos, GroupBy groupBy, List<SelectionSort> orderBy,
-      Collection<DataTable> dataTables) {
-    List<String> columnNames = new ArrayList<>(dataSchema.size());
-    for (int i = 0; i < dataSchema.size(); i++) {
-      columnNames.add(dataSchema.getColumnName(i));
-    }
-
-    int numGroupBy = groupBy.getExpressionsSize();
-    int numAggregations = aggregationInfos.size();
-
-    IndexedTable indexedTable = getIndexedTable(groupBy, aggregationInfos, orderBy, dataSchema, dataTables);
-
-    AggregationFunction[] aggregationFunctions = new AggregationFunction[numAggregations];
-    for (int i = 0; i < numAggregations; i++) {
-      aggregationFunctions[i] =
-          AggregationFunctionUtils.getAggregationFunctionContext(aggregationInfos.get(i)).getAggregationFunction();
-    }
-
-    List<Object[]> rows = new ArrayList<>();
-    int numColumns = columnNames.size();
-    Iterator<Record> sortedIterator = indexedTable.iterator();
-    int numRows = 0;
-    while (numRows < groupBy.getTopN() && sortedIterator.hasNext()) {
-
-      Record nextRecord = sortedIterator.next();
-      Object[] values = nextRecord.getValues();
-
-      int index = numGroupBy;
-      int aggNum = 0;
-      while (index < numColumns) {
-        values[index] = aggregationFunctions[aggNum++].extractFinalResult(values[index]);
-        index++;
-      }
-      rows.add(values);
-      numRows++;
-    }
-
-    brokerResponseNative.setResultTable(new ResultTable(dataSchema, rows));
-  }
-
-  private IndexedTable getIndexedTable(GroupBy groupBy, List<AggregationInfo> aggregationInfos,
-      List<SelectionSort> orderBy, DataSchema dataSchema, Collection<DataTable> dataTables) {
-
-    int numColumns = dataSchema.size();
-    int indexedTableCapacity = GroupByUtils.getTableCapacity(groupBy, orderBy);
-    IndexedTable indexedTable = new ConcurrentIndexedTable(dataSchema, aggregationInfos, orderBy, indexedTableCapacity);
-
-    for (DataTable dataTable : dataTables) {
-      BiFunction[] functions = new BiFunction[numColumns];
-      for (int i = 0; i < numColumns; i++) {
-        ColumnDataType columnDataType = dataSchema.getColumnDataType(i);
-        BiFunction<Integer, Integer, Object> function;
-        switch (columnDataType) {
-
-          case INT:
-            function = dataTable::getInt;
-            break;
-          case LONG:
-            function = dataTable::getLong;
-            break;
-          case FLOAT:
-            function = dataTable::getFloat;
-            break;
-          case DOUBLE:
-            function = dataTable::getDouble;
-            break;
-          case STRING:
-            function = dataTable::getString;
-            break;
-          case BYTES:
-            // FIXME: support BYTES in DataTable instead of converting to string
-            function = (row, col) -> BytesUtils.toByteArray(dataTable.getString(row, col));
-            break;
-          default:
-            function = dataTable::getObject;
-        }
-        functions[i] = function;
-      }
-
-      for (int row = 0; row < dataTable.getNumberOfRows(); row++) {
-        Object[] columns = new Object[numColumns];
-        for (int col = 0; col < numColumns; col++) {
-          columns[col] = functions[col].apply(row, col);
-        }
-        Record record = new Record(columns);
-        indexedTable.upsert(record);
-      }
-    }
-    indexedTable.finish(true);
-    return indexedTable;
-  }
-
-  /**
-   * Extract the results of group by order by into a List of {@link AggregationResult}
-   * There will be 1 aggregation result per aggregation. The group by keys will be the same across all aggregations
-   * @param brokerResponseNative broker response
-   * @param dataSchema data schema
-   * @param aggregationInfos aggregations info
-   * @param groupBy group by info
-   * @param orderBy order by info
-   * @param dataTables Collection of data tables
-   */
-  private void setPQLGroupByOrderByResults(BrokerResponseNative brokerResponseNative, DataSchema dataSchema,
-      List<AggregationInfo> aggregationInfos, GroupBy groupBy, List<SelectionSort> orderBy,
-      Collection<DataTable> dataTables, boolean preserveType) {
-    int numGroupBy = groupBy.getExpressionsSize();
-    int numAggregations = aggregationInfos.size();
-    int numColumns = numGroupBy + numAggregations;
-
-    List<String> groupByColumns = new ArrayList<>(numGroupBy);
-    int idx = 0;
-    while (idx < numGroupBy) {
-      groupByColumns.add(dataSchema.getColumnName(idx));
-      idx++;
-    }
-
-    List<String> aggregationColumns = new ArrayList<>(numAggregations);
-    AggregationFunction[] aggregationFunctions = new AggregationFunction[aggregationInfos.size()];
-    List<List<GroupByResult>> groupByResults = new ArrayList<>(numAggregations);
-    int aggIdx = 0;
-    while (idx < numColumns) {
-      aggregationColumns.add(dataSchema.getColumnName(idx));
-      aggregationFunctions[aggIdx] =
-          AggregationFunctionUtils.getAggregationFunctionContext(aggregationInfos.get(aggIdx)).getAggregationFunction();
-      groupByResults.add(new ArrayList<>());
-      idx++;
-      aggIdx++;
-    }
-
-    if (!dataTables.isEmpty()) {
-      IndexedTable indexedTable = getIndexedTable(groupBy, aggregationInfos, orderBy, dataSchema, dataTables);
-
-      Iterator<Record> sortedIterator = indexedTable.iterator();
-      int numRows = 0;
-      while (numRows < groupBy.getTopN() && sortedIterator.hasNext()) {
-
-        Record nextRecord = sortedIterator.next();
-        Object[] values = nextRecord.getValues();
-
-        int index = 0;
-        List<String> group = new ArrayList<>(numGroupBy);
-        while (index < numGroupBy) {
-          group.add(values[index].toString());
-          index++;
-        }
-
-        int aggNum = 0;
-        while (index < numColumns) {
-          Serializable serializableValue =
-              getSerializableValue(aggregationFunctions[aggNum].extractFinalResult(values[index]));
-          if (!preserveType) {
-            serializableValue = AggregationFunctionUtils.formatValue(serializableValue);
-          }
-          GroupByResult groupByResult = new GroupByResult();
-          groupByResult.setGroup(group);
-          groupByResult.setValue(serializableValue);
-          groupByResults.get(aggNum).add(groupByResult);
-          index++;
-          aggNum++;
-        }
-        numRows++;
-      }
-    }
-
-    List<AggregationResult> aggregationResults = new ArrayList<>(numAggregations);
-    for (int i = 0; i < numAggregations; i++) {
-      AggregationResult aggregationResult =
-          new AggregationResult(groupByResults.get(i), groupByColumns, aggregationColumns.get(i));
-      aggregationResults.add(aggregationResult);
-    }
-    brokerResponseNative.setAggregationResults(aggregationResults);
-  }
-
-  private Serializable getSerializableValue(Object value) {
-    if (value instanceof Number) {
-      return (Number) value;
-    } else {
-      return value.toString();
-    }
-  }
-
-  /**
-   * Reduce group-by results from multiple servers and set them into BrokerResponseNative passed in.
-   *
-   * @param brokerResponseNative broker response.
-   * @param aggregationFunctions array of aggregation functions.
-   * @param groupBy group-by information.
-   * @param dataTables Collection of data tables
-   * @param havingFilterQuery having filter query
-   * @param havingFilterQueryMap having filter query map
-   */
-  @SuppressWarnings("unchecked")
-  private void setGroupByHavingResults(BrokerResponseNative brokerResponseNative,
-      AggregationFunction[] aggregationFunctions, boolean[] aggregationFunctionsSelectStatus, GroupBy groupBy,
-      Collection<DataTable> dataTables, HavingFilterQuery havingFilterQuery, HavingFilterQueryMap havingFilterQueryMap,
-      boolean preserveType) {
-    int numAggregationFunctions = aggregationFunctions.length;
-
-    // Merge results from all data tables.
-    String[] columnNames = new String[numAggregationFunctions];
-    Map<String, Object>[] intermediateResultMaps = new Map[numAggregationFunctions];
-    for (DataTable dataTable : dataTables) {
-      for (int i = 0; i < numAggregationFunctions; i++) {
-        if (columnNames[i] == null) {
-          columnNames[i] = dataTable.getString(i, 0);
-          intermediateResultMaps[i] = dataTable.getObject(i, 1);
-        } else {
-          Map<String, Object> mergedIntermediateResultMap = intermediateResultMaps[i];
-          Map<String, Object> intermediateResultMapToMerge = dataTable.getObject(i, 1);
-          for (Map.Entry<String, Object> entry : intermediateResultMapToMerge.entrySet()) {
-            String groupKey = entry.getKey();
-            Object intermediateResultToMerge = entry.getValue();
-            if (mergedIntermediateResultMap.containsKey(groupKey)) {
-              Object mergedIntermediateResult = mergedIntermediateResultMap.get(groupKey);
-              mergedIntermediateResultMap
-                  .put(groupKey, aggregationFunctions[i].merge(mergedIntermediateResult, intermediateResultToMerge));
-            } else {
-              mergedIntermediateResultMap.put(groupKey, intermediateResultToMerge);
-            }
-          }
-        }
-      }
-    }
-
-    // Extract final result maps from the merged intermediate result maps.
-    Map<String, Comparable>[] finalResultMaps = new Map[numAggregationFunctions];
-    for (int i = 0; i < numAggregationFunctions; i++) {
-      Map<String, Object> intermediateResultMap = intermediateResultMaps[i];
-      Map<String, Comparable> finalResultMap = new HashMap<>();
-      for (String groupKey : intermediateResultMap.keySet()) {
-        Object intermediateResult = intermediateResultMap.get(groupKey);
-        finalResultMap.put(groupKey, aggregationFunctions[i].extractFinalResult(intermediateResult));
-      }
-      finalResultMaps[i] = finalResultMap;
-    }
-    //If HAVING clause is set, we further filter the group by results based on the HAVING predicate
-    if (havingFilterQuery != null) {
-      HavingClauseComparisonTree havingClauseComparisonTree =
-          HavingClauseComparisonTree.buildHavingClauseComparisonTree(havingFilterQuery, havingFilterQueryMap);
-      //Applying close policy
-      //We just keep those groups (from different aggregation functions) that exist in the result sets of all aggregation functions.
-      //In other words, we just keep the intersection of groups of different aggregation functions.
-      //Here we calculate the intersection of group key sets of different aggregation functions
-      Set<String> intersectionOfKeySets = finalResultMaps[0].keySet();
-      for (int i = 1; i < numAggregationFunctions; i++) {
-        intersectionOfKeySets.retainAll(finalResultMaps[i].keySet());
-      }
-
-      //Now it is time to remove those groups that do not satisfy the HAVING clause predicate
-      //We use TreeMap which supports CASE_INSENSITIVE_ORDER
-      Map<String, Comparable> singleGroupAggResults = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
-      Map<String, Comparable>[] finalFilteredResultMaps = new Map[numAggregationFunctions];
-      for (int i = 0; i < numAggregationFunctions; i++) {
-        finalFilteredResultMaps[i] = new HashMap<>();
-      }
-
-      for (String groupKey : intersectionOfKeySets) {
-        for (int i = 0; i < numAggregationFunctions; i++) {
-          singleGroupAggResults.put(columnNames[i], finalResultMaps[i].get(groupKey));
-        }
-        //if this group passes the HAVING predicate, keep it in the new map
-        if (havingClauseComparisonTree.isThisGroupPassPredicates(singleGroupAggResults)) {
-          for (int i = 0; i < numAggregationFunctions; i++) {
-            finalFilteredResultMaps[i].put(groupKey, singleGroupAggResults.get(columnNames[i]));
-          }
-        }
-      }
-      //update the final results
-      finalResultMaps = finalFilteredResultMaps;
-    }
-
-    int aggregationNumsInFinalResult = 0;
-    for (int i = 0; i < numAggregationFunctions; i++) {
-      if (aggregationFunctionsSelectStatus[i]) {
-        aggregationNumsInFinalResult++;
-      }
-    }
-
-    if (aggregationNumsInFinalResult > 0) {
-      String[] finalColumnNames = new String[aggregationNumsInFinalResult];
-      Map<String, Comparable>[] finalOutResultMaps = new Map[aggregationNumsInFinalResult];
-      AggregationFunction[] finalAggregationFunctions = new AggregationFunction[aggregationNumsInFinalResult];
-      int count = 0;
-      for (int i = 0; i < numAggregationFunctions; i++) {
-        if (aggregationFunctionsSelectStatus[i]) {
-          finalColumnNames[count] = columnNames[i];
-          finalOutResultMaps[count] = finalResultMaps[i];
-          finalAggregationFunctions[count] = aggregationFunctions[i];
-          count++;
-        }
-      }
-      // Trim the final result maps to topN and set them into the broker response.
-      AggregationGroupByTrimmingService aggregationGroupByTrimmingService =
-          new AggregationGroupByTrimmingService(finalAggregationFunctions, (int) groupBy.getTopN());
-      List<GroupByResult>[] groupByResultLists = aggregationGroupByTrimmingService.trimFinalResults(finalOutResultMaps);
-
-      // Format the value into string if required
-      if (!preserveType) {
-        for (List<GroupByResult> groupByResultList : groupByResultLists) {
-          for (GroupByResult groupByResult : groupByResultList) {
-            groupByResult.setValue(AggregationFunctionUtils.formatValue(groupByResult.getValue()));
-          }
-        }
-      }
-
-      List<AggregationResult> aggregationResults = new ArrayList<>(count);
-      for (int i = 0; i < aggregationNumsInFinalResult; i++) {
-        List<GroupByResult> groupByResultList = groupByResultLists[i];
-        aggregationResults.add(new AggregationResult(groupByResultList, groupBy.getExpressions(), finalColumnNames[i]));
-      }
-      brokerResponseNative.setAggregationResults(aggregationResults);
-    } else {
-      throw new IllegalStateException(
-          "There should be minimum one aggregation function in the select list of a Group by query");
-    }
-  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducer.java
new file mode 100644
index 0000000..43ba802
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DataTableReducer.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.util.Map;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+
+
+/**
+ * Interface for data table reducers of query results
+ */
+public interface DataTableReducer {
+
+  /**
+   * Reduces data tables and sets the results of the query into the BrokerResponseNative
+   * @param tableName table name
+   * @param dataSchema schema from broker reduce service
+   * @param dataTableMap map of servers to data tables
+   * @param brokerResponseNative broker response
+   * @param brokerMetrics broker metrics
+   */
+  void reduceAndSetResults(String tableName, DataSchema dataSchema,
+      Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative brokerResponseNative,
+      BrokerMetrics brokerMetrics);
+}
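
A hypothetical implementation sketch of the contract above (this class and its
names are illustrative only, not part of this commit): each reducer now handles
the empty dataTableMap itself instead of relying on an upfront check in
BrokerReduceService.

    package org.apache.pinot.core.query.reduce;
    
    import java.util.Map;
    import org.apache.pinot.common.metrics.BrokerMetrics;
    import org.apache.pinot.common.response.broker.BrokerResponseNative;
    import org.apache.pinot.common.utils.DataSchema;
    import org.apache.pinot.common.utils.DataTable;
    import org.apache.pinot.core.transport.ServerRoutingInstance;
    
    // Hypothetical reducer, for illustration only -- not part of this commit.
    public class RowCountDataTableReducer implements DataTableReducer {
      @Override
      public void reduceAndSetResults(String tableName, DataSchema dataSchema,
          Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative brokerResponseNative,
          BrokerMetrics brokerMetrics) {
        // Handle the empty map inside the reducer, as the reducers in this commit do
        if (dataTableMap.isEmpty()) {
          return;
        }
        // Merge the per-server data tables; here, just count rows across servers
        long numRows = 0;
        for (DataTable dataTable : dataTableMap.values()) {
          numRows += dataTable.getNumberOfRows();
        }
        // A real reducer would set its reduced result into brokerResponseNative here
      }
    }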
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
new file mode 100644
index 0000000..45ee7c1
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/DistinctDataTableReducer.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import com.google.common.base.Preconditions;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.SelectionResults;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.query.aggregation.DistinctTable;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.core.util.QueryOptions;
+import org.apache.pinot.pql.parsers.pql2.ast.FunctionCallAstNode;
+
+
+/**
+ * Helper class to reduce data tables and set results of a DISTINCT query into the BrokerResponseNative
+ */
+public class DistinctDataTableReducer implements DataTableReducer {
+
+  private final BrokerRequest _brokerRequest;
+  private final AggregationFunction _aggregationFunction;
+
+  DistinctDataTableReducer(BrokerRequest brokerRequest, AggregationFunction aggregationFunction,
+      QueryOptions queryOptions) {
+    _brokerRequest = brokerRequest;
+    _aggregationFunction = aggregationFunction;
+  }
+
+  /**
+   * Reduces and sets results of a DISTINCT query into SelectionResults
+   */
+  @Override
+  public void reduceAndSetResults(String tableName, DataSchema dataSchema, Map<ServerRoutingInstance, DataTable> dataTableMap,
+      BrokerResponseNative brokerResponseNative, BrokerMetrics brokerMetrics) {
+
+    if (dataTableMap.isEmpty()) {
+      brokerResponseNative
+          .setSelectionResults(new SelectionResults(Arrays.asList(getDistinctColumns()), new ArrayList<>(0)));
+      return;
+    }
+
+    assert dataSchema != null;
+    // DISTINCT is implemented as an aggregation function in the execution engine. Just like
+    // other aggregation functions, DISTINCT returns its result as a single object
+    // (of type DistinctTable) serialized by the server into the DataTable and deserialized
+    // by the broker from the DataTable. So there should be exactly 1 row and 1 column and that
+    // column value should be the serialized DistinctTable -- so essentially it is a DataTable
+    // inside a DataTable
+    Collection<DataTable> dataTables = dataTableMap.values();
+    Preconditions.checkState(dataSchema.size() == 1, "DataTable from server for DISTINCT should have exactly one column");
+    Preconditions.checkState(dataSchema.getColumnDataType(0) == DataSchema.ColumnDataType.OBJECT,
+        "DistinctAggregationFunction should return result of type OBJECT");
+    Object mergedIntermediateResult = null;
+    // go over all the data tables from servers
+    for (DataTable dataTable : dataTables) {
+      Preconditions.checkState(dataTable.getNumberOfRows() == 1);
+      // deserialize the DistinctTable
+      Object intermediateResultToMerge = dataTable.getObject(0, 0);
+      Preconditions.checkState(intermediateResultToMerge instanceof DistinctTable);
+      DistinctTable distinctTable = (DistinctTable) intermediateResultToMerge;
+      // since DistinctTable uses the Table interface and during deserialization, we didn't
+      // have all the necessary information w.r.t ORDER BY, limit etc, we set it now
+      // before merging so that resize/trimming/sorting happens correctly
+      distinctTable.addLimitAndOrderByInfo(_brokerRequest);
+      if (mergedIntermediateResult == null) {
+        mergedIntermediateResult = intermediateResultToMerge;
+      } else {
+        _aggregationFunction.merge(mergedIntermediateResult, intermediateResultToMerge);
+      }
+    }
+
+    DistinctTable distinctTable = (DistinctTable) mergedIntermediateResult;
+    // finish the merging, sort (if ORDER BY), get iterator
+    distinctTable.finish(true);
+
+    List<Serializable[]> resultSet = new ArrayList<>(distinctTable.size());
+    String[] columnNames = distinctTable.getDataSchema().getColumnNames();
+    Iterator<Record> iterator = distinctTable.iterator();
+    while (iterator.hasNext()) {
+      Record record = iterator.next();
+      Object[] columns = record.getValues();
+      Serializable[] distinctRow = new Serializable[columns.length];
+      for (int col = 0; col < columns.length; col++) {
+        final Serializable columnValue = AggregationFunctionUtils.getSerializableValue(columns[col]);
+        distinctRow[col] = columnValue;
+      }
+      resultSet.add(distinctRow);
+    }
+
+    // Up until now, we have treated DISTINCT similar to another aggregation function even in terms
+    // of the result from function and merging results.
+    // However, the DISTINCT query is just another SELECTION style query from the user's point
+    // of view and will return one or more records in the result table for the column(s) selected and so
+    // for that reason, response from broker should be a selection query result.
+    brokerResponseNative.setSelectionResults((new SelectionResults(Arrays.asList(columnNames), resultSet)));
+  }
+
+  private String[] getDistinctColumns() {
+    return _brokerRequest.getAggregationsInfo().get(0).getAggregationParams()
+        .get(FunctionCallAstNode.COLUMN_KEY_IN_AGGREGATION_INFO)
+        .split(FunctionCallAstNode.DISTINCT_MULTI_COLUMN_SEPARATOR);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
new file mode 100644
index 0000000..6f47faa
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -0,0 +1,438 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.function.BiFunction;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.request.AggregationInfo;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.GroupBy;
+import org.apache.pinot.common.request.HavingFilterQuery;
+import org.apache.pinot.common.request.HavingFilterQueryMap;
+import org.apache.pinot.common.request.SelectionSort;
+import org.apache.pinot.common.response.broker.AggregationResult;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.GroupByResult;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.BytesUtils;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
+import org.apache.pinot.core.data.table.IndexedTable;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
+import org.apache.pinot.core.query.aggregation.groupby.AggregationGroupByTrimmingService;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.core.util.GroupByUtils;
+import org.apache.pinot.core.util.QueryOptions;
+
+
+/**
+ * Helper class to reduce data tables and set group by results into the BrokerResponseNative
+ */
+public class GroupByDataTableReducer implements DataTableReducer {
+
+  private final BrokerRequest _brokerRequest;
+  private final AggregationFunction[] _aggregationFunctions;
+  private final boolean _preserveType;
+  private final boolean _groupByModeSql;
+  private final boolean _responseFormatSql;
+
+  GroupByDataTableReducer(BrokerRequest brokerRequest, AggregationFunction[] aggregationFunctions,
+      QueryOptions queryOptions) {
+    _brokerRequest = brokerRequest;
+    _aggregationFunctions = aggregationFunctions;
+    _preserveType = queryOptions.isPreserveType();
+    _groupByModeSql = queryOptions.isGroupByModeSQL();
+    _responseFormatSql = queryOptions.isResponseFormatSQL();
+  }
+
+  /**
+   * Reduces and sets group by results into ResultTable, if responseFormat = sql
+   * By default, sets group by results into GroupByResults
+   */
+  @Override
+  public void reduceAndSetResults(String tableName, DataSchema dataSchema, Map<ServerRoutingInstance, DataTable> dataTableMap,
+      BrokerResponseNative brokerResponseNative, BrokerMetrics brokerMetrics) {
+    if (dataTableMap.isEmpty() && !_responseFormatSql) {
+      return;
+    }
+
+    assert dataSchema != null;
+    int resultSize = 0;
+    Collection<DataTable> dataTables = dataTableMap.values();
+
+    // Aggregation group-by query.
+    // read results as records if  GROUP_BY_MODE is explicitly set to SQL
+
+    if (_groupByModeSql) {
+
+      // if RESPONSE_FORMAT is SQL, return results in {@link ResultTable}
+      if (_responseFormatSql) {
+        setSQLGroupByOrderByResults(brokerResponseNative, dataSchema, _brokerRequest.getAggregationsInfo(),
+            _brokerRequest.getGroupBy(), _brokerRequest.getOrderBy(), dataTables);
+        resultSize = brokerResponseNative.getResultTable().getRows().size();
+      } else {
+        setPQLGroupByOrderByResults(brokerResponseNative, dataSchema, _brokerRequest.getAggregationsInfo(),
+            _brokerRequest.getGroupBy(), _brokerRequest.getOrderBy(), dataTables);
+        if (!brokerResponseNative.getAggregationResults().isEmpty()) {
+          resultSize = brokerResponseNative.getAggregationResults().get(0).getGroupByResult().size();
+        }
+      }
+    } else {
+
+      boolean[] aggregationFunctionSelectStatus =
+          AggregationFunctionUtils.getAggregationFunctionsSelectStatus(_brokerRequest.getAggregationsInfo());
+      setGroupByHavingResults(brokerResponseNative, aggregationFunctionSelectStatus, _brokerRequest.getGroupBy(),
+          dataTables, _brokerRequest.getHavingFilterQuery(), _brokerRequest.getHavingFilterSubQueryMap());
+      // We emit the group by size when the result isn't empty. All the sizes among group-by results should be the same.
+      // Thus, we can just emit the one from the 1st result.
+      if (!brokerResponseNative.getAggregationResults().isEmpty()) {
+        resultSize = brokerResponseNative.getAggregationResults().get(0).getGroupByResult().size();
+      }
+    }
+    if (brokerMetrics != null && resultSize > 0) {
+      brokerMetrics.addMeteredQueryValue(_brokerRequest, BrokerMeter.GROUP_BY_SIZE, resultSize);
+    }
+  }
+
+  /**
+   * Extract group by order by results and set into {@link ResultTable}
+   * @param brokerResponseNative broker response
+   * @param dataSchema data schema
+   * @param aggregationInfos aggregations info
+   * @param groupBy group by info
+   * @param orderBy order by info
+   * @param dataTables Collection of data tables
+   */
+  private void setSQLGroupByOrderByResults(BrokerResponseNative brokerResponseNative, DataSchema dataSchema,
+      List<AggregationInfo> aggregationInfos, GroupBy groupBy, List<SelectionSort> orderBy,
+      Collection<DataTable> dataTables) {
+
+    IndexedTable indexedTable = getIndexedTable(groupBy, aggregationInfos, orderBy, dataSchema, dataTables);
+
+    List<Object[]> rows = new ArrayList<>();
+    int numColumns = dataSchema.size();
+    int numGroupBy = groupBy.getExpressionsSize();
+    Iterator<Record> sortedIterator = indexedTable.iterator();
+    int numRows = 0;
+    while (numRows < groupBy.getTopN() && sortedIterator.hasNext()) {
+
+      Record nextRecord = sortedIterator.next();
+      Object[] values = nextRecord.getValues();
+
+      int index = numGroupBy;
+      int aggNum = 0;
+      while (index < numColumns) {
+        values[index] = _aggregationFunctions[aggNum++].extractFinalResult(values[index]);
+        index++;
+      }
+      rows.add(values);
+      numRows++;
+    }
+
+    brokerResponseNative.setResultTable(new ResultTable(dataSchema, rows));
+  }
+
+  private IndexedTable getIndexedTable(GroupBy groupBy, List<AggregationInfo> aggregationInfos,
+      List<SelectionSort> orderBy, DataSchema dataSchema, Collection<DataTable> dataTables) {
+
+    int numColumns = dataSchema.size();
+    int indexedTableCapacity = GroupByUtils.getTableCapacity(groupBy, orderBy);
+    IndexedTable indexedTable = new ConcurrentIndexedTable(dataSchema, aggregationInfos, orderBy, indexedTableCapacity);
+
+    for (DataTable dataTable : dataTables) {
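+      // Bind one typed value-extraction function per column of this data table.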
+      BiFunction[] functions = new BiFunction[numColumns];
+      for (int i = 0; i < numColumns; i++) {
+        DataSchema.ColumnDataType columnDataType = dataSchema.getColumnDataType(i);
+        BiFunction<Integer, Integer, Object> function;
+        switch (columnDataType) {
+          case INT:
+            function = dataTable::getInt;
+            break;
+          case LONG:
+            function = dataTable::getLong;
+            break;
+          case FLOAT:
+            function = dataTable::getFloat;
+            break;
+          case DOUBLE:
+            function = dataTable::getDouble;
+            break;
+          case STRING:
+            function = dataTable::getString;
+            break;
+          case BYTES:
+            // FIXME: support BYTES in DataTable instead of converting to string
+            function = (row, col) -> BytesUtils.toByteArray(dataTable.getString(row, col));
+            break;
+          default:
+            function = dataTable::getObject;
+        }
+        functions[i] = function;
+      }
+
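+      // Materialize each row as a Record and upsert it into the indexed table, which merges rows on the group-by key.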
+      for (int row = 0; row < dataTable.getNumberOfRows(); row++) {
+        Object[] columns = new Object[numColumns];
+        for (int col = 0; col < numColumns; col++) {
+          columns[col] = functions[col].apply(row, col);
+        }
+        Record record = new Record(columns);
+        indexedTable.upsert(record);
+      }
+    }
+    indexedTable.finish(true);
+    return indexedTable;
+  }
+
+  /**
+   * Extracts the group-by order-by results into a list of {@link AggregationResult}s.
+   * There will be one aggregation result per aggregation; the group-by keys are the same across all aggregations.
+   * @param brokerResponseNative broker response
+   * @param dataSchema data schema
+   * @param aggregationInfos aggregations info
+   * @param groupBy group by info
+   * @param orderBy order by info
+   * @param dataTables collection of data tables
+   */
+  private void setPQLGroupByOrderByResults(BrokerResponseNative brokerResponseNative, DataSchema dataSchema,
+      List<AggregationInfo> aggregationInfos, GroupBy groupBy, List<SelectionSort> orderBy,
+      Collection<DataTable> dataTables) {
+    int numGroupBy = groupBy.getExpressionsSize();
+    int numAggregations = aggregationInfos.size();
+    int numColumns = numGroupBy + numAggregations;
+
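+    // The data schema lays out the group-by expressions first, followed by one column per aggregation.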
+    List<String> groupByColumns = new ArrayList<>(numGroupBy);
+    int idx = 0;
+    while (idx < numGroupBy) {
+      groupByColumns.add(dataSchema.getColumnName(idx));
+      idx++;
+    }
+
+    List<String> aggregationColumns = new ArrayList<>(numAggregations);
+    AggregationFunction[] aggregationFunctions = new AggregationFunction[aggregationInfos.size()];
+    List<List<GroupByResult>> groupByResults = new ArrayList<>(numAggregations);
+    int aggIdx = 0;
+    while (idx < numColumns) {
+      aggregationColumns.add(dataSchema.getColumnName(idx));
+      aggregationFunctions[aggIdx] =
+          AggregationFunctionUtils.getAggregationFunctionContext(aggregationInfos.get(aggIdx)).getAggregationFunction();
+      groupByResults.add(new ArrayList<>());
+      idx++;
+      aggIdx++;
+    }
+
+    if (!dataTables.isEmpty()) {
+      IndexedTable indexedTable = getIndexedTable(groupBy, aggregationInfos, orderBy, dataSchema, dataTables);
+
+      Iterator<Record> sortedIterator = indexedTable.iterator();
+      int numRows = 0;
+      while (numRows < groupBy.getTopN() && sortedIterator.hasNext()) {
+        Record nextRecord = sortedIterator.next();
+        Object[] values = nextRecord.getValues();
+
+        int index = 0;
+        List<String> group = new ArrayList<>(numGroupBy);
+        while (index < numGroupBy) {
+          group.add(values[index].toString());
+          index++;
+        }
+
+        int aggNum = 0;
+        while (index < numColumns) {
+          Serializable serializableValue =
+              getSerializableValue(aggregationFunctions[aggNum].extractFinalResult(values[index]));
+          if (!_preserveType) {
+            serializableValue = AggregationFunctionUtils.formatValue(serializableValue);
+          }
+          GroupByResult groupByResult = new GroupByResult();
+          groupByResult.setGroup(group);
+          groupByResult.setValue(serializableValue);
+          groupByResults.get(aggNum).add(groupByResult);
+          index++;
+          aggNum++;
+        }
+        numRows++;
+      }
+    }
+
+    List<AggregationResult> aggregationResults = new ArrayList<>(numAggregations);
+    for (int i = 0; i < numAggregations; i++) {
+      AggregationResult aggregationResult =
+          new AggregationResult(groupByResults.get(i), groupByColumns, aggregationColumns.get(i));
+      aggregationResults.add(aggregationResult);
+    }
+    brokerResponseNative.setAggregationResults(aggregationResults);
+  }
+
+  private Serializable getSerializableValue(Object value) {
+    if (value instanceof Number) {
+      return (Number) value;
+    } else {
+      return value.toString();
+    }
+  }
+
+  /**
+   * Reduces the group-by results from multiple servers and sets them into the BrokerResponseNative passed in.
+   *
+   * @param brokerResponseNative broker response
+   * @param aggregationFunctionsSelectStatus whether each aggregation function appears in the select list
+   * @param groupBy group-by information
+   * @param dataTables collection of data tables
+   * @param havingFilterQuery having filter query
+   * @param havingFilterQueryMap having filter query map
+   */
+  @SuppressWarnings("unchecked")
+  private void setGroupByHavingResults(BrokerResponseNative brokerResponseNative,
+      boolean[] aggregationFunctionsSelectStatus, GroupBy groupBy, Collection<DataTable> dataTables,
+      HavingFilterQuery havingFilterQuery, HavingFilterQueryMap havingFilterQueryMap) {
+    int numAggregationFunctions = _aggregationFunctions.length;
+
+    // Merge results from all data tables.
+    String[] columnNames = new String[numAggregationFunctions];
+    Map<String, Object>[] intermediateResultMaps = new Map[numAggregationFunctions];
+    for (DataTable dataTable : dataTables) {
+      for (int i = 0; i < numAggregationFunctions; i++) {
+        if (columnNames[i] == null) {
+          columnNames[i] = dataTable.getString(i, 0);
+          intermediateResultMaps[i] = dataTable.getObject(i, 1);
+        } else {
+          Map<String, Object> mergedIntermediateResultMap = intermediateResultMaps[i];
+          Map<String, Object> intermediateResultMapToMerge = dataTable.getObject(i, 1);
+          for (Map.Entry<String, Object> entry : intermediateResultMapToMerge.entrySet()) {
+            String groupKey = entry.getKey();
+            Object intermediateResultToMerge = entry.getValue();
+            if (mergedIntermediateResultMap.containsKey(groupKey)) {
+              Object mergedIntermediateResult = mergedIntermediateResultMap.get(groupKey);
+              mergedIntermediateResultMap
+                  .put(groupKey, _aggregationFunctions[i].merge(mergedIntermediateResult, intermediateResultToMerge));
+            } else {
+              mergedIntermediateResultMap.put(groupKey, intermediateResultToMerge);
+            }
+          }
+        }
+      }
+    }
+
+    // Extract final result maps from the merged intermediate result maps.
+    Map<String, Comparable>[] finalResultMaps = new Map[numAggregationFunctions];
+    for (int i = 0; i < numAggregationFunctions; i++) {
+      Map<String, Object> intermediateResultMap = intermediateResultMaps[i];
+      Map<String, Comparable> finalResultMap = new HashMap<>();
+      for (String groupKey : intermediateResultMap.keySet()) {
+        Object intermediateResult = intermediateResultMap.get(groupKey);
+        finalResultMap.put(groupKey, _aggregationFunctions[i].extractFinalResult(intermediateResult));
+      }
+      finalResultMaps[i] = finalResultMap;
+    }
+    // If the HAVING clause is set, further filter the group-by results based on the HAVING predicate
+    if (havingFilterQuery != null) {
+      HavingClauseComparisonTree havingClauseComparisonTree =
+          HavingClauseComparisonTree.buildHavingClauseComparisonTree(havingFilterQuery, havingFilterQueryMap);
+      // Apply the close policy: keep only those groups that exist in the result sets of all aggregation functions.
+      // In other words, keep only the intersection of the group key sets of the different aggregation functions,
+      // which is calculated here.
+      Set<String> intersectionOfKeySets = finalResultMaps[0].keySet();
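+      // Note: retainAll mutates this key-set view, and therefore finalResultMaps[0] itself.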
+      for (int i = 1; i < numAggregationFunctions; i++) {
+        intersectionOfKeySets.retainAll(finalResultMaps[i].keySet());
+      }
+
+      // Now remove the groups that do not satisfy the HAVING clause predicate.
+      // Use a TreeMap with CASE_INSENSITIVE_ORDER so that column-name lookups are case-insensitive.
+      Map<String, Comparable> singleGroupAggResults = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
+      Map<String, Comparable>[] finalFilteredResultMaps = new Map[numAggregationFunctions];
+      for (int i = 0; i < numAggregationFunctions; i++) {
+        finalFilteredResultMaps[i] = new HashMap<>();
+      }
+
+      for (String groupKey : intersectionOfKeySets) {
+        for (int i = 0; i < numAggregationFunctions; i++) {
+          singleGroupAggResults.put(columnNames[i], finalResultMaps[i].get(groupKey));
+        }
+        // If this group satisfies the HAVING predicate, keep it in the new maps
+        if (havingClauseComparisonTree.isThisGroupPassPredicates(singleGroupAggResults)) {
+          for (int i = 0; i < numAggregationFunctions; i++) {
+            finalFilteredResultMaps[i].put(groupKey, singleGroupAggResults.get(columnNames[i]));
+          }
+        }
+      }
+      // Update the final results
+      finalResultMaps = finalFilteredResultMaps;
+    }
+
+    int aggregationNumsInFinalResult = 0;
+    for (int i = 0; i < numAggregationFunctions; i++) {
+      if (aggregationFunctionsSelectStatus[i]) {
+        aggregationNumsInFinalResult++;
+      }
+    }
+
+    if (aggregationNumsInFinalResult > 0) {
+      String[] finalColumnNames = new String[aggregationNumsInFinalResult];
+      Map<String, Comparable>[] finalOutResultMaps = new Map[aggregationNumsInFinalResult];
+      AggregationFunction[] finalAggregationFunctions = new AggregationFunction[aggregationNumsInFinalResult];
+      int count = 0;
+      for (int i = 0; i < numAggregationFunctions; i++) {
+        if (aggregationFunctionsSelectStatus[i]) {
+          finalColumnNames[count] = columnNames[i];
+          finalOutResultMaps[count] = finalResultMaps[i];
+          finalAggregationFunctions[count] = _aggregationFunctions[i];
+          count++;
+        }
+      }
+      // Trim the final result maps to topN and set them into the broker response.
+      AggregationGroupByTrimmingService aggregationGroupByTrimmingService =
+          new AggregationGroupByTrimmingService(finalAggregationFunctions, (int) groupBy.getTopN());
+      List<GroupByResult>[] groupByResultLists = aggregationGroupByTrimmingService.trimFinalResults(finalOutResultMaps);
+
+      // Format the value into string if required
+      if (!_preserveType) {
+        for (List<GroupByResult> groupByResultList : groupByResultLists) {
+          for (GroupByResult groupByResult : groupByResultList) {
+            groupByResult.setValue(AggregationFunctionUtils.formatValue(groupByResult.getValue()));
+          }
+        }
+      }
+
+      List<AggregationResult> aggregationResults = new ArrayList<>(count);
+      for (int i = 0; i < aggregationNumsInFinalResult; i++) {
+        List<GroupByResult> groupByResultList = groupByResultLists[i];
+        aggregationResults.add(new AggregationResult(groupByResultList, groupBy.getExpressions(), finalColumnNames[i]));
+      }
+      brokerResponseNative.setAggregationResults(aggregationResults);
+    } else {
+      throw new IllegalStateException(
+          "There should be at least one aggregation function in the select list of a group-by query");
+    }
+  }
+}
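
For readers following the group-by path above, here is a minimal, self-contained sketch of the
merge-then-extract pattern that setGroupByHavingResults applies per aggregation function. The
SumFunction class is a hypothetical stand-in, not Pinot's AggregationFunction; it mirrors only the
merge(...) and extractFinalResult(...) calls the reducer relies on.

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public final class MergeSketch {

      // Hypothetical stand-in for an aggregation function (here: SUM over doubles).
      static final class SumFunction {
        Object merge(Object a, Object b) {
          return (Double) a + (Double) b;
        }

        Comparable extractFinalResult(Object intermediate) {
          return (Double) intermediate;
        }
      }

      public static void main(String[] args) {
        SumFunction sum = new SumFunction();

        // One intermediate result map per server, keyed by group key.
        Map<String, Object> server1 = new HashMap<>();
        server1.put("US", 10.0);
        server1.put("EU", 5.0);
        Map<String, Object> server2 = new HashMap<>();
        server2.put("US", 7.0);
        server2.put("APAC", 3.0);
        List<Map<String, Object>> serverResults = Arrays.asList(server1, server2);

        // Merge step: an existing group key is merged, a new group key is inserted as-is.
        Map<String, Object> merged = new HashMap<>();
        for (Map<String, Object> serverResult : serverResults) {
          for (Map.Entry<String, Object> entry : serverResult.entrySet()) {
            merged.merge(entry.getKey(), entry.getValue(), sum::merge);
          }
        }

        // Extract step: convert each merged intermediate result to its final result,
        // as the reducer does before applying the HAVING filter.
        for (Map.Entry<String, Object> entry : merged.entrySet()) {
          System.out.println(entry.getKey() + " -> " + sum.extractFinalResult(entry.getValue()));
        }
        // Prints US -> 17.0, EU -> 5.0, APAC -> 3.0 (iteration order may vary).
      }
    }
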
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
new file mode 100644
index 0000000..c19a673
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import org.apache.pinot.common.function.AggregationFunctionType;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
+import org.apache.pinot.core.util.QueryOptions;
+
+
+/**
+ * Factory class to construct the right result reducer based on the broker request
+ */
+public final class ResultReducerFactory {
+
+  /**
+   * Constructs the right result reducer based on the broker request
+   */
+  public static DataTableReducer getResultReducer(BrokerRequest brokerRequest) {
+    DataTableReducer dataTableReducer;
+    QueryOptions queryOptions = new QueryOptions(brokerRequest.getQueryOptions());
+    if (brokerRequest.getSelections() != null) {
+      // Selection query
+      dataTableReducer = new SelectionDataTableReducer(brokerRequest, queryOptions);
+    } else {
+      // Aggregation query
+      AggregationFunction[] aggregationFunctions = AggregationFunctionUtils.getAggregationFunctions(brokerRequest);
+      if (!brokerRequest.isSetGroupBy()) {
+        // Aggregation only query
+        if (aggregationFunctions.length == 1 && aggregationFunctions[0].getType() == AggregationFunctionType.DISTINCT) {
+          // Distinct query
+          dataTableReducer = new DistinctDataTableReducer(brokerRequest, aggregationFunctions[0], queryOptions);
+        } else {
+          dataTableReducer = new AggregationDataTableReducer(brokerRequest, aggregationFunctions, queryOptions);
+        }
+      } else {
+        // Aggregation group-by query
+        dataTableReducer = new GroupByDataTableReducer(brokerRequest, aggregationFunctions, queryOptions);
+      }
+    }
+    return dataTableReducer;
+  }
+}
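
A sketch of the call site this factory is designed for: pick a reducer for the query shape, then let
it reduce the per-server data tables into the broker response. Only getResultReducer(...) and the
reduceAndSetResults(...) signature come from this PR; the wrapper class, its package placement, and
the use of BrokerResponseNative's no-arg constructor are assumptions for illustration.

    package org.apache.pinot.core.query.reduce; // assumed placement, same package as the factory

    import java.util.Map;
    import org.apache.pinot.common.metrics.BrokerMetrics;
    import org.apache.pinot.common.request.BrokerRequest;
    import org.apache.pinot.common.response.broker.BrokerResponseNative;
    import org.apache.pinot.common.utils.DataSchema;
    import org.apache.pinot.common.utils.DataTable;
    import org.apache.pinot.core.transport.ServerRoutingInstance;

    public final class ReduceSketch {

      // All arguments would come from the broker's scatter/gather phase.
      static BrokerResponseNative reduce(String tableName, BrokerRequest brokerRequest, DataSchema dataSchema,
          Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerMetrics brokerMetrics) {
        BrokerResponseNative brokerResponseNative = new BrokerResponseNative();
        // Selection, distinct, aggregation-only or group-by: the factory picks the matching reducer.
        DataTableReducer reducer = ResultReducerFactory.getResultReducer(brokerRequest);
        reducer.reduceAndSetResults(tableName, dataSchema, dataTableMap, brokerResponseNative, brokerMetrics);
        return brokerResponseNative;
      }
    }
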
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java
new file mode 100644
index 0000000..3891d35
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/SelectionDataTableReducer.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.config.TableNameBuilder;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.request.Selection;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.QueryProcessingException;
+import org.apache.pinot.common.response.broker.SelectionResults;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.query.selection.SelectionOperatorService;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.core.util.QueryOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Helper class to reduce and set Selection results into the BrokerResponseNative
+ */
+public class SelectionDataTableReducer implements DataTableReducer {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(SelectionDataTableReducer.class);
+
+  private final Selection _selection;
+  private final boolean _preserveType;
+
+  SelectionDataTableReducer(BrokerRequest brokerRequest, QueryOptions queryOptions) {
+    _selection = brokerRequest.getSelections();
+    _preserveType = queryOptions.isPreserveType();
+  }
+
+  /**
+   * Reduces the data tables and sets the selection results into the {@link BrokerResponseNative}.
+   */
+  @Override
+  public void reduceAndSetResults(String tableName, DataSchema dataSchema, Map<ServerRoutingInstance, DataTable> dataTableMap,
+      BrokerResponseNative brokerResponseNative, BrokerMetrics brokerMetrics) {
+    Collection<DataTable> dataTables = dataTableMap.values();
+
+    if (dataTableMap.isEmpty()) {
+      // For an empty data table map, construct an empty result using the cached data schema for the selection query, if it exists
+      if (dataSchema != null) {
+        List<String> selectionColumns =
+            SelectionOperatorUtils.getSelectionColumns(_selection.getSelectionColumns(), dataSchema);
+        brokerResponseNative.setSelectionResults(new SelectionResults(selectionColumns, new ArrayList<>(0)));
+      }
+    } else {
+
+      assert dataSchema != null;
+
+      // For a data table map with more than one data table, remove the conflicting data tables
+      if (dataTableMap.size() > 1) {
+        List<ServerRoutingInstance> droppedServers = removeConflictingResponses(dataSchema, dataTableMap);
+        if (!droppedServers.isEmpty()) {
+          String errorMessage = QueryException.MERGE_RESPONSE_ERROR.getMessage() + ": responses for table: " + tableName
+              + " from servers: " + droppedServers + " got dropped due to data schema inconsistency.";
+          LOGGER.warn(errorMessage);
+          if (brokerMetrics != null) {
+            brokerMetrics.addMeteredTableValue(TableNameBuilder.extractRawTableName(tableName),
+                BrokerMeter.RESPONSE_MERGE_EXCEPTIONS, 1L);
+          }
+          brokerResponseNative
+              .addToExceptions(new QueryProcessingException(QueryException.MERGE_RESPONSE_ERROR_CODE, errorMessage));
+        }
+      }
+
+      int selectionSize = _selection.getSize();
+      if (selectionSize > 0 && _selection.isSetSelectionSortSequence()) {
+        // Selection order-by
+        SelectionOperatorService selectionService = new SelectionOperatorService(_selection, dataSchema);
+        selectionService.reduceWithOrdering(dataTables);
+        brokerResponseNative.setSelectionResults(selectionService.renderSelectionResultsWithOrdering(_preserveType));
+      } else {
+        // Selection only
+        List<String> selectionColumns =
+            SelectionOperatorUtils.getSelectionColumns(_selection.getSelectionColumns(), dataSchema);
+        brokerResponseNative.setSelectionResults(SelectionOperatorUtils.renderSelectionResultsWithoutOrdering(
+            SelectionOperatorUtils.reduceWithoutOrdering(dataTables, selectionSize), dataSchema, selectionColumns,
+            _preserveType));
+      }
+    }
+  }
+
+  /**
+   * Given a data schema, removes the data tables that are not compatible with it.
+   * <p>Upgrades the data schema passed in to cover all remaining data schemas.
+   *
+   * @param dataSchema data schema
+   * @param dataTableMap map from server to data table
+   * @return list of servers whose data tables were dropped
+   */
+  private List<ServerRoutingInstance> removeConflictingResponses(DataSchema dataSchema,
+      Map<ServerRoutingInstance, DataTable> dataTableMap) {
+    List<ServerRoutingInstance> droppedServers = new ArrayList<>();
+    Iterator<Map.Entry<ServerRoutingInstance, DataTable>> iterator = dataTableMap.entrySet().iterator();
+    while (iterator.hasNext()) {
+      Map.Entry<ServerRoutingInstance, DataTable> entry = iterator.next();
+      DataSchema dataSchemaToCompare = entry.getValue().getDataSchema();
+      assert dataSchemaToCompare != null;
+      if (!dataSchema.isTypeCompatibleWith(dataSchemaToCompare)) {
+        droppedServers.add(entry.getKey());
+        iterator.remove();
+      } else {
+        dataSchema.upgradeToCover(dataSchemaToCompare);
+      }
+    }
+    return droppedServers;
+  }
+}
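
The heart of removeConflictingResponses is the isTypeCompatibleWith / upgradeToCover pair on
DataSchema. A minimal sketch of that pattern, assuming DataSchema's (String[], ColumnDataType[])
constructor and that numeric types such as INT and LONG are mutually compatible:

    import org.apache.pinot.common.utils.DataSchema;
    import org.apache.pinot.common.utils.DataSchema.ColumnDataType;

    public final class SchemaMergeSketch {
      public static void main(String[] args) {
        // The schema cached from one server's response, and a schema from another server.
        DataSchema cached = new DataSchema(new String[]{"views"}, new ColumnDataType[]{ColumnDataType.INT});
        DataSchema fromServer = new DataSchema(new String[]{"views"}, new ColumnDataType[]{ColumnDataType.LONG});
        if (cached.isTypeCompatibleWith(fromServer)) {
          // Compatible response: widen the cached schema to cover both (INT -> LONG here).
          cached.upgradeToCover(fromServer);
        } else {
          // Incompatible response: the reducer drops this server's data table instead.
        }
        System.out.println(cached);
      }
    }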


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org