You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by si...@apache.org on 2022/09/07 08:56:31 UTC
[pinot] branch master updated: Allow server to directly return the final aggregation result (#9304)
This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5ca2ed4cbe Allow server to directly return the final aggregation result (#9304)
5ca2ed4cbe is described below
commit 5ca2ed4cbe16f33f5404824f4bcfdcc3e0b54f2e
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Wed Sep 7 01:56:22 2022 -0700
Allow server to directly return the final aggregation result (#9304)
---
.../apache/pinot/core/data/table/IndexedTable.java | 20 ++-
.../org/apache/pinot/core/data/table/Table.java | 7 +-
.../blocks/results/AggregationResultsBlock.java | 80 ++++++++--
.../blocks/results/GroupByResultsBlock.java | 15 +-
.../combine/GroupByOrderByCombineOperator.java | 9 +-
.../function/AggregationFunctionUtils.java | 46 ++++++
.../query/reduce/AggregationDataTableReducer.java | 69 ++++----
.../core/query/reduce/GroupByDataTableReducer.java | 177 ++++++++++++++++-----
.../core/query/request/context/QueryContext.java | 11 ++
.../apache/pinot/core/util/QueryOptionsUtils.java | 31 ++--
.../tests/OfflineClusterIntegrationTest.java | 73 ++++-----
.../apache/pinot/spi/utils/CommonConstants.java | 1 +
12 files changed, 397 insertions(+), 142 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
index dbd4e99891..012fdc1170 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
@@ -27,6 +27,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.request.context.ExpressionContext;
import org.apache.pinot.common.request.context.OrderByExpressionContext;
import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.request.context.QueryContext;
@@ -54,7 +55,7 @@ public abstract class IndexedTable extends BaseTable {
*
* @param dataSchema Data schema of the table
* @param queryContext Query context
- * @param resultSize Number of records to keep in the final result after calling {@link #finish(boolean)}
+ * @param resultSize Number of records to keep in the final result after calling {@link #finish(boolean, boolean)}
* @param trimSize Number of records to keep when trimming the table
* @param trimThreshold Trim the table when the number of records exceeds the threshold
* @param lookupMap Map from keys to records
@@ -144,7 +145,7 @@ public abstract class IndexedTable extends BaseTable {
}
@Override
- public void finish(boolean sort) {
+ public void finish(boolean sort, boolean storeFinalResult) {
if (_hasOrderBy) {
long startTimeNs = System.nanoTime();
_topRecords = _tableResizer.getTopRecords(_lookupMap, _resultSize, sort);
@@ -154,6 +155,21 @@ public abstract class IndexedTable extends BaseTable {
} else {
_topRecords = _lookupMap.values();
}
+ // TODO: Directly return final result in _tableResizer.getTopRecords to avoid extracting final result multiple times
+ if (storeFinalResult) {
+ ColumnDataType[] columnDataTypes = _dataSchema.getColumnDataTypes();
+ int numAggregationFunctions = _aggregationFunctions.length;
+ for (int i = 0; i < numAggregationFunctions; i++) {
+ columnDataTypes[i + _numKeyColumns] = _aggregationFunctions[i].getFinalResultColumnType();
+ }
+ for (Record record : _topRecords) {
+ Object[] values = record.getValues();
+ for (int i = 0; i < numAggregationFunctions; i++) {
+ int colId = i + _numKeyColumns;
+ values[colId] = _aggregationFunctions[i].extractFinalResult(values[colId]);
+ }
+ }
+ }
}
@Override
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/table/Table.java b/pinot-core/src/main/java/org/apache/pinot/core/data/table/Table.java
index d491b119f3..ab682e5dc0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/table/Table.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/table/Table.java
@@ -55,11 +55,16 @@ public interface Table {
*/
Iterator<Record> iterator();
+ default void finish(boolean sort) {
+ finish(sort, false);
+ }
+
/**
* Finish any pre exit processing
* @param sort sort the final results if true
+ * @param storeFinalResult whether to store final aggregation result
*/
- void finish(boolean sort);
+ void finish(boolean sort, boolean storeFinalResult);
/**
* Returns the data schema of the table
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
index e4d0ba6bca..bba0c6df61 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/AggregationResultsBlock.java
@@ -18,7 +18,9 @@
*/
package org.apache.pinot.core.operator.blocks.results;
+import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
import java.io.IOException;
+import java.math.BigDecimal;
import java.util.List;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
@@ -27,6 +29,7 @@ import org.apache.pinot.core.common.datatable.DataTableBuilder;
import org.apache.pinot.core.common.datatable.DataTableFactory;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.spi.utils.ByteArray;
import org.apache.pinot.spi.utils.NullValueUtils;
import org.roaringbitmap.RoaringBitmap;
@@ -34,7 +37,7 @@ import org.roaringbitmap.RoaringBitmap;
/**
* Results block for aggregation queries.
*/
-@SuppressWarnings("rawtypes")
+@SuppressWarnings({"rawtypes", "unchecked"})
public class AggregationResultsBlock extends BaseResultsBlock {
private final AggregationFunction[] _aggregationFunctions;
private final List<Object> _results;
@@ -55,6 +58,8 @@ public class AggregationResultsBlock extends BaseResultsBlock {
@Override
public DataTable getDataTable(QueryContext queryContext)
throws Exception {
+ boolean returnFinalResult = queryContext.isServerReturnFinalResult();
+
// Extract result column name and type from each aggregation function
int numColumns = _aggregationFunctions.length;
String[] columnNames = new String[numColumns];
@@ -62,7 +67,8 @@ public class AggregationResultsBlock extends BaseResultsBlock {
for (int i = 0; i < numColumns; i++) {
AggregationFunction aggregationFunction = _aggregationFunctions[i];
columnNames[i] = aggregationFunction.getColumnName();
- columnDataTypes[i] = aggregationFunction.getIntermediateResultColumnType();
+ columnDataTypes[i] = returnFinalResult ? aggregationFunction.getFinalResultColumnType()
+ : aggregationFunction.getIntermediateResultColumnType();
}
// Build the data table.
@@ -76,11 +82,20 @@ public class AggregationResultsBlock extends BaseResultsBlock {
dataTableBuilder.startRow();
for (int i = 0; i < numColumns; i++) {
Object result = _results.get(i);
- if (result == null && columnDataTypes[i] != ColumnDataType.OBJECT) {
- result = NullValueUtils.getDefaultNullValue(columnDataTypes[i].toDataType());
- nullBitmaps[i].add(0);
+ if (!returnFinalResult) {
+ if (result == null && columnDataTypes[i] != ColumnDataType.OBJECT) {
+ result = NullValueUtils.getDefaultNullValue(columnDataTypes[i].toDataType());
+ nullBitmaps[i].add(0);
+ }
+ setIntermediateResult(dataTableBuilder, columnDataTypes, i, result);
+ } else {
+ result = _aggregationFunctions[i].extractFinalResult(result);
+ if (result == null) {
+ result = NullValueUtils.getDefaultNullValue(columnDataTypes[i].toDataType());
+ nullBitmaps[i].add(0);
+ }
+ setFinalResult(dataTableBuilder, columnDataTypes, i, result);
}
- setResult(dataTableBuilder, columnNames, columnDataTypes, i, result);
}
dataTableBuilder.finishRow();
for (RoaringBitmap nullBitmap : nullBitmaps) {
@@ -89,7 +104,13 @@ public class AggregationResultsBlock extends BaseResultsBlock {
} else {
dataTableBuilder.startRow();
for (int i = 0; i < numColumns; i++) {
- setResult(dataTableBuilder, columnNames, columnDataTypes, i, _results.get(i));
+ Object result = _results.get(i);
+ if (!returnFinalResult) {
+ setIntermediateResult(dataTableBuilder, columnDataTypes, i, result);
+ } else {
+ result = _aggregationFunctions[i].extractFinalResult(result);
+ setFinalResult(dataTableBuilder, columnDataTypes, i, result);
+ }
}
dataTableBuilder.finishRow();
}
@@ -99,23 +120,56 @@ public class AggregationResultsBlock extends BaseResultsBlock {
return dataTable;
}
- private void setResult(DataTableBuilder dataTableBuilder, String[] columnNames, ColumnDataType[] columnDataTypes,
- int index, Object result)
+ private void setIntermediateResult(DataTableBuilder dataTableBuilder, ColumnDataType[] columnDataTypes, int index,
+ Object result)
throws IOException {
ColumnDataType columnDataType = columnDataTypes[index];
switch (columnDataType) {
case LONG:
- dataTableBuilder.setColumn(index, ((Number) result).longValue());
+ dataTableBuilder.setColumn(index, (long) result);
break;
case DOUBLE:
- dataTableBuilder.setColumn(index, ((Double) result).doubleValue());
+ dataTableBuilder.setColumn(index, (double) result);
break;
case OBJECT:
dataTableBuilder.setColumn(index, result);
break;
default:
- throw new UnsupportedOperationException(
- "Unsupported aggregation column data type: " + columnDataType + " for column: " + columnNames[index]);
+ throw new IllegalStateException("Illegal column data type in intermediate result: " + columnDataType);
+ }
+ }
+
+ private void setFinalResult(DataTableBuilder dataTableBuilder, ColumnDataType[] columnDataTypes, int index,
+ Object result)
+ throws IOException {
+ ColumnDataType columnDataType = columnDataTypes[index];
+ switch (columnDataType) {
+ case INT:
+ dataTableBuilder.setColumn(index, (int) result);
+ break;
+ case LONG:
+ dataTableBuilder.setColumn(index, (long) result);
+ break;
+ case FLOAT:
+ dataTableBuilder.setColumn(index, (float) result);
+ break;
+ case DOUBLE:
+ dataTableBuilder.setColumn(index, (double) result);
+ break;
+ case BIG_DECIMAL:
+ dataTableBuilder.setColumn(index, (BigDecimal) result);
+ break;
+ case STRING:
+ dataTableBuilder.setColumn(index, result.toString());
+ break;
+ case BYTES:
+ dataTableBuilder.setColumn(index, (ByteArray) result);
+ break;
+ case DOUBLE_ARRAY:
+ dataTableBuilder.setColumn(index, ((DoubleArrayList) result).elements());
+ break;
+ default:
+ throw new IllegalStateException("Illegal column data type in final result: " + columnDataType);
}
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
index 7b3fd84c1d..1d0a7c5089 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/blocks/results/GroupByResultsBlock.java
@@ -19,6 +19,7 @@
package org.apache.pinot.core.operator.blocks.results;
import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
import java.io.IOException;
import java.math.BigDecimal;
import java.util.Collection;
@@ -193,14 +194,11 @@ public class GroupByResultsBlock extends BaseResultsBlock {
dataTableBuilder.setColumn(columnIndex, (BigDecimal) value);
break;
case STRING:
- dataTableBuilder.setColumn(columnIndex, (String) value);
+ dataTableBuilder.setColumn(columnIndex, value.toString());
break;
case BYTES:
dataTableBuilder.setColumn(columnIndex, (ByteArray) value);
break;
- case OBJECT:
- dataTableBuilder.setColumn(columnIndex, value);
- break;
case INT_ARRAY:
dataTableBuilder.setColumn(columnIndex, (int[]) value);
break;
@@ -211,11 +209,18 @@ public class GroupByResultsBlock extends BaseResultsBlock {
dataTableBuilder.setColumn(columnIndex, (float[]) value);
break;
case DOUBLE_ARRAY:
- dataTableBuilder.setColumn(columnIndex, (double[]) value);
+ if (value instanceof DoubleArrayList) {
+ dataTableBuilder.setColumn(columnIndex, ((DoubleArrayList) value).elements());
+ } else {
+ dataTableBuilder.setColumn(columnIndex, (double[]) value);
+ }
break;
case STRING_ARRAY:
dataTableBuilder.setColumn(columnIndex, (String[]) value);
break;
+ case OBJECT:
+ dataTableBuilder.setColumn(columnIndex, value);
+ break;
default:
throw new IllegalStateException("Unsupported stored type: " + storedColumnDataType);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
index b4e828ed97..bb49dbfce2 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/combine/GroupByOrderByCombineOperator.java
@@ -85,7 +85,8 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator<GroupByRe
int minTrimSize = queryContext.getMinServerGroupTrimSize();
if (minTrimSize > 0) {
int limit = queryContext.getLimit();
- if (queryContext.getOrderByExpressions() != null || queryContext.getHavingFilter() != null) {
+ if ((!queryContext.isServerReturnFinalResult() && queryContext.getOrderByExpressions() != null)
+ || queryContext.getHavingFilter() != null) {
_trimSize = GroupByUtils.getTableCapacity(limit, minTrimSize);
} else {
// TODO: Keeping only 'LIMIT' groups can cause inaccurate result because the groups are randomly selected
@@ -252,7 +253,11 @@ public class GroupByOrderByCombineOperator extends BaseCombineOperator<GroupByRe
}
IndexedTable indexedTable = _indexedTable;
- indexedTable.finish(false);
+ if (!_queryContext.isServerReturnFinalResult()) {
+ indexedTable.finish(false);
+ } else {
+ indexedTable.finish(true, true);
+ }
GroupByResultsBlock mergedBlock = new GroupByResultsBlock(indexedTable);
mergedBlock.setNumGroupsLimitReached(_numGroupsLimitReached);
mergedBlock.setNumResizes(indexedTable.getNumResizes());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
index 6c31690929..21593418e9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionUtils.java
@@ -27,6 +27,8 @@ import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.common.BlockValSet;
import org.apache.pinot.core.operator.blocks.TransformBlock;
import org.apache.pinot.segment.spi.AggregationFunctionType;
@@ -115,4 +117,48 @@ public class AggregationFunctionUtils {
BlockValSet blockValSet = transformBlock.getBlockValueSet(aggregationFunctionColumnPair.toColumnName());
return Collections.singletonMap(expression, blockValSet);
}
+
+ /**
+ * Reads the intermediate result from the {@link DataTable}.
+ */
+ public static Object getIntermediateResult(DataTable dataTable, ColumnDataType columnDataType, int rowId, int colId) {
+ switch (columnDataType) {
+ case LONG:
+ return dataTable.getLong(rowId, colId);
+ case DOUBLE:
+ return dataTable.getDouble(rowId, colId);
+ case OBJECT:
+ return dataTable.getObject(rowId, colId);
+ default:
+ throw new IllegalStateException("Illegal column data type in intermediate result: " + columnDataType);
+ }
+ }
+
+ /**
+ * Reads the converted final result from the {@link DataTable}. It should be equivalent to running
+ * {@link AggregationFunction#extractFinalResult(Object)} and {@link ColumnDataType#convert(Object)}.
+ */
+ public static Object getConvertedFinalResult(DataTable dataTable, ColumnDataType columnDataType, int rowId,
+ int colId) {
+ switch (columnDataType) {
+ case INT:
+ return dataTable.getInt(rowId, colId);
+ case LONG:
+ return dataTable.getLong(rowId, colId);
+ case FLOAT:
+ return dataTable.getFloat(rowId, colId);
+ case DOUBLE:
+ return dataTable.getDouble(rowId, colId);
+ case BIG_DECIMAL:
+ return dataTable.getBigDecimal(rowId, colId);
+ case STRING:
+ return dataTable.getString(rowId, colId);
+ case BYTES:
+ return dataTable.getBytes(rowId, colId).getBytes();
+ case DOUBLE_ARRAY:
+ return dataTable.getDoubleArray(rowId, colId);
+ default:
+ throw new IllegalStateException("Illegal column data type in final result: " + columnDataType);
+ }
+ }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
index dd29a24c76..238fb557b1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/AggregationDataTableReducer.java
@@ -18,6 +18,8 @@
*/
package org.apache.pinot.core.query.reduce;
+import com.google.common.base.Preconditions;
+import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import org.apache.pinot.common.metrics.BrokerMetrics;
@@ -27,6 +29,7 @@ import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.common.utils.DataTable;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.roaringbitmap.RoaringBitmap;
@@ -52,6 +55,8 @@ public class AggregationDataTableReducer implements DataTableReducer {
public void reduceAndSetResults(String tableName, DataSchema dataSchema,
Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative brokerResponseNative,
DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) {
+ assert dataSchema != null;
+
if (dataTableMap.isEmpty()) {
DataSchema resultTableSchema =
new PostAggregationHandler(_queryContext, getPrePostAggregationDataSchema()).getResultDataSchema();
@@ -59,43 +64,31 @@ public class AggregationDataTableReducer implements DataTableReducer {
return;
}
- // Merge results from all data tables
+ if (!_queryContext.isServerReturnFinalResult()) {
+ reduceWithIntermediateResult(dataSchema, dataTableMap.values(), brokerResponseNative);
+ } else {
+ Preconditions.checkState(dataTableMap.size() == 1, "Cannot merge final results from multiple servers");
+ reduceWithFinalResult(dataSchema, dataTableMap.values().iterator().next(), brokerResponseNative);
+ }
+ }
+
+ private void reduceWithIntermediateResult(DataSchema dataSchema, Collection<DataTable> dataTables,
+ BrokerResponseNative brokerResponseNative) {
int numAggregationFunctions = _aggregationFunctions.length;
Object[] intermediateResults = new Object[numAggregationFunctions];
- for (DataTable dataTable : dataTableMap.values()) {
+ for (DataTable dataTable : dataTables) {
for (int i = 0; i < numAggregationFunctions; i++) {
Object intermediateResultToMerge;
ColumnDataType columnDataType = dataSchema.getColumnDataType(i);
if (_queryContext.isNullHandlingEnabled()) {
RoaringBitmap nullBitmap = dataTable.getNullRowIds(i);
- boolean isNull = nullBitmap != null && nullBitmap.contains(0);
- switch (columnDataType) {
- case LONG:
- intermediateResultToMerge = isNull ? null : dataTable.getLong(0, i);
- break;
- case DOUBLE:
- intermediateResultToMerge = isNull ? null : dataTable.getDouble(0, i);
- break;
- case OBJECT:
- intermediateResultToMerge = isNull ? null : dataTable.getObject(0, i);
- break;
- default:
- throw new IllegalStateException("Illegal column data type in aggregation results: " + columnDataType);
+ if (nullBitmap != null && nullBitmap.contains(0)) {
+ intermediateResultToMerge = null;
+ } else {
+ intermediateResultToMerge = AggregationFunctionUtils.getIntermediateResult(dataTable, columnDataType, 0, i);
}
} else {
- switch (columnDataType) {
- case LONG:
- intermediateResultToMerge = dataTable.getLong(0, i);
- break;
- case DOUBLE:
- intermediateResultToMerge = dataTable.getDouble(0, i);
- break;
- case OBJECT:
- intermediateResultToMerge = dataTable.getObject(0, i);
- break;
- default:
- throw new IllegalStateException("Illegal column data type in aggregation results: " + columnDataType);
- }
+ intermediateResultToMerge = AggregationFunctionUtils.getIntermediateResult(dataTable, columnDataType, 0, i);
}
Object mergedIntermediateResult = intermediateResults[i];
if (mergedIntermediateResult == null) {
@@ -114,6 +107,26 @@ public class AggregationDataTableReducer implements DataTableReducer {
brokerResponseNative.setResultTable(reduceToResultTable(finalResults));
}
+ private void reduceWithFinalResult(DataSchema dataSchema, DataTable dataTable,
+ BrokerResponseNative brokerResponseNative) {
+ int numAggregationFunctions = _aggregationFunctions.length;
+ Object[] finalResults = new Object[numAggregationFunctions];
+ for (int i = 0; i < numAggregationFunctions; i++) {
+ ColumnDataType columnDataType = dataSchema.getColumnDataType(i);
+ if (_queryContext.isNullHandlingEnabled()) {
+ RoaringBitmap nullBitmap = dataTable.getNullRowIds(i);
+ if (nullBitmap != null && nullBitmap.contains(0)) {
+ finalResults[i] = null;
+ } else {
+ finalResults[i] = AggregationFunctionUtils.getConvertedFinalResult(dataTable, columnDataType, 0, i);
+ }
+ } else {
+ finalResults[i] = AggregationFunctionUtils.getConvertedFinalResult(dataTable, columnDataType, 0, i);
+ }
+ }
+ brokerResponseNative.setResultTable(reduceToResultTable(finalResults));
+ }
+
/**
* Sets aggregation results into ResultsTable
*/
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
index f167ca8a50..f04d5b9724 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GroupByDataTableReducer.java
@@ -18,6 +18,8 @@
*/
package org.apache.pinot.core.query.reduce;
+import com.google.common.base.Preconditions;
+import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -47,6 +49,7 @@ import org.apache.pinot.core.data.table.SimpleIndexedTable;
import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
import org.apache.pinot.core.operator.combine.GroupByOrderByCombineOperator;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionUtils;
import org.apache.pinot.core.query.request.context.QueryContext;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.core.util.GroupByUtils;
@@ -84,19 +87,35 @@ public class GroupByDataTableReducer implements DataTableReducer {
*/
@Override
public void reduceAndSetResults(String tableName, DataSchema dataSchema,
- Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative brokerResponseNative,
+ Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative brokerResponse,
DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) {
assert dataSchema != null;
- try {
- reduceToResultTable(brokerResponseNative, dataSchema, dataTableMap.values(), reducerContext, tableName,
- brokerMetrics);
- if (brokerMetrics != null) {
- brokerMetrics.addMeteredTableValue(tableName, BrokerMeter.GROUP_BY_SIZE,
- brokerResponseNative.getResultTable().getRows().size());
+
+ if (dataTableMap.isEmpty()) {
+ PostAggregationHandler postAggregationHandler =
+ new PostAggregationHandler(_queryContext, getPrePostAggregationDataSchema(dataSchema));
+ DataSchema resultDataSchema = postAggregationHandler.getResultDataSchema();
+ brokerResponse.setResultTable(new ResultTable(resultDataSchema, Collections.emptyList()));
+ return;
+ }
+
+ if (!_queryContext.isServerReturnFinalResult()) {
+ try {
+ reduceWithIntermediateResult(brokerResponse, dataSchema, dataTableMap.values(), reducerContext, tableName,
+ brokerMetrics);
+ } catch (TimeoutException e) {
+ brokerResponse.getProcessingExceptions()
+ .add(new QueryProcessingException(QueryException.BROKER_TIMEOUT_ERROR_CODE, e.getMessage()));
}
- } catch (TimeoutException e) {
- brokerResponseNative.getProcessingExceptions()
- .add(new QueryProcessingException(QueryException.BROKER_TIMEOUT_ERROR_CODE, e.getMessage()));
+ } else {
+ // TODO: Support merging results from multiple servers when the data is partitioned on the group-by column
+ Preconditions.checkState(dataTableMap.size() == 1, "Cannot merge final results from multiple servers");
+ reduceWithFinalResult(dataSchema, dataTableMap.values().iterator().next(), brokerResponse);
+ }
+
+ if (brokerMetrics != null && brokerResponse.getResultTable() != null) {
+ brokerMetrics.addMeteredTableValue(tableName, BrokerMeter.GROUP_BY_SIZE,
+ brokerResponse.getResultTable().getRows().size());
}
}
@@ -110,24 +129,17 @@ public class GroupByDataTableReducer implements DataTableReducer {
* @param brokerMetrics broker metrics (meters)
* @throws TimeoutException If unable complete within timeout.
*/
- private void reduceToResultTable(BrokerResponseNative brokerResponseNative, DataSchema dataSchema,
+ private void reduceWithIntermediateResult(BrokerResponseNative brokerResponseNative, DataSchema dataSchema,
Collection<DataTable> dataTables, DataTableReducerContext reducerContext, String rawTableName,
BrokerMetrics brokerMetrics)
throws TimeoutException {
- int numRecords;
- Iterator<Record> sortedIterator;
- if (!dataTables.isEmpty()) {
- IndexedTable indexedTable = getIndexedTable(dataSchema, dataTables, reducerContext);
- if (brokerMetrics != null) {
- brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.NUM_RESIZES, indexedTable.getNumResizes());
- brokerMetrics.addValueToTableGauge(rawTableName, BrokerGauge.RESIZE_TIME_MS, indexedTable.getResizeTimeMs());
- }
- numRecords = indexedTable.size();
- sortedIterator = indexedTable.iterator();
- } else {
- numRecords = 0;
- sortedIterator = Collections.emptyIterator();
+ IndexedTable indexedTable = getIndexedTable(dataSchema, dataTables, reducerContext);
+ if (brokerMetrics != null) {
+ brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.NUM_RESIZES, indexedTable.getNumResizes());
+ brokerMetrics.addValueToTableGauge(rawTableName, BrokerGauge.RESIZE_TIME_MS, indexedTable.getResizeTimeMs());
}
+ int numRecords = indexedTable.size();
+ Iterator<Record> sortedIterator = indexedTable.iterator();
DataSchema prePostAggregationDataSchema = getPrePostAggregationDataSchema(dataSchema);
PostAggregationHandler postAggregationHandler =
@@ -148,8 +160,8 @@ public class GroupByDataTableReducer implements DataTableReducer {
FilterContext havingFilter = _queryContext.getHavingFilter();
if (havingFilter != null) {
rows = new ArrayList<>();
- HavingFilterHandler havingFilterHandler = new HavingFilterHandler(havingFilter, postAggregationHandler,
- _queryContext.isNullHandlingEnabled());
+ HavingFilterHandler havingFilterHandler =
+ new HavingFilterHandler(havingFilter, postAggregationHandler, _queryContext.isNullHandlingEnabled());
while (rows.size() < limit && sortedIterator.hasNext()) {
Object[] row = sortedIterator.next().getValues();
extractFinalAggregationResults(row);
@@ -180,19 +192,7 @@ public class GroupByDataTableReducer implements DataTableReducer {
}
// Calculate final result rows after post aggregation
- List<Object[]> resultRows = new ArrayList<>(rows.size());
- ColumnDataType[] resultColumnDataTypes = resultDataSchema.getColumnDataTypes();
- int numResultColumns = resultColumnDataTypes.length;
- for (Object[] row : rows) {
- Object[] resultRow = postAggregationHandler.getResult(row);
- for (int i = 0; i < numResultColumns; i++) {
- Object value = resultRow[i];
- if (value != null) {
- resultRow[i] = resultColumnDataTypes[i].format(value);
- }
- }
- resultRows.add(resultRow);
- }
+ List<Object[]> resultRows = calculateFinalResultRows(postAggregationHandler, rows);
brokerResponseNative.setResultTable(new ResultTable(resultDataSchema, resultRows));
}
@@ -376,4 +376,103 @@ public class GroupByDataTableReducer implements DataTableReducer {
return Math.min(numDataTables, maxReduceThreadsPerQuery);
}
}
+
+ private void reduceWithFinalResult(DataSchema dataSchema, DataTable dataTable,
+ BrokerResponseNative brokerResponseNative) {
+ PostAggregationHandler postAggregationHandler = new PostAggregationHandler(_queryContext, dataSchema);
+ DataSchema resultDataSchema = postAggregationHandler.getResultDataSchema();
+
+ // Directly return when there is no record returned, or limit is 0
+ int numRows = dataTable.getNumberOfRows();
+ int limit = _queryContext.getLimit();
+ if (numRows == 0 || limit == 0) {
+ brokerResponseNative.setResultTable(new ResultTable(resultDataSchema, Collections.emptyList()));
+ return;
+ }
+
+ // Calculate rows before post-aggregation
+ List<Object[]> rows;
+ FilterContext havingFilter = _queryContext.getHavingFilter();
+ if (havingFilter != null) {
+ rows = new ArrayList<>();
+ HavingFilterHandler havingFilterHandler =
+ new HavingFilterHandler(havingFilter, postAggregationHandler, _queryContext.isNullHandlingEnabled());
+ for (int i = 0; i < numRows; i++) {
+ Object[] row = getConvertedRowWithFinalResult(dataTable, i);
+ if (havingFilterHandler.isMatch(row)) {
+ rows.add(row);
+ if (rows.size() == limit) {
+ break;
+ }
+ }
+ }
+ } else {
+ numRows = Math.min(numRows, limit);
+ rows = new ArrayList<>(numRows);
+ for (int i = 0; i < numRows; i++) {
+ rows.add(getConvertedRowWithFinalResult(dataTable, i));
+ }
+ }
+
+ // Calculate final result rows after post aggregation
+ List<Object[]> resultRows = calculateFinalResultRows(postAggregationHandler, rows);
+
+ brokerResponseNative.setResultTable(new ResultTable(resultDataSchema, resultRows));
+ }
+
+ private List<Object[]> calculateFinalResultRows(PostAggregationHandler postAggregationHandler, List<Object[]> rows) {
+ List<Object[]> resultRows = new ArrayList<>(rows.size());
+ ColumnDataType[] resultColumnDataTypes = postAggregationHandler.getResultDataSchema().getColumnDataTypes();
+ int numResultColumns = resultColumnDataTypes.length;
+ for (Object[] row : rows) {
+ Object[] resultRow = postAggregationHandler.getResult(row);
+ for (int i = 0; i < numResultColumns; i++) {
+ Object value = resultRow[i];
+ if (value != null) {
+ resultRow[i] = resultColumnDataTypes[i].format(value);
+ }
+ }
+ resultRows.add(resultRow);
+ }
+ return resultRows;
+ }
+
+ private Object[] getConvertedRowWithFinalResult(DataTable dataTable, int rowId) {
+ Object[] row = new Object[_numColumns];
+ ColumnDataType[] columnDataTypes = dataTable.getDataSchema().getColumnDataTypes();
+ for (int i = 0; i < _numColumns; i++) {
+ if (i < _numGroupByExpressions) {
+ row[i] = getConvertedKey(dataTable, columnDataTypes[i], rowId, i);
+ } else {
+ row[i] = AggregationFunctionUtils.getConvertedFinalResult(dataTable, columnDataTypes[i], rowId, i);
+ }
+ }
+ return row;
+ }
+
+ private Object getConvertedKey(DataTable dataTable, ColumnDataType columnDataType, int rowId, int colId) {
+ switch (columnDataType) {
+ case INT:
+ return dataTable.getInt(rowId, colId);
+ case LONG:
+ return dataTable.getLong(rowId, colId);
+ case FLOAT:
+ return dataTable.getFloat(rowId, colId);
+ case DOUBLE:
+ return dataTable.getDouble(rowId, colId);
+ case BIG_DECIMAL:
+ return dataTable.getBigDecimal(rowId, colId);
+ case BOOLEAN:
+ return dataTable.getInt(rowId, colId) == 1;
+ case TIMESTAMP:
+ return new Timestamp(dataTable.getLong(rowId, colId));
+ case STRING:
+ case JSON:
+ return dataTable.getString(rowId, colId);
+ case BYTES:
+ return dataTable.getBytes(rowId, colId).getBytes();
+ default:
+ throw new IllegalStateException("Illegal column data type in group key: " + columnDataType);
+ }
+ }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
index 2faeefc15e..dbf8d64fbb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
@@ -120,6 +120,8 @@ public class QueryContext {
private int _groupTrimThreshold = InstancePlanMakerImplV2.DEFAULT_GROUPBY_TRIM_THRESHOLD;
// Whether null handling is enabled
private boolean _nullHandlingEnabled;
+ // Whether server returns the final result
+ private boolean _serverReturnFinalResult;
private QueryContext(@Nullable String tableName, @Nullable QueryContext subquery,
List<ExpressionContext> selectExpressions, List<String> aliasList, @Nullable FilterContext filter,
@@ -383,6 +385,14 @@ public class QueryContext {
_nullHandlingEnabled = nullHandlingEnabled;
}
+ // Whether the server should directly return the final (fully merged) aggregation
+ // result instead of the intermediate result, driven by the query option.
+ public boolean isServerReturnFinalResult() {
+ return _serverReturnFinalResult;
+ }
+
+ public void setServerReturnFinalResult(boolean serverReturnFinalResult) {
+ _serverReturnFinalResult = serverReturnFinalResult;
+ }
+
/**
* Gets or computes a value of type {@code V} associated with a key of type {@code K} so that it can be shared
* within the scope of a query.
@@ -500,6 +510,7 @@ public class QueryContext {
new QueryContext(_tableName, _subquery, _selectExpressions, _aliasList, _filter, _groupByExpressions,
_havingFilter, _orderByExpressions, _limit, _offset, _queryOptions, _expressionOverrideHints, _explain);
queryContext.setNullHandlingEnabled(QueryOptionsUtils.isNullHandlingEnabled(_queryOptions));
+ queryContext.setServerReturnFinalResult(QueryOptionsUtils.isServerReturnFinalResult(_queryOptions));
// Pre-calculate the aggregation functions and columns for the query
generateAggregationFunctions(queryContext);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/QueryOptionsUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/util/QueryOptionsUtils.java
index 0ac77c10e9..dc40e20a88 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/util/QueryOptionsUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/QueryOptionsUtils.java
@@ -22,7 +22,8 @@ import com.google.common.base.Preconditions;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.pinot.core.common.datatable.DataTableFactory;
-import org.apache.pinot.spi.utils.CommonConstants.Broker.Request;
+import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionKey;
+import org.apache.pinot.spi.utils.CommonConstants.Broker.Request.QueryOptionValue;
/**
@@ -34,7 +35,7 @@ public class QueryOptionsUtils {
@Nullable
public static Long getTimeoutMs(Map<String, String> queryOptions) {
- String timeoutMsString = queryOptions.get(Request.QueryOptionKey.TIMEOUT_MS);
+ String timeoutMsString = queryOptions.get(QueryOptionKey.TIMEOUT_MS);
if (timeoutMsString != null) {
long timeoutMs = Long.parseLong(timeoutMsString);
Preconditions.checkState(timeoutMs > 0, "Query timeout must be positive, got: %s", timeoutMs);
@@ -45,56 +46,60 @@ public class QueryOptionsUtils {
}
public static boolean isSkipUpsert(Map<String, String> queryOptions) {
- return Boolean.parseBoolean(queryOptions.get(Request.QueryOptionKey.SKIP_UPSERT));
+ return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.SKIP_UPSERT));
}
public static boolean isSkipStarTree(Map<String, String> queryOptions) {
- return "false".equalsIgnoreCase(queryOptions.get(Request.QueryOptionKey.USE_STAR_TREE));
+ return "false".equalsIgnoreCase(queryOptions.get(QueryOptionKey.USE_STAR_TREE));
}
public static boolean isRoutingForceHLC(Map<String, String> queryOptions) {
- String routingOptions = queryOptions.get(Request.QueryOptionKey.ROUTING_OPTIONS);
- return routingOptions != null && routingOptions.toUpperCase().contains(Request.QueryOptionValue.ROUTING_FORCE_HLC);
+ String routingOptions = queryOptions.get(QueryOptionKey.ROUTING_OPTIONS);
+ return routingOptions != null && routingOptions.toUpperCase().contains(QueryOptionValue.ROUTING_FORCE_HLC);
}
public static boolean isSkipScanFilterReorder(Map<String, String> queryOptions) {
- return "false".equalsIgnoreCase(queryOptions.get(Request.QueryOptionKey.USE_SCAN_REORDER_OPTIMIZATION));
+ return "false".equalsIgnoreCase(queryOptions.get(QueryOptionKey.USE_SCAN_REORDER_OPTIMIZATION));
}
@Nullable
public static Integer getNumReplicaGroupsToQuery(Map<String, String> queryOptions) {
- String numReplicaGroupsToQuery = queryOptions.get(Request.QueryOptionKey.NUM_REPLICA_GROUPS_TO_QUERY);
+ String numReplicaGroupsToQuery = queryOptions.get(QueryOptionKey.NUM_REPLICA_GROUPS_TO_QUERY);
return numReplicaGroupsToQuery != null ? Integer.parseInt(numReplicaGroupsToQuery) : null;
}
public static boolean isExplainPlanVerbose(Map<String, String> queryOptions) {
- return Boolean.parseBoolean(queryOptions.get(Request.QueryOptionKey.EXPLAIN_PLAN_VERBOSE));
+ return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.EXPLAIN_PLAN_VERBOSE));
}
@Nullable
public static Integer getMaxExecutionThreads(Map<String, String> queryOptions) {
- String maxExecutionThreadsString = queryOptions.get(Request.QueryOptionKey.MAX_EXECUTION_THREADS);
+ String maxExecutionThreadsString = queryOptions.get(QueryOptionKey.MAX_EXECUTION_THREADS);
return maxExecutionThreadsString != null ? Integer.parseInt(maxExecutionThreadsString) : null;
}
@Nullable
public static Integer getMinSegmentGroupTrimSize(Map<String, String> queryOptions) {
- String minSegmentGroupTrimSizeString = queryOptions.get(Request.QueryOptionKey.MIN_SEGMENT_GROUP_TRIM_SIZE);
+ String minSegmentGroupTrimSizeString = queryOptions.get(QueryOptionKey.MIN_SEGMENT_GROUP_TRIM_SIZE);
return minSegmentGroupTrimSizeString != null ? Integer.parseInt(minSegmentGroupTrimSizeString) : null;
}
@Nullable
public static Integer getMinServerGroupTrimSize(Map<String, String> queryOptions) {
- String minServerGroupTrimSizeString = queryOptions.get(Request.QueryOptionKey.MIN_SERVER_GROUP_TRIM_SIZE);
+ String minServerGroupTrimSizeString = queryOptions.get(QueryOptionKey.MIN_SERVER_GROUP_TRIM_SIZE);
return minServerGroupTrimSizeString != null ? Integer.parseInt(minServerGroupTrimSizeString) : null;
}
public static boolean isNullHandlingEnabled(Map<String, String> queryOptions) {
- boolean nullHandlingEnabled = Boolean.parseBoolean(queryOptions.get(Request.QueryOptionKey.ENABLE_NULL_HANDLING));
+ boolean nullHandlingEnabled = Boolean.parseBoolean(queryOptions.get(QueryOptionKey.ENABLE_NULL_HANDLING));
if (nullHandlingEnabled) {
Preconditions.checkState(DataTableFactory.getDataTableVersion() >= DataTableFactory.VERSION_4,
"Null handling cannot be enabled for data table version smaller than 4");
}
return nullHandlingEnabled;
}
+
+ // Returns true when the "serverReturnFinalResult" query option is set to true,
+ // instructing servers to return final instead of intermediate aggregation results.
+ public static boolean isServerReturnFinalResult(Map<String, String> queryOptions) {
+ return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.SERVER_RETURN_FINAL_RESULT));
+ }
}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 79e1c7c59f..962fd84a2b 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -88,7 +88,6 @@ import static org.apache.pinot.common.function.scalar.StringFunctions.*;
import static org.apache.pinot.controller.helix.core.PinotHelixResourceManager.EXTERNAL_VIEW_CHECK_INTERVAL_MS;
import static org.apache.pinot.controller.helix.core.PinotHelixResourceManager.EXTERNAL_VIEW_ONLINE_SEGMENTS_MAX_WAIT_MS;
import static org.testng.Assert.*;
-import static org.testng.Assert.assertEquals;
/**
@@ -260,6 +259,15 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
}
}
+ // Prefixes every inherited query test with the serverReturnFinalResult option so
+ // the new code path is exercised across the whole suite. Only enabled when a single
+ // server is deployed — presumably because final results cannot be safely merged
+ // across multiple servers; TODO confirm against the broker reducer behavior.
+ @Override
+ protected void testQuery(String pinotQuery, String h2Query)
+ throws Exception {
+ if (getNumServers() == 1) {
+ pinotQuery = "SET serverReturnFinalResult = true;" + pinotQuery;
+ }
+ super.testQuery(pinotQuery, h2Query);
+ }
+
@Test
public void testInstancesStarted() {
assertEquals(_serviceStatusCallbacks.size(), getNumBrokers() + getNumServers());
@@ -554,8 +562,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
TableConfig tableConfig = getOfflineTableConfig();
tableConfig.getIndexingConfig().setInvertedIndexColumns(UPDATED_INVERTED_INDEX_COLUMNS);
updateTableConfig(tableConfig);
- String reloadJobId = reloadOfflineTableAndValidateResponse(
- TableNameBuilder.OFFLINE.tableNameWithType(getTableName()), false);
+ String reloadJobId =
+ reloadOfflineTableAndValidateResponse(TableNameBuilder.OFFLINE.tableNameWithType(getTableName()), false);
// It takes a while to reload multiple segments, thus we retry the query for some time.
// After all segments are reloaded, the inverted index is added on DivActualElapsedTime.
@@ -592,7 +600,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
}
@Test
- public void testRegexpReplace() throws Exception {
+ public void testRegexpReplace()
+ throws Exception {
// Correctness tests of regexpReplace.
// Test replace all.
@@ -643,7 +652,6 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
result = response.get("resultTable").get("rows").get(0).get(0).asText();
assertEquals(result, "hsomething, something, something and wise");
-
// Test occurence
sqlQuery = "SELECT regexpReplace('healthy, wealthy, stealthy and wise','\\w+thy', 'something', 0, 2)";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
@@ -685,8 +693,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
JsonNode rows = response.get("resultTable").get("rows");
for (int i = 0; i < rows.size(); i++) {
JsonNode row = rows.get(i);
- boolean containsSpace = row.get(0).asText().contains(" ");
- assertEquals(containsSpace, false);
+ assertFalse(row.get(0).asText().contains(" "));
}
// Test in where clause
@@ -699,8 +706,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
assertEquals(count1, count2);
// Test nested transform
- sqlQuery = "SELECT count(*) from myTable where contains(regexpReplace(originState, '(C)(A)', '$1TEST$2'), "
- + "'CTESTA')";
+ sqlQuery =
+ "SELECT count(*) from myTable where contains(regexpReplace(originState, '(C)(A)', '$1TEST$2'), 'CTESTA')";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
count1 = response.get("resultTable").get("rows").get(0).get(0).asInt();
sqlQuery = "SELECT count(*) from myTable where originState='CA'";
@@ -810,8 +817,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
// long string literal encode
sqlQuery =
"SELECT toBase64(toUtf8('this is a long string that will encode to more than 76 characters using base64')) "
- + "FROM "
- + "myTable";
+ + "FROM myTable";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
resultTable = response.get("resultTable");
rows = resultTable.get("rows");
@@ -842,9 +848,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
assertEquals(encodedString, toBase64(toUtf8("123")));
// identifier
- sqlQuery =
- "SELECT Carrier, toBase64(toUtf8(Carrier)), fromUtf8(fromBase64(toBase64(toUtf8(Carrier)))), fromBase64"
- + "(toBase64(toUtf8(Carrier))) FROM myTable LIMIT 100";
+ sqlQuery = "SELECT Carrier, toBase64(toUtf8(Carrier)), fromUtf8(fromBase64(toBase64(toUtf8(Carrier)))), "
+ + "fromBase64(toBase64(toUtf8(Carrier))) FROM myTable LIMIT 100";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
resultTable = response.get("resultTable");
dataSchema = resultTable.get("dataSchema");
@@ -881,9 +886,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
assertTrue(response.get("exceptions").get(0).get("message").toString().contains("IllegalArgumentException"));
// string literal used in a filter
- sqlQuery =
- "SELECT * FROM myTable WHERE fromUtf8(fromBase64('aGVsbG8h')) != Carrier AND toBase64(toUtf8('hello!')) != "
- + "Carrier LIMIT 10";
+ sqlQuery = "SELECT * FROM myTable WHERE fromUtf8(fromBase64('aGVsbG8h')) != Carrier AND "
+ + "toBase64(toUtf8('hello!')) != Carrier LIMIT 10";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
resultTable = response.get("resultTable");
rows = resultTable.get("rows");
@@ -904,10 +908,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
assertEquals(rows.size(), 10);
// non-string identifier used in a filter
- sqlQuery =
- "SELECT fromUtf8(fromBase64(toBase64(toUtf8(AirlineID)))), AirlineID FROM myTable WHERE fromUtf8(fromBase64"
- + "(toBase64(toUtf8(AirlineID)))) = "
- + "AirlineID LIMIT 10";
+ sqlQuery = "SELECT fromUtf8(fromBase64(toBase64(toUtf8(AirlineID)))), AirlineID FROM myTable WHERE "
+ + "fromUtf8(fromBase64(toBase64(toUtf8(AirlineID)))) = AirlineID LIMIT 10";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
resultTable = response.get("resultTable");
dataSchema = resultTable.get("dataSchema");
@@ -916,12 +918,10 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
assertEquals(rows.size(), 10);
// string identifier used in group by order by
- sqlQuery =
- "SELECT Carrier as originalCol, toBase64(toUtf8(Carrier)) as encoded, fromUtf8(fromBase64(toBase64(toUtf8"
- + "(Carrier)))) as decoded "
- + "FROM myTable GROUP BY Carrier, toBase64(toUtf8(Carrier)), fromUtf8(fromBase64(toBase64(toUtf8(Carrier)"
- + "))) ORDER BY toBase64(toUtf8(Carrier))"
- + " LIMIT 10";
+ sqlQuery = "SELECT Carrier as originalCol, toBase64(toUtf8(Carrier)) as encoded, "
+ + "fromUtf8(fromBase64(toBase64(toUtf8(Carrier)))) as decoded FROM myTable "
+ + "GROUP BY Carrier, toBase64(toUtf8(Carrier)), fromUtf8(fromBase64(toBase64(toUtf8(Carrier)))) "
+ + "ORDER BY toBase64(toUtf8(Carrier)) LIMIT 10";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
resultTable = response.get("resultTable");
dataSchema = resultTable.get("dataSchema");
@@ -938,12 +938,10 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
}
// non-string identifier used in group by order by
- sqlQuery =
- "SELECT AirlineID as originalCol, toBase64(toUtf8(AirlineID)) as encoded, fromUtf8(fromBase64(toBase64(toUtf8"
- + "(AirlineID)))) as decoded "
- + "FROM myTable GROUP BY AirlineID, toBase64(toUtf8(AirlineID)), fromUtf8(fromBase64(toBase64(toUtf8"
- + "(AirlineID)))) ORDER BY "
- + "fromUtf8(fromBase64(toBase64(toUtf8(AirlineID)))) LIMIT 10";
+ sqlQuery = "SELECT AirlineID as originalCol, toBase64(toUtf8(AirlineID)) as encoded, "
+ + "fromUtf8(fromBase64(toBase64(toUtf8(AirlineID)))) as decoded FROM myTable "
+ + "GROUP BY AirlineID, toBase64(toUtf8(AirlineID)), fromUtf8(fromBase64(toBase64(toUtf8(AirlineID)))) "
+ + "ORDER BY fromUtf8(fromBase64(toBase64(toUtf8(AirlineID)))) LIMIT 10";
response = postQuery(sqlQuery, _brokerBaseApiUrl);
resultTable = response.get("resultTable");
dataSchema = resultTable.get("dataSchema");
@@ -1024,10 +1022,8 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
"key1%3Dvalue+1%26key2%3Dvalue%40%21%242%26key3%3Dvalue%253");
assertEquals(response.get("resultTable").get("rows").get(0).get(8).asText(),
"key1=value 1&key2=value@!$2&key3=value%3");
- assertEquals(response.get("resultTable").get("rows").get(0).get(9).asText(),
- "aGVsbG8h");
- assertEquals(response.get("resultTable").get("rows").get(0).get(10).asText(),
- "hello!");
+ assertEquals(response.get("resultTable").get("rows").get(0).get(9).asText(), "aGVsbG8h");
+ assertEquals(response.get("resultTable").get("rows").get(0).get(10).asText(), "hello!");
}
@Test(dependsOnMethods = "testBloomFilterTriggering")
@@ -2769,8 +2765,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
throws IOException {
String jobId = null;
String response =
- sendPostRequest(_controllerRequestURLBuilder.forTableReload(tableName, TableType.OFFLINE, forceDownload),
- null);
+ sendPostRequest(_controllerRequestURLBuilder.forTableReload(tableName, TableType.OFFLINE, forceDownload), null);
String tableNameWithType = TableNameBuilder.OFFLINE.tableNameWithType(tableName);
JsonNode tableLevelDetails =
JsonUtils.stringToJsonNode(StringEscapeUtils.unescapeJava(response.split(": ")[1])).get(tableNameWithType);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 738800c6cb..2ca5801b1a 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -283,6 +283,7 @@ public class CommonConstants {
public static final String EXPLAIN_PLAN_VERBOSE = "explainPlanVerbose";
public static final String USE_MULTISTAGE_ENGINE = "useMultistageEngine";
public static final String ENABLE_NULL_HANDLING = "enableNullHandling";
+ public static final String SERVER_RETURN_FINAL_RESULT = "serverReturnFinalResult";
// TODO: Remove these keys (only apply to PQL) after releasing 0.11.0
@Deprecated
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org