You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2022/09/07 08:56:31 UTC

[pinot] branch master updated: Allow server to directly return the final aggregation result (#9304)

This is an automated email from the ASF dual-hosted git repository.

siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5ca2ed4cbe Allow server to directly return the final aggregation result (#9304)
5ca2ed4cbe is described below

commit 5ca2ed4cbe16f33f5404824f4bcfdcc3e0b54f2e
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Wed Sep 7 01:56:22 2022 -0700

    Allow server to directly return the final aggregation result (#9304)
---
 .../apache/pinot/core/data/table/IndexedTable.java |  20 ++-
 .../org/apache/pinot/core/data/table/Table.java    |   7 +-
 .../blocks/results/AggregationResultsBlock.java    |  80 ++++++++--
 .../blocks/results/GroupByResultsBlock.java        |  15 +-
 .../combine/GroupByOrderByCombineOperator.java     |   9 +-
 .../function/AggregationFunctionUtils.java         |  46 ++++++
 .../query/reduce/AggregationDataTableReducer.java  |  69 ++++----
 .../core/query/reduce/GroupByDataTableReducer.java | 177 ++++++++++++++++-----
 .../core/query/request/context/QueryContext.java   |  11 ++
 .../apache/pinot/core/util/QueryOptionsUtils.java  |  31 ++--
 .../tests/OfflineClusterIntegrationTest.java       |  73 ++++-----
 .../apache/pinot/spi/utils/CommonConstants.java    |   1 +
 12 files changed, 397 insertions(+), 142 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
index dbd4e99891..012fdc1170 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.pinot.common.request.context.ExpressionContext;
 import org.apache.pinot.common.request.context.OrderByExpressionContext;
 import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.request.context.QueryContext;
 
@@ -54,7 +55,7 @@ public abstract class IndexedTable extends BaseTable {
    *
    * @param dataSchema    Data schema of the table
    * @param queryContext  Query context
-   * @param resultSize    Number of records to keep in the final result after calling {@link #finish(boolean)}
+   * @param resultSize    Number of records to keep in the final result after calling {@link #finish(boolean, boolean)}
    * @param trimSize      Number of records to keep when trimming the table
    * @param trimThreshold Trim the table when the number of records exceeds the threshold
    * @param lookupMap     Map from keys to records
@@ -144,7 +145,7 @@ public abstract class IndexedTable extends BaseTable {
   }
 
   @Override
-  public void finish(boolean sort) {
+  public void finish(boolean sort, boolean storeFinalResult) {
     if (_hasOrderBy) {
       long startTimeNs = System.nanoTime();
       _topRecords = _tableResizer.getTopRecords(_lookupMap, _resultSize, sort);
@@ -154,6 +155,21 @@ public abstract class IndexedTable extends BaseTable {
     } else {
       _topRecords = _lookupMap.values();
     }
+    // TODO: Directly return final result in _tableResizer.getTopRecords to avoid extracting final result multiple times
+    if (storeFinalResult) {
+      ColumnDataType[] columnDataTypes = _dataSchema.getColumnDataTypes();
+      int numAggregationFunctions = _aggregationFunctions.length;
+      for (int i = 0; i < numAggregationFunctions; i++) {
+        columnDataTypes[i + _numKeyColumns] = _aggregationFunctions[i].getFinalResultColumnType();
+      }
+      for (Record record : _topRecords) {
+        Object[] values = record.getValues();
+        for (int i = 0; i < numAggregationFunctions; i++) {
+          int colId = i + _numKeyColumns;
+          values[colId] = _aggregationFunctions[i].extractFinalResult(values[colId]);
+        }
+      }
+    }
   }
 
   @Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/Table.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/Table.java
index d491b119f3..ab682e5dc0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/Table.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/Table.java
@@ -55,11 +55,16 @@ public interface Table {
    */
   Iterator<Record> iterator();
 
+  default void finish(boolean sort) {
+    finish(sort, false);
+  }
+
   /**
    * Finish any pre exit processing
    * @param sort sort the final results if true
+   * @param storeFinalResult whether to store final aggregation result
    */
-  void finish(boolean sort);
+  void finish(boolean sort, boolean storeFinalResult);
 
   /**
    * Returns the data schema of the table
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
index e4d0ba6bca..bba0c6df61 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pinot.core.operator.blocks.results;
 
+import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.util.List;
 import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
@@ -27,6 +29,7 @@ import org.apache.pinot.core.common.datatable.DataTableBuilder;
 import org.apache.pinot.core.common.datatable.DataTableFactory;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.spi.utils.ByteArray;
 import org.apache.pinot.spi.utils.NullValueUtils;
 import org.roaringbitmap.RoaringBitmap;
 
@@ -34,7 +37,7 @@ import org.roaringbitmap.RoaringBitmap;
 /**
  * Results block for aggregation queries.
  */
-@SuppressWarnings("rawtypes")
+@SuppressWarnings({"rawtypes", "unchecked"})
 public class AggregationResultsBlock extends BaseResultsBlock {
   private final AggregationFunction[] _aggregationFunctions;
   private final List<Object> _results;
@@ -55,6 +58,8 @@ public class AggregationResultsBlock extends BaseResultsBlock {
   @Override
   public DataTable getDataTable(QueryContext queryContext)
       throws Exception {
+    boolean returnFinalResult = queryContext.isServerReturnFinalResult();
+
     // Extract result column name and type from each aggregation function
     int numColumns = _aggregationFunctions.length;
     String[] columnNames = new String[numColumns];
@@ -62,7 +67,8 @@ public class AggregationResultsBlock extends BaseResultsBlock {
     for (int i = 0; i < numColumns; i++) {
       AggregationFunction aggregationFunction = _aggregationFunctions[i];
       columnNames[i] = aggregationFunction.getColumnName();
-      columnDataTypes[i] = aggregationFunction.getIntermediateResultColumnType();
+      columnDataTypes[i] = returnFinalResult ? aggregationFunction.getFinalResultColumnType()
+          : aggregationFunction.getIntermediateResultColumnType();
     }
 
     // Build the data table.
@@ -76,11 +82,20 @@ public class AggregationResultsBlock extends BaseResultsBlock {
       dataTableBuilder.startRow();
       for (int i = 0; i < numColumns; i++) {
         Object result = _results.get(i);
-        if (result == null && columnDataTypes[i] != ColumnDataType.OBJECT) {
-          result = NullValueUtils.getDefaultNullValue(columnDataTypes[i].toDataType());
-          nullBitmaps[i].add(0);
+        if (!returnFinalResult) {
+          if (result == null && columnDataTypes[i] != ColumnDataType.OBJECT) {
+            result = NullValueUtils.getDefaultNullValue(columnDataTypes[i].toDataType());
+            nullBitmaps[i].add(0);
+          }
+          setIntermediateResult(dataTableBuilder, columnDataTypes, i, result);
+        } else {
+          result = _aggregationFunctions[i].extractFinalResult(result);
+          if (result == null) {
+            result = NullValueUtils.getDefaultNullValue(columnDataTypes[i].toDataType());
+            nullBitmaps[i].add(0);
+          }
+          setFinalResult(dataTableBuilder, columnDataTypes, i, result);
         }
-        setResult(dataTableBuilder, columnNames, columnDataTypes, i, result);
       }
       dataTableBuilder.finishRow();
       for (RoaringBitmap nullBitmap : nullBitmaps) {
@@ -89,7 +104,13 @@ public class AggregationResultsBlock extends BaseResultsBlock {
     } else {
       dataTableBuilder.startRow();
       for (int i = 0; i < numColumns; i++) {
-        setResult(dataTableBuilder, columnNames, columnDataTypes, i, _results.get(i));
+        Object result = _results.get(i);
+        if (!returnFinalResult) {
+          setIntermediateResult(dataTableBuilder, columnDataTypes, i, result);
+        } else {
+          result = _aggregationFunctions[i].extractFinalResult(result);
+          setFinalResult(dataTableBuilder, columnDataTypes, i, result);
+        }
       }
       dataTableBuilder.finishRow();
     }
@@ -99,23 +120,56 @@ public class AggregationResultsBlock extends BaseResultsBlock {
     return dataTable;
   }
 
-  private void setResult(DataTableBuilder dataTableBuilder, String[] columnNames, ColumnDataType[] columnDataTypes,
-      int index, Object result)
+  private void setIntermediateResult(DataTableBuilder dataTableBuilder, ColumnDataType[] columnDataTypes, int index,
+      Object result)
       throws IOException {
     ColumnDataType columnDataType = columnDataTypes[index];
     switch (columnDataType) {
       case LONG:
-        dataTableBuilder.setColumn(index, ((Number) result).longValue());
+        dataTableBuilder.setColumn(index, (long) result);
         break;
       case DOUBLE:
-        dataTableBuilder.setColumn(index, ((Double) result).doubleValue());
+        dataTableBuilder.setColumn(index, (double) result);
         break;
       case OBJECT:
         dataTableBuilder.setColumn(index, result);
         break;
       default:
-        throw new UnsupportedOperationException(
-            "Unsupported aggregation column data type: " + columnDataType + " for column: " + columnNames[index]);
+        throw new IllegalStateException("Illegal column data type in intermediate result: " + columnDataType);
+    }
+  }
+
+  private void setFinalResult(DataTableBuilder dataTableBuilder, ColumnDataType[] columnDataTypes, int index,
+      Object result)
+      throws IOException {
+    ColumnDataType columnDataType = columnDataTypes[index];
+    switch (columnDataType) {
+      case INT:
+        dataTableBuilder.setColumn(index, (int) result);
+        break;
+      case LONG:
+        dataTableBuilder.setColumn(index, (long) result);
+        break;
+      case FLOAT:
+        dataTableBuilder.setColumn(index, (float) result);
+        break;
+      case DOUBLE:
+        dataTableBuilder.setColumn(index, (double) result);
+        break;
+      case BIG_DECIMAL:
+        dataTableBuilder.setColumn(index, (BigDecimal) result);
+        break;
+      case STRING:
+        dataTableBuilder.setColumn(index, result.toString());
+        break;
+      case BYTES:
+        dataTableBuilder.setColumn(index, (ByteArray) result);
+        break;
+      case DOUBLE_ARRAY:
+        dataTableBuilder.setColumn(index, ((DoubleArrayList) result).elements());
+        break;
+      default:
+        throw new IllegalStateException("Illegal column data type in final result: " + columnDataType);
     }
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
index 7b3fd84c1d..1d0a7c5089 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.core.operator.blocks.results;
 
 import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.util.Collection;
@@ -193,14 +194,11 @@ public class GroupByResultsBlock extends BaseResultsBlock {
         dataTableBuilder.setColumn(columnIndex, (BigDecimal) value);
         break;
       case STRING:
-        dataTableBuilder.setColumn(columnIndex, (String) value);
+        dataTableBuilder.setColumn(columnIndex, value.toString());
         break;
       case BYTES:
         dataTableBuilder.setColumn(columnIndex, (ByteArray) value);
         break;
-      case OBJECT:
-        dataTableBuilder.setColumn(columnIndex, value);
-        break;
       case INT_ARRAY:
         dataTableBuilder.setColumn(columnIndex, (int[]) value);
         break;
@@ -211,11 +209,18 @@ public class GroupByResultsBlock extends BaseResultsBlock {
         dataTableBuilder.setColumn(columnIndex, (float[]) value);
         break;
       case DOUBLE_ARRAY:
-        dataTableBuilder.setColumn(columnIndex, (double[]) value);
+        if (value instanceof DoubleArrayList) {
+          dataTableBuilder.setColumn(columnIndex, ((DoubleArrayList) value).elements());
+        } else {
+          dataTableBuilder.setColumn(columnIndex, (double[]) value);
+        }
         break;
       case STRING_ARRAY:
         dataTableBuilder.setColumn(columnIndex, (String[]) value);
         break;
+      case OBJECT:
+        dataTableBuilder.setColumn(columnIndex, value);
+        break;
       default:
         throw new IllegalStateException("Unsupported stored type: " + storedColumnDataType);
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
index b4e828ed97..bb49dbfce2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
@@ -85,7 +85,8 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator<GroupByRe
     int minTrimSize = queryContext.getMinServerGroupTrimSize();
     if (minTrimSize > 0) {
       int limit = queryContext.getLimit();
-      if (queryContext.getOrderByExpressions() != null || queryContext.getHavingFilter() != null) {
+      if ((!queryContext.isServerReturnFinalResult() && queryContext.getOrderByExpressions() != null)
+          || queryContext.getHavingFilter() != null) {
         _trimSize = GroupByUtils.getTableCapacity(limit, minTrimSize);
       } else {
         // TODO: Keeping only 'LIMIT' groups can cause inaccurate result because the groups are randomly selected
@@ -252,7 +253,11 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator<GroupByRe
     }
 
     IndexedTable indexedTable = _indexedTable;
-    indexedTable.finish(false);
+    if (!_queryContext.isServerReturnFinalResult()) {
+      indexedTable.finish(false);
+    } else {
+      indexedTable.finish(true, true);
+    }
     GroupByResultsBlock mergedBlock = new GroupByResultsBlock(indexedTable);
     mergedBlock.setNumGroupsLimitReached(_numGroupsLimitReached);
     mergedBlock.setNumResizes(indexedTable.getNumResizes());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
index 6c31690929..21593418e9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
@@ -27,6 +27,8 @@ import java.util.Map;
 import java.util.Set;
 import javax.annotation.Nullable;
 import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.common.BlockValSet;
 import org.apache.pinot.core.operator.blocks.TransformBlock;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
@@ -115,4 +117,48 @@ public class AggregationFunctionUtils {
     BlockValSet blockValSet = transformBlock.getBlockValueSet(aggregationFunctionColumnPair.toColumnName());
     return Collections.singletonMap(expression, blockValSet);
   }
+
+  /**
+   * Reads the intermediate result from the {@link DataTable}.
+   */
+  public static Object getIntermediateResult(DataTable dataTable, ColumnDataType columnDataType, int rowId, int colId) {
+    switch (columnDataType) {
+      case LONG:
+        return dataTable.getLong(rowId, colId);
+      case DOUBLE:
+        return dataTable.getDouble(rowId, colId);
+      case OBJECT:
+        return dataTable.getObject(rowId, colId);
+      default:
+        throw new IllegalStateException("Illegal column data type in intermediate result: " + columnDataType);
+    }
+  }
+
+  /**
+   * Reads the converted final result from the {@link DataTable}. It should be equivalent to running
+   * {@link AggregationFunction#extractFinalResult(Object)} and {@link ColumnDataType#convert(Object)}.
+   */
+  public static Object getConvertedFinalResult(DataTable dataTable, ColumnDataType columnDataType, int rowId,
+      int colId) {
+    switch (columnDataType) {
+      case INT:
+        return dataTable.getInt(rowId, colId);
+      case LONG:
+        return dataTable.getLong(rowId, colId);
+      case FLOAT:
+        return dataTable.getFloat(rowId, colId);
+      case DOUBLE:
+        return dataTable.getDouble(rowId, colId);
+      case BIG_DECIMAL:
+        return dataTable.getBigDecimal(rowId, colId);
+      case STRING:
+        return dataTable.getString(rowId, colId);
+      case BYTES:
+        return dataTable.getBytes(rowId, colId).getBytes();
+      case DOUBLE_ARRAY:
+        return dataTable.getDoubleArray(rowId, colId);
+      default:
+        throw new IllegalStateException("Illegal column data type in final result: " + columnDataType);
+    }
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
index dd29a24c76..238fb557b1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.core.query.reduce;
 
+import com.google.common.base.Preconditions;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import org.apache.pinot.common.metrics.BrokerMetrics;
@@ -27,6 +29,7 @@ import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.common.utils.DataTable;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
 import org.roaringbitmap.RoaringBitmap;
@@ -52,6 +55,8 @@ public class AggregationDataTableReducer implements DataTableReducer {
   public void reduceAndSetResults(String tableName, DataSchema dataSchema,
       Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative brokerResponseNative,
       DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) {
+    assert dataSchema != null;
+
     if (dataTableMap.isEmpty()) {
       DataSchema resultTableSchema =
           new PostAggregationHandler(_queryContext, getPrePostAggregationDataSchema()).getResultDataSchema();
@@ -59,43 +64,31 @@ public class AggregationDataTableReducer implements DataTableReducer {
       return;
     }
 
-    // Merge results from all data tables
+    if (!_queryContext.isServerReturnFinalResult()) {
+      reduceWithIntermediateResult(dataSchema, dataTableMap.values(), brokerResponseNative);
+    } else {
+      Preconditions.checkState(dataTableMap.size() == 1, "Cannot merge final results from multiple servers");
+      reduceWithFinalResult(dataSchema, dataTableMap.values().iterator().next(), brokerResponseNative);
+    }
+  }
+
+  private void reduceWithIntermediateResult(DataSchema dataSchema, Collection<DataTable> dataTables,
+      BrokerResponseNative brokerResponseNative) {
     int numAggregationFunctions = _aggregationFunctions.length;
     Object[] intermediateResults = new Object[numAggregationFunctions];
-    for (DataTable dataTable : dataTableMap.values()) {
+    for (DataTable dataTable : dataTables) {
       for (int i = 0; i < numAggregationFunctions; i++) {
         Object intermediateResultToMerge;
         ColumnDataType columnDataType = dataSchema.getColumnDataType(i);
         if (_queryContext.isNullHandlingEnabled()) {
           RoaringBitmap nullBitmap = dataTable.getNullRowIds(i);
-          boolean isNull = nullBitmap != null && nullBitmap.contains(0);
-          switch (columnDataType) {
-            case LONG:
-              intermediateResultToMerge = isNull ? null : dataTable.getLong(0, i);
-              break;
-            case DOUBLE:
-              intermediateResultToMerge = isNull ? null : dataTable.getDouble(0, i);
-              break;
-            case OBJECT:
-              intermediateResultToMerge = isNull ? null : dataTable.getObject(0, i);
-              break;
-            default:
-              throw new IllegalStateException("Illegal column data type in aggregation results: " + columnDataType);
+          if (nullBitmap != null && nullBitmap.contains(0)) {
+            intermediateResultToMerge = null;
+          } else {
+            intermediateResultToMerge = AggregationFunctionUtils.getIntermediateResult(dataTable, columnDataType, 0, i);
           }
         } else {
-          switch (columnDataType) {
-            case LONG:
-              intermediateResultToMerge = dataTable.getLong(0, i);
-              break;
-            case DOUBLE:
-              intermediateResultToMerge = dataTable.getDouble(0, i);
-              break;
-            case OBJECT:
-              intermediateResultToMerge = dataTable.getObject(0, i);
-              break;
-            default:
-              throw new IllegalStateException("Illegal column data type in aggregation results: " + columnDataType);
-          }
+          intermediateResultToMerge = AggregationFunctionUtils.getIntermediateResult(dataTable, columnDataType, 0, i);
         }
         Object mergedIntermediateResult = intermediateResults[i];
         if (mergedIntermediateResult == null) {
@@ -114,6 +107,26 @@ public class AggregationDataTableReducer implements DataTableReducer {
     brokerResponseNative.setResultTable(reduceToResultTable(finalResults));
   }
 
+  private void reduceWithFinalResult(DataSchema dataSchema, DataTable dataTable,
+      BrokerResponseNative brokerResponseNative) {
+    int numAggregationFunctions = _aggregationFunctions.length;
+    Object[] finalResults = new Object[numAggregationFunctions];
+    for (int i = 0; i < numAggregationFunctions; i++) {
+      ColumnDataType columnDataType = dataSchema.getColumnDataType(i);
+      if (_queryContext.isNullHandlingEnabled()) {
+        RoaringBitmap nullBitmap = dataTable.getNullRowIds(i);
+        if (nullBitmap != null && nullBitmap.contains(0)) {
+          finalResults[i] = null;
+        } else {
+          finalResults[i] = AggregationFunctionUtils.getConvertedFinalResult(dataTable, columnDataType, 0, i);
+        }
+      } else {
+        finalResults[i] = AggregationFunctionUtils.getConvertedFinalResult(dataTable, columnDataType, 0, i);
+      }
+    }
+    brokerResponseNative.setResultTable(reduceToResultTable(finalResults));
+  }
+
   /**
    * Sets aggregation results into ResultsTable
    */
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index f167ca8a50..f04d5b9724 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -18,6 +18,8 @@
  */
 package org.apache.pinot.core.query.reduce;
 
+import com.google.common.base.Preconditions;
+import java.sql.Timestamp;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -47,6 +49,7 @@ import org.apache.pinot.core.data.table.SimpleIndexedTable;
 import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
 import org.apache.pinot.core.operator.combine.GroupByOrderByCombineOperator;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
 import org.apache.pinot.core.query.request.context.QueryContext;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
 import org.apache.pinot.core.util.GroupByUtils;
@@ -84,19 +87,35 @@ public class GroupByDataTableReducer implements DataTableReducer {
    */
   @Override
   public void reduceAndSetResults(String tableName, DataSchema dataSchema,
-      Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative brokerResponseNative,
+      Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative brokerResponse,
       DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) {
     assert dataSchema != null;
-    try {
-      reduceToResultTable(brokerResponseNative, dataSchema, dataTableMap.values(), reducerContext, tableName,
-          brokerMetrics);
-      if (brokerMetrics != null) {
-        brokerMetrics.addMeteredTableValue(tableName, BrokerMeter.GROUP_BY_SIZE,
-            brokerResponseNative.getResultTable().getRows().size());
+
+    if (dataTableMap.isEmpty()) {
+      PostAggregationHandler postAggregationHandler =
+          new PostAggregationHandler(_queryContext, getPrePostAggregationDataSchema(dataSchema));
+      DataSchema resultDataSchema = postAggregationHandler.getResultDataSchema();
+      brokerResponse.setResultTable(new ResultTable(resultDataSchema, Collections.emptyList()));
+      return;
+    }
+
+    if (!_queryContext.isServerReturnFinalResult()) {
+      try {
+        reduceWithIntermediateResult(brokerResponse, dataSchema, dataTableMap.values(), reducerContext, tableName,
+            brokerMetrics);
+      } catch (TimeoutException e) {
+        brokerResponse.getProcessingExceptions()
+            .add(new QueryProcessingException(QueryException.BROKER_TIMEOUT_ERROR_CODE, e.getMessage()));
       }
-    } catch (TimeoutException e) {
-      brokerResponseNative.getProcessingExceptions()
-          .add(new QueryProcessingException(QueryException.BROKER_TIMEOUT_ERROR_CODE, e.getMessage()));
+    } else {
+      // TODO: Support merging results from multiple servers when the data is partitioned on the group-by column
+      Preconditions.checkState(dataTableMap.size() == 1, "Cannot merge final results from multiple servers");
+      reduceWithFinalResult(dataSchema, dataTableMap.values().iterator().next(), brokerResponse);
+    }
+
+    if (brokerMetrics != null && brokerResponse.getResultTable() != null) {
+      brokerMetrics.addMeteredTableValue(tableName, BrokerMeter.GROUP_BY_SIZE,
+          brokerResponse.getResultTable().getRows().size());
     }
   }
 
@@ -110,24 +129,17 @@ public class GroupByDataTableReducer implements DataTableReducer {
    * @param brokerMetrics broker metrics (meters)
    * @throws TimeoutException If unable complete within timeout.
    */
-  private void reduceToResultTable(BrokerResponseNative brokerResponseNative, DataSchema dataSchema,
+  private void reduceWithIntermediateResult(BrokerResponseNative brokerResponseNative, DataSchema dataSchema,
       Collection<DataTable> dataTables, DataTableReducerContext reducerContext, String rawTableName,
       BrokerMetrics brokerMetrics)
       throws TimeoutException {
-    int numRecords;
-    Iterator<Record> sortedIterator;
-    if (!dataTables.isEmpty()) {
-      IndexedTable indexedTable = getIndexedTable(dataSchema, dataTables, reducerContext);
-      if (brokerMetrics != null) {
-        brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.NUM_RESIZES, indexedTable.getNumResizes());
-        brokerMetrics.addValueToTableGauge(rawTableName, BrokerGauge.RESIZE_TIME_MS, indexedTable.getResizeTimeMs());
-      }
-      numRecords = indexedTable.size();
-      sortedIterator = indexedTable.iterator();
-    } else {
-      numRecords = 0;
-      sortedIterator = Collections.emptyIterator();
+    IndexedTable indexedTable = getIndexedTable(dataSchema, dataTables, reducerContext);
+    if (brokerMetrics != null) {
+      brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.NUM_RESIZES, indexedTable.getNumResizes());
+      brokerMetrics.addValueToTableGauge(rawTableName, BrokerGauge.RESIZE_TIME_MS, indexedTable.getResizeTimeMs());
     }
+    int numRecords = indexedTable.size();
+    Iterator<Record> sortedIterator = indexedTable.iterator();
 
     DataSchema prePostAggregationDataSchema = getPrePostAggregationDataSchema(dataSchema);
     PostAggregationHandler postAggregationHandler =
@@ -148,8 +160,8 @@ public class GroupByDataTableReducer implements DataTableReducer {
     FilterContext havingFilter = _queryContext.getHavingFilter();
     if (havingFilter != null) {
       rows = new ArrayList<>();
-      HavingFilterHandler havingFilterHandler = new HavingFilterHandler(havingFilter, postAggregationHandler,
-          _queryContext.isNullHandlingEnabled());
+      HavingFilterHandler havingFilterHandler =
+          new HavingFilterHandler(havingFilter, postAggregationHandler, _queryContext.isNullHandlingEnabled());
       while (rows.size() < limit && sortedIterator.hasNext()) {
         Object[] row = sortedIterator.next().getValues();
         extractFinalAggregationResults(row);
@@ -180,19 +192,7 @@ public class GroupByDataTableReducer implements DataTableReducer {
     }
 
     // Calculate final result rows after post aggregation
-    List<Object[]> resultRows = new ArrayList<>(rows.size());
-    ColumnDataType[] resultColumnDataTypes = resultDataSchema.getColumnDataTypes();
-    int numResultColumns = resultColumnDataTypes.length;
-    for (Object[] row : rows) {
-      Object[] resultRow = postAggregationHandler.getResult(row);
-      for (int i = 0; i < numResultColumns; i++) {
-        Object value = resultRow[i];
-        if (value != null) {
-          resultRow[i] = resultColumnDataTypes[i].format(value);
-        }
-      }
-      resultRows.add(resultRow);
-    }
+    List<Object[]> resultRows = calculateFinalResultRows(postAggregationHandler, rows);
 
     brokerResponseNative.setResultTable(new ResultTable(resultDataSchema, resultRows));
   }
@@ -376,4 +376,103 @@ public class GroupByDataTableReducer implements DataTableReducer {
       return Math.min(numDataTables, maxReduceThreadsPerQuery);
     }
   }
+
+  private void reduceWithFinalResult(DataSchema dataSchema, DataTable dataTable,
+      BrokerResponseNative brokerResponseNative) {
+    PostAggregationHandler postAggregationHandler = new PostAggregationHandler(_queryContext, dataSchema);
+    DataSchema resultDataSchema = postAggregationHandler.getResultDataSchema();
+
+    // Directly return when there is no record returned, or limit is 0
+    int numRows = dataTable.getNumberOfRows();
+    int limit = _queryContext.getLimit();
+    if (numRows == 0 || limit == 0) {
+      brokerResponseNative.setResultTable(new ResultTable(resultDataSchema, Collections.emptyList()));
+      return;
+    }
+
+    // Calculate rows before post-aggregation
+    List<Object[]> rows;
+    FilterContext havingFilter = _queryContext.getHavingFilter();
+    if (havingFilter != null) {
+      rows = new ArrayList<>();
+      HavingFilterHandler havingFilterHandler =
+          new HavingFilterHandler(havingFilter, postAggregationHandler, _queryContext.isNullHandlingEnabled());
+      for (int i = 0; i < numRows; i++) {
+        Object[] row = getConvertedRowWithFinalResult(dataTable, i);
+        if (havingFilterHandler.isMatch(row)) {
+          rows.add(row);
+          if (rows.size() == limit) {
+            break;
+          }
+        }
+      }
+    } else {
+      numRows = Math.min(numRows, limit);
+      rows = new ArrayList<>(numRows);
+      for (int i = 0; i < numRows; i++) {
+        rows.add(getConvertedRowWithFinalResult(dataTable, i));
+      }
+    }
+
+    // Calculate final result rows after post aggregation
+    List<Object[]> resultRows = calculateFinalResultRows(postAggregationHandler, rows);
+
+    brokerResponseNative.setResultTable(new ResultTable(resultDataSchema, resultRows));
+  }
+
+  private List<Object[]> calculateFinalResultRows(PostAggregationHandler postAggregationHandler, List<Object[]> rows) {
+    List<Object[]> resultRows = new ArrayList<>(rows.size());
+    ColumnDataType[] resultColumnDataTypes = postAggregationHandler.getResultDataSchema().getColumnDataTypes();
+    int numResultColumns = resultColumnDataTypes.length;
+    for (Object[] row : rows) {
+      Object[] resultRow = postAggregationHandler.getResult(row);
+      for (int i = 0; i < numResultColumns; i++) {
+        Object value = resultRow[i];
+        if (value != null) {
+          resultRow[i] = resultColumnDataTypes[i].format(value);
+        }
+      }
+      resultRows.add(resultRow);
+    }
+    return resultRows;
+  }
+
+  private Object[] getConvertedRowWithFinalResult(DataTable dataTable, int rowId) {
+    Object[] row = new Object[_numColumns];
+    ColumnDataType[] columnDataTypes = dataTable.getDataSchema().getColumnDataTypes();
+    for (int i = 0; i < _numColumns; i++) {
+      if (i < _numGroupByExpressions) {
+        row[i] = getConvertedKey(dataTable, columnDataTypes[i], rowId, i);
+      } else {
+        row[i] = AggregationFunctionUtils.getConvertedFinalResult(dataTable, columnDataTypes[i], rowId, i);
+      }
+    }
+    return row;
+  }
+
+  private Object getConvertedKey(DataTable dataTable, ColumnDataType columnDataType, int rowId, int colId) {
+    switch (columnDataType) {
+      case INT:
+        return dataTable.getInt(rowId, colId);
+      case LONG:
+        return dataTable.getLong(rowId, colId);
+      case FLOAT:
+        return dataTable.getFloat(rowId, colId);
+      case DOUBLE:
+        return dataTable.getDouble(rowId, colId);
+      case BIG_DECIMAL:
+        return dataTable.getBigDecimal(rowId, colId);
+      case BOOLEAN:
+        return dataTable.getInt(rowId, colId) == 1;
+      case TIMESTAMP:
+        return new Timestamp(dataTable.getLong(rowId, colId));
+      case STRING:
+      case JSON:
+        return dataTable.getString(rowId, colId);
+      case BYTES:
+        return dataTable.getBytes(rowId, colId).getBytes();
+      default:
+        throw new IllegalStateException("Illegal column data type in group key: " + columnDataType);
+    }
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
index 2faeefc15e..dbf8d64fbb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
@@ -120,6 +120,8 @@ public class QueryContext {
   private int _groupTrimThreshold = InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD;
   // Whether null handling is enabled
   private boolean _nullHandlingEnabled;
+  // Whether server returns the final result
+  private boolean _serverReturnFinalResult;
 
   private QueryContext(@Nullable String tableName, @Nullable QueryContext subquery,
       List<ExpressionContext> selectExpressions, List<String> aliasList, @Nullable FilterContext filter,
@@ -383,6 +385,14 @@ public class QueryContext {
     _nullHandlingEnabled = nullHandlingEnabled;
   }
 
+  public boolean isServerReturnFinalResult() {
+    return _serverReturnFinalResult;
+  }
+
+  public void setServerReturnFinalResult(boolean serverReturnFinalResult) {
+    _serverReturnFinalResult = serverReturnFinalResult;
+  }
+
   /**
    * Gets or computes a value of type {@code V} associated with a key of type {@code K} so that it can be shared
    * within the scope of a query.
@@ -500,6 +510,7 @@ public class QueryContext {
           new QueryContext(_tableName, _subquery, _selectExpressions, _aliasList, _filter, _groupByExpressions,
               _havingFilter, _orderByExpressions, _limit, _offset, _queryOptions, _expressionOverrideHints, _explain);
       queryContext.setNullHandlingEnabled(QueryOptionsUtils.isNullHandlingEnabled(_queryOptions));
+      queryContext.setServerReturnFinalResult(QueryOptionsUtils.isServerReturnFinalResult(_queryOptions));
 
       // Pre-calculate the aggregation functions and columns for the query
       generateAggregationFunctions(queryContext);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/QueryOptionsUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/util/QueryOptionsUtils.java
index 0ac77c10e9..dc40e20a88 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/QueryOptionsUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/QueryOptionsUtils.java
@@ -22,7 +22,8 @@ import com.google.common.base.Preconditions;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.pinot.core.common.datatable.DataTableFactory;
-import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
+import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
+import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionValue;
 
 
 /**
@@ -34,7 +35,7 @@ public class QueryOptionsUtils {
 
   @Nullable
   public static Long getTimeoutMs(Map<String, String> queryOptions) {
-    String timeoutMsString = queryOptions.get(Request.QueryOptionKey.TIMEOUT_MS);
+    String timeoutMsString = queryOptions.get(QueryOptionKey.TIMEOUT_MS);
     if (timeoutMsString != null) {
       long timeoutMs = Long.parseLong(timeoutMsString);
       Preconditions.checkState(timeoutMs > 0, "Query timeout must be positive, got: %s", timeoutMs);
@@ -45,56 +46,60 @@ public class QueryOptionsUtils {
   }
 
   public static boolean isSkipUpsert(Map<String, String> queryOptions) {
-    return Boolean.parseBoolean(queryOptions.get(Request.QueryOptionKey.SKIP_UPSERT));
+    return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.SKIP_UPSERT));
   }
 
   public static boolean isSkipStarTree(Map<String, String> queryOptions) {
-    return "false".equalsIgnoreCase(queryOptions.get(Request.QueryOptionKey.USE_STAR_TREE));
+    return "false".equalsIgnoreCase(queryOptions.get(QueryOptionKey.USE_STAR_TREE));
   }
 
   public static boolean isRoutingForceHLC(Map<String, String> queryOptions) {
-    String routingOptions = queryOptions.get(Request.QueryOptionKey.ROUTING_OPTIONS);
-    return routingOptions != null && routingOptions.toUpperCase().contains(Request.QueryOptionValue.ROUTING_FORCE_HLC);
+    String routingOptions = queryOptions.get(QueryOptionKey.ROUTING_OPTIONS);
+    return routingOptions != null && routingOptions.toUpperCase().contains(QueryOptionValue.ROUTING_FORCE_HLC);
   }
 
   public static boolean isSkipScanFilterReorder(Map<String, String> queryOptions) {
-    return "false".equalsIgnoreCase(queryOptions.get(Request.QueryOptionKey.USE_SCAN_REORDER_OPTIMIZATION));
+    return "false".equalsIgnoreCase(queryOptions.get(QueryOptionKey.USE_SCAN_REORDER_OPTIMIZATION));
   }
 
   @Nullable
   public static Integer getNumReplicaGroupsToQuery(Map<String, String> queryOptions) {
-    String numReplicaGroupsToQuery = queryOptions.get(Request.QueryOptionKey.NUM_REPLICA_GROUPS_TO_QUERY);
+    String numReplicaGroupsToQuery = queryOptions.get(QueryOptionKey.NUM_REPLICA_GROUPS_TO_QUERY);
     return numReplicaGroupsToQuery != null ? Integer.parseInt(numReplicaGroupsToQuery) : null;
   }
 
   public static boolean isExplainPlanVerbose(Map<String, String> queryOptions) {
-    return Boolean.parseBoolean(queryOptions.get(Request.QueryOptionKey.EXPLAIN_PLAN_VERBOSE));
+    return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.EXPLAIN_PLAN_VERBOSE));
   }
 
   @Nullable
   public static Integer getMaxExecutionThreads(Map<String, String> queryOptions) {
-    String maxExecutionThreadsString = queryOptions.get(Request.QueryOptionKey.MAX_EXECUTION_THREADS);
+    String maxExecutionThreadsString = queryOptions.get(QueryOptionKey.MAX_EXECUTION_THREADS);
     return maxExecutionThreadsString != null ? Integer.parseInt(maxExecutionThreadsString) : null;
   }
 
   @Nullable
   public static Integer getMinSegmentGroupTrimSize(Map<String, String> queryOptions) {
-    String minSegmentGroupTrimSizeString = queryOptions.get(Request.QueryOptionKey.MIN_SEGMENT_GROUP_TRIM_SIZE);
+    String minSegmentGroupTrimSizeString = queryOptions.get(QueryOptionKey.MIN_SEGMENT_GROUP_TRIM_SIZE);
     return minSegmentGroupTrimSizeString != null ? Integer.parseInt(minSegmentGroupTrimSizeString) : null;
   }
 
   @Nullable
   public static Integer getMinServerGroupTrimSize(Map<String, String> queryOptions) {
-    String minServerGroupTrimSizeString = queryOptions.get(Request.QueryOptionKey.MIN_SERVER_GROUP_TRIM_SIZE);
+    String minServerGroupTrimSizeString = queryOptions.get(QueryOptionKey.MIN_SERVER_GROUP_TRIM_SIZE);
     return minServerGroupTrimSizeString != null ? Integer.parseInt(minServerGroupTrimSizeString) : null;
   }
 
   public static boolean isNullHandlingEnabled(Map<String, String> queryOptions) {
-    boolean nullHandlingEnabled = Boolean.parseBoolean(queryOptions.get(Request.QueryOptionKey.ENABLE_NULL_HANDLING));
+    boolean nullHandlingEnabled = Boolean.parseBoolean(queryOptions.get(QueryOptionKey.ENABLE_NULL_HANDLING));
     if (nullHandlingEnabled) {
       Preconditions.checkState(DataTableFactory.getDataTableVersion() >= DataTableFactory.VERSION_4,
           "Null handling cannot be enabled for data table version smaller than 4");
     }
     return nullHandlingEnabled;
   }
+
+  public static boolean isServerReturnFinalResult(Map<String, String> queryOptions) {
+    return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.SERVER_RETURN_FINAL_RESULT));
+  }
 }
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 79e1c7c59f..962fd84a2b 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -88,7 +88,6 @@ import static org.apache.pinot.common.function.scalar.StringFunctions.*;
 import static org.apache.pinot.controller.helix.core.PinotHelixResourceManager.EXTERNAL_VIEW_CHECK_INTERVAL_MS;
 import static org.apache.pinot.controller.helix.core.PinotHelixResourceManager.EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS;
 import static org.testng.Assert.*;
-import static org.testng.Assert.assertEquals;
 
 
 /**
@@ -260,6 +259,15 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     }
   }
 
+  @Override
+  protected void testQuery(String pinotQuery, String h2Query)
+      throws Exception {
+    if (getNumServers() == 1) {
+      pinotQuery = "SET serverReturnFinalResult = true;" + pinotQuery;
+    }
+    super.testQuery(pinotQuery, h2Query);
+  }
+
   @Test
   public void testInstancesStarted() {
     assertEquals(_serviceStatusCallbacks.size(), getNumBrokers() + getNumServers());
@@ -554,8 +562,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     TableConfig tableConfig = getOfflineTableConfig();
     tableConfig.getIndexingConfig().setInvertedIndexColumns(UPDATED_INVERTED_INDEX_COLUMNS);
     updateTableConfig(tableConfig);
-    String reloadJobId = reloadOfflineTableAndValidateResponse(
-        TableNameBuilder.OFFLINE.tableNameWithType(getTableName()), false);
+    String reloadJobId =
+        reloadOfflineTableAndValidateResponse(TableNameBuilder.OFFLINE.tableNameWithType(getTableName()), false);
 
     // It takes a while to reload multiple segments, thus we retry the query for some time.
     // After all segments are reloaded, the inverted index is added on DivActualElapsedTime.
@@ -592,7 +600,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   }
 
   @Test
-  public void testRegexpReplace() throws Exception {
+  public void testRegexpReplace()
+      throws Exception {
     // Correctness tests of regexpReplace.
 
     // Test replace all.
@@ -643,7 +652,6 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     result = response.get("resultTable").get("rows").get(0).get(0).asText();
     assertEquals(result, "hsomething, something, something and wise");
 
-
     // Test occurence
     sqlQuery = "SELECT regexpReplace('healthy, wealthy, stealthy and wise','\\w+thy', 'something', 0, 2)";
     response = postQuery(sqlQuery, _brokerBaseApiUrl);
@@ -685,8 +693,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     JsonNode rows = response.get("resultTable").get("rows");
     for (int i = 0; i < rows.size(); i++) {
       JsonNode row = rows.get(i);
-      boolean containsSpace = row.get(0).asText().contains(" ");
-      assertEquals(containsSpace, false);
+      assertFalse(row.get(0).asText().contains(" "));
     }
 
     // Test in where clause
@@ -699,8 +706,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     assertEquals(count1, count2);
 
     // Test nested transform
-    sqlQuery = "SELECT count(*) from myTable where contains(regexpReplace(originState, '(C)(A)', '$1TEST$2'), "
-        + "'CTESTA')";
+    sqlQuery =
+        "SELECT count(*) from myTable where contains(regexpReplace(originState, '(C)(A)', '$1TEST$2'), 'CTESTA')";
     response = postQuery(sqlQuery, _brokerBaseApiUrl);
     count1 = response.get("resultTable").get("rows").get(0).get(0).asInt();
     sqlQuery = "SELECT count(*) from myTable where originState='CA'";
@@ -810,8 +817,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     // long string literal encode
     sqlQuery =
         "SELECT toBase64(toUtf8('this is a long string that will encode to more than 76 characters using base64')) "
-            + "FROM "
-            + "myTable";
+            + "FROM myTable";
     response = postQuery(sqlQuery, _brokerBaseApiUrl);
     resultTable = response.get("resultTable");
     rows = resultTable.get("rows");
@@ -842,9 +848,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     assertEquals(encodedString, toBase64(toUtf8("123")));
 
     // identifier
-    sqlQuery =
-        "SELECT Carrier, toBase64(toUtf8(Carrier)), fromUtf8(fromBase64(toBase64(toUtf8(Carrier)))), fromBase64"
-            + "(toBase64(toUtf8(Carrier))) FROM myTable LIMIT 100";
+    sqlQuery = "SELECT Carrier, toBase64(toUtf8(Carrier)), fromUtf8(fromBase64(toBase64(toUtf8(Carrier)))), "
+        + "fromBase64(toBase64(toUtf8(Carrier))) FROM myTable LIMIT 100";
     response = postQuery(sqlQuery, _brokerBaseApiUrl);
     resultTable = response.get("resultTable");
     dataSchema = resultTable.get("dataSchema");
@@ -881,9 +886,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     assertTrue(response.get("exceptions").get(0).get("message").toString().contains("IllegalArgumentException"));
 
     // string literal used in a filter
-    sqlQuery =
-        "SELECT * FROM myTable WHERE fromUtf8(fromBase64('aGVsbG8h')) != Carrier AND toBase64(toUtf8('hello!')) != "
-        + "Carrier LIMIT 10";
+    sqlQuery = "SELECT * FROM myTable WHERE fromUtf8(fromBase64('aGVsbG8h')) != Carrier AND "
+        + "toBase64(toUtf8('hello!')) != Carrier LIMIT 10";
     response = postQuery(sqlQuery, _brokerBaseApiUrl);
     resultTable = response.get("resultTable");
     rows = resultTable.get("rows");
@@ -904,10 +908,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     assertEquals(rows.size(), 10);
 
     // non-string identifier used in a filter
-    sqlQuery =
-        "SELECT fromUtf8(fromBase64(toBase64(toUtf8(AirlineID)))), AirlineID FROM myTable WHERE fromUtf8(fromBase64"
-        + "(toBase64(toUtf8(AirlineID)))) = "
-            + "AirlineID LIMIT 10";
+    sqlQuery = "SELECT fromUtf8(fromBase64(toBase64(toUtf8(AirlineID)))), AirlineID FROM myTable WHERE "
+        + "fromUtf8(fromBase64(toBase64(toUtf8(AirlineID)))) = AirlineID LIMIT 10";
     response = postQuery(sqlQuery, _brokerBaseApiUrl);
     resultTable = response.get("resultTable");
     dataSchema = resultTable.get("dataSchema");
@@ -916,12 +918,10 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     assertEquals(rows.size(), 10);
 
     // string identifier used in group by order by
-    sqlQuery =
-        "SELECT Carrier as originalCol, toBase64(toUtf8(Carrier)) as encoded, fromUtf8(fromBase64(toBase64(toUtf8"
-        + "(Carrier)))) as decoded "
-            + "FROM myTable GROUP BY Carrier, toBase64(toUtf8(Carrier)), fromUtf8(fromBase64(toBase64(toUtf8(Carrier)"
-            + "))) ORDER BY toBase64(toUtf8(Carrier))"
-            + " LIMIT 10";
+    sqlQuery = "SELECT Carrier as originalCol, toBase64(toUtf8(Carrier)) as encoded, "
+        + "fromUtf8(fromBase64(toBase64(toUtf8(Carrier)))) as decoded FROM myTable "
+        + "GROUP BY Carrier, toBase64(toUtf8(Carrier)), fromUtf8(fromBase64(toBase64(toUtf8(Carrier)))) "
+        + "ORDER BY toBase64(toUtf8(Carrier)) LIMIT 10";
     response = postQuery(sqlQuery, _brokerBaseApiUrl);
     resultTable = response.get("resultTable");
     dataSchema = resultTable.get("dataSchema");
@@ -938,12 +938,10 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     }
 
     // non-string identifier used in group by order by
-    sqlQuery =
-        "SELECT AirlineID as originalCol, toBase64(toUtf8(AirlineID)) as encoded, fromUtf8(fromBase64(toBase64(toUtf8"
-        + "(AirlineID)))) as decoded "
-            + "FROM myTable GROUP BY AirlineID, toBase64(toUtf8(AirlineID)), fromUtf8(fromBase64(toBase64(toUtf8"
-            + "(AirlineID)))) ORDER BY "
-            + "fromUtf8(fromBase64(toBase64(toUtf8(AirlineID)))) LIMIT 10";
+    sqlQuery = "SELECT AirlineID as originalCol, toBase64(toUtf8(AirlineID)) as encoded, "
+        + "fromUtf8(fromBase64(toBase64(toUtf8(AirlineID)))) as decoded FROM myTable "
+        + "GROUP BY AirlineID, toBase64(toUtf8(AirlineID)), fromUtf8(fromBase64(toBase64(toUtf8(AirlineID)))) "
+        + "ORDER BY fromUtf8(fromBase64(toBase64(toUtf8(AirlineID)))) LIMIT 10";
     response = postQuery(sqlQuery, _brokerBaseApiUrl);
     resultTable = response.get("resultTable");
     dataSchema = resultTable.get("dataSchema");
@@ -1024,10 +1022,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
         "key1%3Dvalue+1%26key2%3Dvalue%40%21%242%26key3%3Dvalue%253");
     assertEquals(response.get("resultTable").get("rows").get(0).get(8).asText(),
         "key1=value 1&key2=value@!$2&key3=value%3");
-    assertEquals(response.get("resultTable").get("rows").get(0).get(9).asText(),
-        "aGVsbG8h");
-    assertEquals(response.get("resultTable").get("rows").get(0).get(10).asText(),
-        "hello!");
+    assertEquals(response.get("resultTable").get("rows").get(0).get(9).asText(), "aGVsbG8h");
+    assertEquals(response.get("resultTable").get("rows").get(0).get(10).asText(), "hello!");
   }
 
   @Test(dependsOnMethods = "testBloomFilterTriggering")
@@ -2769,8 +2765,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
       throws IOException {
     String jobId = null;
     String response =
-        sendPostRequest(_controllerRequestURLBuilder.forTableReload(tableName, TableType.OFFLINE, forceDownload),
-            null);
+        sendPostRequest(_controllerRequestURLBuilder.forTableReload(tableName, TableType.OFFLINE, forceDownload), null);
     String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
     JsonNode tableLevelDetails =
         JsonUtils.stringToJsonNode(StringEscapeUtils.unescapeJava(response.split(": ")[1])).get(tableNameWithType);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 738800c6cb..2ca5801b1a 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -283,6 +283,7 @@ public class CommonConstants {
         public static final String EXPLAIN_PLAN_VERBOSE = "explainPlanVerbose";
         public static final String USE_MULTISTAGE_ENGINE = "useMultistageEngine";
         public static final String ENABLE_NULL_HANDLING = "enableNullHandling";
+        public static final String SERVER_RETURN_FINAL_RESULT = "serverReturnFinalResult";
 
         // TODO: Remove these keys (only apply to PQL) after releasing 0.11.0
         @Deprecated


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org