You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/12/03 01:01:11 UTC
[pinot] branch master updated: Add Post-Aggregation Gapfilling functionality. (#7781)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5b9b836 Add Post-Aggregation Gapfilling functionality. (#7781)
5b9b836 is described below
commit 5b9b8364ef546113ff548b41504cbb0dc2aa2eda
Author: weixiangsun <91...@users.noreply.github.com>
AuthorDate: Thu Dec 2 17:00:51 2021 -0800
Add Post-Aggregation Gapfilling functionality. (#7781)
---
.../core/operator/transform/TransformOperator.java | 4 +-
.../reduce/GapFillGroupByDataTableReducer.java | 491 ++++++++++++++++
.../core/query/reduce/PostAggregationHandler.java | 4 +-
.../core/query/reduce/ResultReducerFactory.java | 3 +
.../org/apache/pinot/core/util/GapfillUtils.java | 122 ++++
.../queries/PostAggregationGapfillQueriesTest.java | 616 +++++++++++++++++++++
6 files changed, 1238 insertions(+), 2 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java
index d3b752e..08bfeb8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java
@@ -36,6 +36,7 @@ import org.apache.pinot.core.operator.blocks.TransformBlock;
import org.apache.pinot.core.operator.transform.function.TransformFunction;
import org.apache.pinot.core.operator.transform.function.TransformFunctionFactory;
import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.util.GapfillUtils;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.index.reader.Dictionary;
@@ -62,7 +63,8 @@ public class TransformOperator extends BaseOperator<TransformBlock> {
_projectionOperator = projectionOperator;
_dataSourceMap = projectionOperator.getDataSourceMap();
for (ExpressionContext expression : expressions) {
- TransformFunction transformFunction = TransformFunctionFactory.get(queryContext, expression, _dataSourceMap);
+ TransformFunction transformFunction =
+ TransformFunctionFactory.get(queryContext, GapfillUtils.stripGapfill(expression), _dataSourceMap);
_transformFunctionMap.put(expression, transformFunction);
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapFillGroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapFillGroupByDataTableReducer.java
new file mode 100644
index 0000000..9f739b0
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapFillGroupByDataTableReducer.java
@@ -0,0 +1,491 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.BrokerGauge;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FilterContext;
+import org.apache.pinot.common.request.context.OrderByExpressionContext;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.QueryProcessingException;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
+import org.apache.pinot.core.data.table.IndexedTable;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.data.table.SimpleIndexedTable;
+import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
+import org.apache.pinot.core.operator.combine.GroupByOrderByCombineOperator;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.core.util.GapfillUtils;
+import org.apache.pinot.core.util.GroupByUtils;
+import org.apache.pinot.core.util.trace.TraceCallable;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
+
+
+/**
+ * Reduces data tables for group-by queries whose select list contains a {@code PostAggregateGapFill} time bucket, and
+ * fills in the (time bucket, group key) combinations that are missing from the reduced result.
+ *
+ * <p>The query must group by the gapfill time bucket, select it, and order by it ascending as its first order-by
+ * expression. For every bucket in the configured range, a row is generated for each group key that appears anywhere
+ * in the reduced result but not in that bucket; its non-group-by columns come from
+ * {@link #getFillValue(int, Object, ColumnDataType)}.
+ *
+ * <p>Instances accumulate per-query state ({@code _groupByKeys}, {@code _previousByGroupKey}) that is never cleared,
+ * so an instance should reduce the results of a single query only.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class GapFillGroupByDataTableReducer implements DataTableReducer {
+  private static final int MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE = 2; // TBD, find a better value.
+
+  private final QueryContext _queryContext;
+  private final AggregationFunction[] _aggregationFunctions;
+  private final int _numAggregationFunctions;
+  private final List<ExpressionContext> _groupByExpressions;
+  private final int _numGroupByExpressions;
+  private final int _numColumns;
+  private final DateTimeGranularitySpec _dateTimeGranularity;
+  private final DateTimeFormatSpec _dateTimeFormatter;
+  // Gapfill range in epoch millis, truncated to the bucket granularity.
+  private final long _startMs;
+  private final long _endMs;
+  // Every distinct non-time group key seen in the reduced result.
+  private final Set<Key> _groupByKeys;
+  // Latest real result row seen per group key; the source for FILL_PREVIOUS_VALUE.
+  private final Map<Key, Object[]> _previousByGroupKey;
+  // Number of non-time group-by expressions.
+  private final int _numOfGroupByKeys;
+  // Select-list index of each non-time group-by expression, in group-by order.
+  private final List<Integer> _groupByKeyIndexes;
+  // True for select-list positions holding a group-by expression (including the time bucket).
+  private final boolean[] _isGroupBySelections;
+  // Select-list index of the gapfill time bucket.
+  private int _timeBucketIndex = -1;
+
+  /**
+   * Validates the gapfill query shape and resolves where the time bucket and group keys sit in the select list.
+   *
+   * @param queryContext the SQL query context
+   * @throws IllegalArgumentException if the query is not a valid post-aggregation gapfill query
+   */
+  GapFillGroupByDataTableReducer(QueryContext queryContext) {
+    Preconditions.checkArgument(
+        queryContext.getBrokerRequest().getPinotQuery() != null, "GapFill can only be applied to sql query");
+    _queryContext = queryContext;
+    _aggregationFunctions = queryContext.getAggregationFunctions();
+    assert _aggregationFunctions != null;
+    _numAggregationFunctions = _aggregationFunctions.length;
+    _groupByExpressions = queryContext.getGroupByExpressions();
+    assert _groupByExpressions != null;
+    _numGroupByExpressions = _groupByExpressions.size();
+    _numColumns = _numAggregationFunctions + _numGroupByExpressions;
+
+    List<ExpressionContext> selectExpressions = _queryContext.getSelectExpressions();
+    ExpressionContext gapFillSelection = null;
+    for (ExpressionContext expressionContext : selectExpressions) {
+      if (GapfillUtils.isPostAggregateGapfill(expressionContext)) {
+        gapFillSelection = expressionContext;
+        break;
+      }
+    }
+    // Fail with a clear message instead of a NullPointerException below.
+    Preconditions.checkArgument(gapFillSelection != null, "There is no PostAggregateGapFill in the select list.");
+
+    List<ExpressionContext> args = gapFillSelection.getFunction().getArguments();
+    Preconditions.checkArgument(
+        args.size() == 5, "PostAggregateGapFill does not have correct number of arguments.");
+    Preconditions.checkArgument(
+        args.get(1).getLiteral() != null, "The second argument of PostAggregateGapFill should be TimeFormatter.");
+    Preconditions.checkArgument(
+        args.get(2).getLiteral() != null, "The third argument of PostAggregateGapFill should be start time.");
+    Preconditions.checkArgument(
+        args.get(3).getLiteral() != null, "The fourth argument of PostAggregateGapFill should be end time.");
+    Preconditions.checkArgument(
+        args.get(4).getLiteral() != null, "The fifth argument of PostAggregateGapFill should be time bucket size.");
+
+    // gapFill() walks the buckets in ascending order, so the reduced records must be sorted the same way.
+    boolean orderByTimeBucket = false;
+    List<OrderByExpressionContext> orderByExpressions = _queryContext.getOrderByExpressions();
+    if (orderByExpressions != null && !orderByExpressions.isEmpty()) {
+      OrderByExpressionContext firstOrderByExpression = orderByExpressions.get(0);
+      orderByTimeBucket =
+          firstOrderByExpression.isAsc() && firstOrderByExpression.getExpression().equals(gapFillSelection);
+    }
+    Preconditions.checkArgument(
+        orderByTimeBucket, "PostAggregateGapFill does not work if the time bucket is not ordered.");
+
+    _dateTimeFormatter = new DateTimeFormatSpec(args.get(1).getLiteral());
+    _dateTimeGranularity = new DateTimeGranularitySpec(args.get(4).getLiteral());
+    String start = args.get(2).getLiteral();
+    String end = args.get(3).getLiteral();
+    _startMs = truncate(_dateTimeFormatter.fromFormatToMillis(start));
+    _endMs = truncate(_dateTimeFormatter.fromFormatToMillis(end));
+    _groupByKeys = new HashSet<>();
+    _previousByGroupKey = new HashMap<>();
+    _numOfGroupByKeys = _numGroupByExpressions - 1;
+    _groupByKeyIndexes = new ArrayList<>();
+    _isGroupBySelections = new boolean[selectExpressions.size()];
+
+    for (ExpressionContext groupByExpression : _groupByExpressions) {
+      int selectIndex = selectExpressions.indexOf(groupByExpression);
+      if (selectIndex < 0) {
+        continue;
+      }
+      _isGroupBySelections[selectIndex] = true;
+      if (GapfillUtils.isPostAggregateGapfill(groupByExpression)) {
+        _timeBucketIndex = selectIndex;
+      } else {
+        _groupByKeyIndexes.add(selectIndex);
+      }
+    }
+
+    Preconditions.checkArgument(_timeBucketIndex >= 0, "There is no time bucket.");
+    // constructGroupKeys() reads one select column per non-time group-by expression, so all of them must be selected.
+    Preconditions.checkArgument(_groupByKeyIndexes.size() == _numOfGroupByKeys,
+        "PostAggregateGapFill requires every group-by expression to be in the select list.");
+  }
+
+  /**
+   * Truncates an epoch-millis value down to a multiple of the bucket width, so that gapFill() steps through bucket
+   * boundaries rather than offsets of an unaligned start time.
+   */
+  private long truncate(long epoch) {
+    // getSize() is only the unit count (e.g. 1 for '1:HOURS'); the bucket width in millis is what aligns buckets.
+    long bucketMs = _dateTimeGranularity.granularityToMillis();
+    return Math.floorDiv(epoch, bucketMs) * bucketMs;
+  }
+
+  /**
+   * Reduces and sets group by results into ResultTable, if responseFormat = sql
+   * By default, sets group by results into GroupByResults
+   */
+  @Override
+  public void reduceAndSetResults(String tableName, DataSchema dataSchema,
+      Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative brokerResponseNative,
+      DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) {
+    assert dataSchema != null;
+    Collection<DataTable> dataTables = dataTableMap.values();
+
+    try {
+      setSQLGroupByInResultTable(brokerResponseNative, dataSchema, dataTables, reducerContext, tableName,
+          brokerMetrics);
+    } catch (TimeoutException e) {
+      brokerResponseNative.getProcessingExceptions()
+          .add(new QueryProcessingException(QueryException.BROKER_TIMEOUT_ERROR_CODE, e.getMessage()));
+    }
+    // The result table is not set when the reduce phase timed out.
+    ResultTable resultTable = brokerResponseNative.getResultTable();
+    int resultSize = resultTable != null ? resultTable.getRows().size() : 0;
+
+    if (brokerMetrics != null && resultSize > 0) {
+      brokerMetrics.addMeteredTableValue(tableName, BrokerMeter.GROUP_BY_SIZE, resultSize);
+    }
+  }
+
+  /**
+   * Builds the non-time group key of a result row, with values in group-by expression order.
+   */
+  private Key constructGroupKeys(Object[] row) {
+    Object[] groupKeys = new Object[_numOfGroupByKeys];
+    for (int i = 0; i < _numOfGroupByKeys; i++) {
+      groupKeys[i] = row[_groupByKeyIndexes.get(i)];
+    }
+    return new Key(groupKeys);
+  }
+
+  /**
+   * Extract group by order by results, gapfill them and set into {@link ResultTable}
+   * @param brokerResponseNative broker response
+   * @param dataSchema data schema
+   * @param dataTables Collection of data tables
+   * @param reducerContext DataTableReducer context
+   * @param rawTableName table name
+   * @param brokerMetrics broker metrics (meters)
+   * @throws TimeoutException If unable complete within timeout.
+   */
+  private void setSQLGroupByInResultTable(BrokerResponseNative brokerResponseNative, DataSchema dataSchema,
+      Collection<DataTable> dataTables, DataTableReducerContext reducerContext, String rawTableName,
+      BrokerMetrics brokerMetrics)
+      throws TimeoutException {
+    IndexedTable indexedTable = getIndexedTable(dataSchema, dataTables, reducerContext);
+    if (brokerMetrics != null) {
+      brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.NUM_RESIZES, indexedTable.getNumResizes());
+      brokerMetrics.addValueToTableGauge(rawTableName, BrokerGauge.RESIZE_TIME_MS, indexedTable.getResizeTimeMs());
+    }
+    DataSchema prePostAggregationDataSchema = getPrePostAggregationDataSchema(dataSchema);
+    ColumnDataType[] columnDataTypes = prePostAggregationDataSchema.getColumnDataTypes();
+
+    PostAggregationHandler postAggregationHandler =
+        new PostAggregationHandler(_queryContext, prePostAggregationDataSchema);
+    DataSchema resultDataSchema = postAggregationHandler.getResultDataSchema();
+    ColumnDataType[] resultColumnDataTypes = resultDataSchema.getColumnDataTypes();
+
+    // First pass: finalize each record in place and collect every distinct group key. gapFill() does not re-finalize,
+    // so it relies on iterator() handing back the same Record instances.
+    Iterator<Record> sortedIterator = indexedTable.iterator();
+    while (sortedIterator.hasNext()) {
+      Object[] row = sortedIterator.next().getValues();
+      extractFinalAggregationResults(row);
+      for (int i = 0; i < columnDataTypes.length; i++) {
+        row[i] = columnDataTypes[i].convert(row[i]);
+      }
+      Object[] resultRow = postAggregationHandler.getResult(row);
+      for (int i = 0; i < resultColumnDataTypes.length; i++) {
+        resultRow[i] = resultColumnDataTypes[i].format(resultRow[i]);
+      }
+
+      _groupByKeys.add(constructGroupKeys(resultRow));
+    }
+
+    List<Object[]> gapfillResultRows = gapFill(indexedTable.iterator(), postAggregationHandler);
+    brokerResponseNative.setResultTable(new ResultTable(resultDataSchema, gapfillResultRows));
+  }
+
+  /**
+   * Walks the gapfill range bucket by bucket, emitting the real rows of each bucket followed by generated rows for
+   * every known group key that has no real row in that bucket.
+   *
+   * @param sortedIterator records sorted by time bucket ascending, already finalized in place
+   * @param postAggregationHandler handler that turns a finalized record into a result row
+   * @return at most {@code LIMIT} result rows (real and generated), in bucket order
+   */
+  List<Object[]> gapFill(Iterator<Record> sortedIterator, PostAggregationHandler postAggregationHandler) {
+    DataSchema resultDataSchema = postAggregationHandler.getResultDataSchema();
+    ColumnDataType[] resultColumnDataTypes = resultDataSchema.getColumnDataTypes();
+    int limit = _queryContext.getLimit();
+    if (limit <= 0) {
+      // LIMIT 0 yields no rows; the size checks below only fire after a row has been added.
+      return new ArrayList<>();
+    }
+    int numResultColumns = resultColumnDataTypes.length;
+    List<Object[]> gapfillResultRows = new ArrayList<>(limit);
+    long step = _dateTimeGranularity.granularityToMillis();
+    FilterContext havingFilter = _queryContext.getHavingFilter();
+    HavingFilterHandler havingFilterHandler = null;
+    if (havingFilter != null) {
+      havingFilterHandler = new HavingFilterHandler(havingFilter, postAggregationHandler);
+    }
+    Record record = null;
+    // NOTE(review): this bound stops at _endMs - 2 * step, one bucket short of [_startMs, _endMs); confirm intent
+    // (tests may depend on it) before changing.
+    for (long time = _startMs; time + 2 * step <= _endMs; time += step) {
+      Set<Key> keys = new HashSet<>(_groupByKeys);
+      if (record == null && sortedIterator.hasNext()) {
+        record = sortedIterator.next();
+      }
+
+      while (record != null) {
+        Object[] row = record.getValues();
+
+        Object[] resultRow = postAggregationHandler.getResult(row);
+        for (int i = 0; i < resultColumnDataTypes.length; i++) {
+          resultRow[i] = resultColumnDataTypes[i].format(resultRow[i]);
+        }
+
+        long timeCol = _dateTimeFormatter.fromFormatToMillis(String.valueOf(resultRow[_timeBucketIndex]));
+        if (timeCol > time) {
+          // Belongs to a later bucket; keep it for the next iteration.
+          break;
+        }
+        if (timeCol == time) {
+          if (havingFilterHandler == null || havingFilterHandler.isMatch(row)) {
+            gapfillResultRows.add(resultRow);
+            if (gapfillResultRows.size() == limit) {
+              return gapfillResultRows;
+            }
+          }
+          Key key = constructGroupKeys(resultRow);
+          keys.remove(key);
+          _previousByGroupKey.put(key, resultRow);
+        }
+        // Records before the current bucket (timeCol < time) are dropped.
+        record = sortedIterator.hasNext() ? sortedIterator.next() : null;
+      }
+
+      if (keys.isEmpty()) {
+        continue;
+      }
+      String formattedTime = _dateTimeFormatter.fromMillisToFormat(time);
+      Object timeBucketValue =
+          resultColumnDataTypes[_timeBucketIndex] == ColumnDataType.LONG ? Long.valueOf(formattedTime) : formattedTime;
+      for (Key key : keys) {
+        Object[] gapfillRow = new Object[numResultColumns];
+        gapfillRow[_timeBucketIndex] = timeBucketValue;
+        // Key values are in group-by order; put each one back at the select-list position it was read from.
+        Object[] keyValues = key.getValues();
+        for (int k = 0; k < _numOfGroupByKeys; k++) {
+          gapfillRow[_groupByKeyIndexes.get(k)] = keyValues[k];
+        }
+        for (int i = 0; i < _isGroupBySelections.length; i++) {
+          if (!_isGroupBySelections[i]) {
+            gapfillRow[i] = getFillValue(i, key, resultColumnDataTypes[i]);
+          }
+        }
+
+        // NOTE(review): real rows are HAVING-checked on the pre-post-aggregation row, but generated rows on the
+        // select-ordered result row; the two layouts only coincide for some queries — confirm.
+        if (havingFilterHandler == null || havingFilterHandler.isMatch(gapfillRow)) {
+          gapfillResultRows.add(gapfillRow);
+          if (gapfillResultRows.size() == limit) {
+            return gapfillResultRows;
+          }
+        }
+      }
+    }
+    return gapfillResultRows;
+  }
+
+  /**
+   * Returns the value of a non-group-by column in a generated row.
+   *
+   * @param columnIndex select-list index of the column
+   * @param key group key of the generated row
+   * @param dataType result type of the column
+   * @return for {@code Fill(expr, 'FILL_PREVIOUS_VALUE')} the value from the latest real row of the same key (or the
+   *         type default if there is none); otherwise the type default from {@link GapfillUtils#getDefaultValue}
+   * @throws UnsupportedOperationException if a {@code Fill} call has no literal fill type
+   */
+  Object getFillValue(int columnIndex, Object key, ColumnDataType dataType) {
+    ExpressionContext expressionContext = _queryContext.getSelectExpressions().get(columnIndex);
+    if (expressionContext.getFunction() == null || !GapfillUtils.isFill(expressionContext)) {
+      return GapfillUtils.getDefaultValue(dataType);
+    }
+    List<ExpressionContext> args = expressionContext.getFunction().getArguments();
+    if (args.size() < 2 || args.get(1).getLiteral() == null) {
+      throw new UnsupportedOperationException("Wrong Sql.");
+    }
+    GapfillUtils.FillType fillType = GapfillUtils.FillType.valueOf(args.get(1).getLiteral());
+    switch (fillType) {
+      case FILL_DEFAULT_VALUE:
+        // TODO: may fill the default value from sql in the future.
+        return GapfillUtils.getDefaultValue(dataType);
+      case FILL_PREVIOUS_VALUE: {
+        Object[] previousRow = _previousByGroupKey.get(key);
+        return previousRow != null ? previousRow[columnIndex] : GapfillUtils.getDefaultValue(dataType);
+      }
+      default:
+        throw new UnsupportedOperationException("unsupported fill type.");
+    }
+  }
+
+  /**
+   * Helper method to extract the final aggregation results for the given row (in-place).
+   */
+  private void extractFinalAggregationResults(Object[] row) {
+    for (int i = 0; i < _numAggregationFunctions; i++) {
+      int valueIndex = i + _numGroupByExpressions;
+      row[valueIndex] = _aggregationFunctions[i].extractFinalResult(row[valueIndex]);
+    }
+  }
+
+  /**
+   * Constructs the DataSchema for the rows before the post-aggregation (SQL mode).
+   */
+  private DataSchema getPrePostAggregationDataSchema(DataSchema dataSchema) {
+    String[] columnNames = dataSchema.getColumnNames();
+    ColumnDataType[] columnDataTypes = new ColumnDataType[_numColumns];
+    System.arraycopy(dataSchema.getColumnDataTypes(), 0, columnDataTypes, 0, _numGroupByExpressions);
+    for (int i = 0; i < _numAggregationFunctions; i++) {
+      columnDataTypes[i + _numGroupByExpressions] = _aggregationFunctions[i].getFinalResultColumnType();
+    }
+    return new DataSchema(columnNames, columnDataTypes);
+  }
+
+  /**
+   * Merges all server data tables into one {@link IndexedTable} using up to
+   * {@code reducerContext.getMaxReduceThreadsPerQuery()} threads, then calls {@code finish(true)} on it.
+   *
+   * @throws TimeoutException if the reduce does not complete within the reducer timeout, or the thread is interrupted
+   * @throws IllegalStateException if a reduce task fails
+   */
+  private IndexedTable getIndexedTable(DataSchema dataSchema, Collection<DataTable> dataTablesToReduce,
+      DataTableReducerContext reducerContext)
+      throws TimeoutException {
+    long start = System.currentTimeMillis();
+    int numDataTables = dataTablesToReduce.size();
+
+    // Get the number of threads to use for reducing.
+    // In case of single reduce thread, fall back to SimpleIndexedTable to avoid redundant locking/unlocking calls.
+    int numReduceThreadsToUse = getNumReduceThreadsToUse(numDataTables, reducerContext.getMaxReduceThreadsPerQuery());
+    int limit = _queryContext.getLimit();
+    // TODO: Make minTrimSize configurable
+    int trimSize = GroupByUtils.getTableCapacity(limit);
+    // NOTE: For query with HAVING clause, use trimSize as resultSize to ensure the result accuracy.
+    // TODO: Resolve the HAVING clause within the IndexedTable before returning the result
+    int resultSize = _queryContext.getHavingFilter() != null ? trimSize : limit;
+    int trimThreshold = reducerContext.getGroupByTrimThreshold();
+    IndexedTable indexedTable;
+    if (numReduceThreadsToUse <= 1) {
+      indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, resultSize, trimSize, trimThreshold);
+    } else if (trimThreshold >= GroupByOrderByCombineOperator.MAX_TRIM_THRESHOLD) {
+      // Special case: with the max trim threshold there is no trimming during upsert, so the read/write locking in
+      // the upsert method can be avoided.
+      indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, resultSize);
+    } else {
+      indexedTable = new ConcurrentIndexedTable(dataSchema, _queryContext, resultSize, trimSize, trimThreshold);
+    }
+
+    // Create groups of data tables that each thread can process concurrently.
+    // Given that numReduceThreads is <= numDataTables, each group will have at least one data table.
+    List<DataTable> dataTables = new ArrayList<>(dataTablesToReduce);
+    List<List<DataTable>> reduceGroups = new ArrayList<>(numReduceThreadsToUse);
+    for (int i = 0; i < numReduceThreadsToUse; i++) {
+      reduceGroups.add(new ArrayList<>());
+    }
+    for (int i = 0; i < numDataTables; i++) {
+      reduceGroups.get(i % numReduceThreadsToUse).add(dataTables.get(i));
+    }
+
+    ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes();
+    long timeOutMs = reducerContext.getReduceTimeOutMs() - (System.currentTimeMillis() - start);
+    List<Future<Void>> futures;
+    try {
+      futures = reducerContext.getExecutorService().invokeAll(reduceGroups.stream()
+          .map(reduceGroup -> new TraceCallable<Void>() {
+            @Override
+            public Void callJob()
+                throws Exception {
+              for (DataTable dataTable : reduceGroup) {
+                int numRows = dataTable.getNumberOfRows();
+                for (int rowId = 0; rowId < numRows; rowId++) {
+                  Object[] values = new Object[_numColumns];
+                  for (int colId = 0; colId < _numColumns; colId++) {
+                    switch (storedColumnDataTypes[colId]) {
+                      case INT:
+                        values[colId] = dataTable.getInt(rowId, colId);
+                        break;
+                      case LONG:
+                        values[colId] = dataTable.getLong(rowId, colId);
+                        break;
+                      case FLOAT:
+                        values[colId] = dataTable.getFloat(rowId, colId);
+                        break;
+                      case DOUBLE:
+                        values[colId] = dataTable.getDouble(rowId, colId);
+                        break;
+                      case STRING:
+                        values[colId] = dataTable.getString(rowId, colId);
+                        break;
+                      case BYTES:
+                        values[colId] = dataTable.getBytes(rowId, colId);
+                        break;
+                      case OBJECT:
+                        values[colId] = dataTable.getObject(rowId, colId);
+                        break;
+                      // Add other aggregation intermediate result / group-by column type supports here
+                      default:
+                        throw new IllegalStateException();
+                    }
+                  }
+                  indexedTable.upsert(new Record(values));
+                }
+              }
+              return null;
+            }
+          }).collect(Collectors.toList()), timeOutMs, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      // Preserve the interrupt status for the caller.
+      Thread.currentThread().interrupt();
+      throw new TimeoutException("Timed out in broker reduce phase.");
+    }
+
+    // invokeAll() returns normally on timeout after cancelling the unfinished tasks, and it never rethrows task
+    // failures. A cancelled task may still be running, so never finish the table with partial or in-flight data.
+    for (Future<Void> future : futures) {
+      if (future.isCancelled()) {
+        throw new TimeoutException("Timed out in broker reduce phase.");
+      }
+      try {
+        future.get();
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new TimeoutException("Timed out in broker reduce phase.");
+      } catch (ExecutionException e) {
+        throw new IllegalStateException("Caught exception while reducing data tables", e.getCause());
+      }
+    }
+
+    indexedTable.finish(true);
+    return indexedTable;
+  }
+
+  /**
+   * Computes the number of reduce threads to use per query.
+   * <ul>
+   *   <li> Use single thread if number of data tables to reduce is less than
+   *   {@value #MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE}.</li>
+   *   <li> Else, use min of max allowed reduce threads per query, and number of data tables.</li>
+   * </ul>
+   *
+   * @param numDataTables Number of data tables to reduce
+   * @param maxReduceThreadsPerQuery Max allowed reduce threads per query
+   * @return Number of reduce threads to use for the query
+   */
+  private int getNumReduceThreadsToUse(int numDataTables, int maxReduceThreadsPerQuery) {
+    // Use single thread if number of data tables < MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE.
+    if (numDataTables < MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE) {
+      return Math.min(1, numDataTables); // Number of data tables can be zero.
+    }
+
+    return Math.min(maxReduceThreadsPerQuery, numDataTables);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PostAggregationHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PostAggregationHandler.java
index e190e2f..4705951 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PostAggregationHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PostAggregationHandler.java
@@ -28,6 +28,7 @@ import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
import org.apache.pinot.core.query.postaggregation.PostAggregationFunction;
import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.util.GapfillUtils;
/**
@@ -50,7 +51,7 @@ public class PostAggregationHandler {
_numGroupByExpressions = groupByExpressions.size();
_groupByExpressionIndexMap = new HashMap<>();
for (int i = 0; i < _numGroupByExpressions; i++) {
- _groupByExpressionIndexMap.put(groupByExpressions.get(i), i);
+ _groupByExpressionIndexMap.put(GapfillUtils.stripGapfill(groupByExpressions.get(i)), i);
}
} else {
_numGroupByExpressions = 0;
@@ -98,6 +99,7 @@ public class PostAggregationHandler {
* Returns a ValueExtractor based on the given expression.
*/
public ValueExtractor getValueExtractor(ExpressionContext expression) {
+ expression = GapfillUtils.stripGapfill(expression);
if (expression.getType() == ExpressionContext.Type.LITERAL) {
// Literal
return new LiteralValueExtractor(expression.getLiteral());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
index 7dcf05d..e5e9bf8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
@@ -22,6 +22,7 @@ import org.apache.pinot.common.request.BrokerRequest;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.util.GapfillUtils;
import org.apache.pinot.segment.spi.AggregationFunctionType;
@@ -56,6 +57,8 @@ public final class ResultReducerFactory {
} else {
return new AggregationDataTableReducer(queryContext);
}
+ } else if (GapfillUtils.isPostAggregateGapfill(queryContext)) {
+ return new GapFillGroupByDataTableReducer(queryContext);
} else {
// Aggregation group-by query
return new GroupByDataTableReducer(queryContext);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtils.java
new file mode 100644
index 0000000..55b618c
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtils.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.util;
+
+import java.io.Serializable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.request.context.QueryContext;
+
+
+/**
+ * Util class to encapsulate all utilites required for gapfill.
+ */
+public class GapfillUtils {
+ private static final String POST_AGGREGATE_GAP_FILL = "postaggregategapfill";
+ private static final String FILL = "fill";
+
+ private GapfillUtils() {
+ }
+
+ public static ExpressionContext stripGapfill(ExpressionContext expression) {
+ if (expression.getType() != ExpressionContext.Type.FUNCTION) {
+ return expression;
+ }
+
+ FunctionContext function = expression.getFunction();
+ String functionName = canonicalizeFunctionName(function.getFunctionName());
+ if (functionName.equals(POST_AGGREGATE_GAP_FILL) || functionName.equals(FILL)) {
+ return function.getArguments().get(0);
+ }
+ return expression;
+ }
+
+ public static boolean isPostAggregateGapfill(ExpressionContext expressionContext) {
+ if (expressionContext.getType() != ExpressionContext.Type.FUNCTION) {
+ return false;
+ }
+
+ return POST_AGGREGATE_GAP_FILL.equals(canonicalizeFunctionName(expressionContext.getFunction().getFunctionName()));
+ }
+
+ public static boolean isPostAggregateGapfill(QueryContext queryContext) {
+ for (ExpressionContext expressionContext : queryContext.getSelectExpressions()) {
+ if (isPostAggregateGapfill(expressionContext)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public static boolean isFill(ExpressionContext expressionContext) {
+ if (expressionContext.getType() != ExpressionContext.Type.FUNCTION) {
+ return false;
+ }
+
+ return FILL.equals(canonicalizeFunctionName(expressionContext.getFunction().getFunctionName()));
+ }
+
+ static public enum FillType {
+ FILL_DEFAULT_VALUE,
+ FILL_PREVIOUS_VALUE,
+ }
+
+ /**
+ * The default gapfill value for each column type.
+ */
+ static public Serializable getDefaultValue(DataSchema.ColumnDataType dataType) {
+ switch (dataType) {
+ // Single-value column
+ case INT:
+ case LONG:
+ case FLOAT:
+ case DOUBLE:
+ case BOOLEAN:
+ case TIMESTAMP:
+ return dataType.convertAndFormat(0);
+ case STRING:
+ case JSON:
+ case BYTES:
+ return "";
+ case INT_ARRAY:
+ return new int[0];
+ case LONG_ARRAY:
+ return new long[0];
+ case FLOAT_ARRAY:
+ return new float[0];
+ case DOUBLE_ARRAY:
+ return new double[0];
+ case STRING_ARRAY:
+ case TIMESTAMP_ARRAY:
+ return new String[0];
+ case BOOLEAN_ARRAY:
+ return new boolean[0];
+ case BYTES_ARRAY:
+ return new byte[0][0];
+ default:
+ throw new IllegalStateException(String.format("Cannot provide the default value for the type: %s", dataType));
+ }
+ }
+
+ private static String canonicalizeFunctionName(String functionName) {
+ return StringUtils.remove(functionName, '_').toLowerCase();
+ }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/PostAggregationGapfillQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/PostAggregationGapfillQueriesTest.java
new file mode 100644
index 0000000..bfd7baa
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/PostAggregationGapfillQueriesTest.java
@@ -0,0 +1,616 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Queries test for PostAggregationGapfill queries.
+ *
+ * <p>The test segment holds park/depart events for {@link #NUM_LOTS} parking lots between November 7, 2021 04:00 and
+ * 12:00 UTC. Some time slots are skipped on purpose, so the hourly aggregation has empty buckets that the
+ * PostAggregateGapFill queries are expected to fill.
+ */
+@SuppressWarnings("rawtypes")
+public class PostAggregationGapfillQueriesTest extends BaseQueriesTest {
+ private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "PostAggregationGapfillQueriesTest");
+ private static final String RAW_TABLE_NAME = "parkingData";
+ private static final String SEGMENT_NAME = "testSegment";
+ private static final Random RANDOM = new Random();
+
+ private static final int NUM_LOTS = 4;
+
+ private static final String IS_OCCUPIED_COLUMN = "isOccupied";
+ private static final String LOT_ID_COLUMN = "lotId";
+ private static final String EVENT_TIME_COLUMN = "eventTime";
+ private static final Schema SCHEMA = new Schema.SchemaBuilder()
+ .addSingleValueDimension(IS_OCCUPIED_COLUMN, DataType.BOOLEAN)
+ .addSingleValueDimension(LOT_ID_COLUMN, DataType.STRING)
+ .addSingleValueDimension(EVENT_TIME_COLUMN, DataType.LONG)
+ .setPrimaryKeyColumns(Arrays.asList(LOT_ID_COLUMN, EVENT_TIME_COLUMN))
+ .build();
+ private static final TableConfig TABLE_CONFIG = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+ .build();
+
+ private IndexSegment _indexSegment;
+ private List<IndexSegment> _indexSegments;
+
+ @Override
+ protected String getFilter() {
+ // Match-all filter: every generated eventTime is a positive epoch millis value.
+ return " WHERE eventTime >= 0";
+ }
+
+ @Override
+ protected IndexSegment getIndexSegment() {
+ return _indexSegment;
+ }
+
+ @Override
+ protected List<IndexSegment> getIndexSegments() {
+ return _indexSegments;
+ }
+
+ @BeforeClass
+ public void setUp()
+ throws Exception {
+ FileUtils.deleteDirectory(INDEX_DIR);
+
+ long current = 1636286400000L; // November 7, 2021 12:00:00 PM UTC
+ int duplicates = 16;
+ int interval = 1000 * 900; // 15 minutes
+ long start = current - duplicates * 2 * interval; // November 7, 2021 4:00:00 AM UTC
+
+ // At most one park and one depart event per lot per slot.
+ List<GenericRow> records = new ArrayList<>(NUM_LOTS * duplicates * 2);
+ for (int i = 0; i < NUM_LOTS; i++) {
+ String lotId = "LotId_" + i;
+ for (int j = 0; j < duplicates; j++) {
+ // Skipped slots leave some hourly buckets without any event, which gapfilling has to fill.
+ if (j == 4 || j == 5 || j == 6 || j == 7 || j == 10 || j == 11) {
+ continue;
+ }
+ long parkingTime = start + interval * 2 * j + RANDOM.nextInt(interval);
+ // For slot 3 the departure moves from interval 2j+1 to interval 2j+6, i.e. into a later hourly bucket.
+ long departingTime = j == 3 ? start + interval * (2 * j + 6) + RANDOM.nextInt(interval)
+ : start + interval * (2 * j + 1) + RANDOM.nextInt(interval);
+
+ GenericRow parkingRow = new GenericRow();
+ parkingRow.putValue(EVENT_TIME_COLUMN, parkingTime);
+ parkingRow.putValue(LOT_ID_COLUMN, lotId);
+ parkingRow.putValue(IS_OCCUPIED_COLUMN, true);
+ records.add(parkingRow);
+
+ GenericRow departingRow = new GenericRow();
+ departingRow.putValue(EVENT_TIME_COLUMN, departingTime);
+ departingRow.putValue(LOT_ID_COLUMN, lotId);
+ departingRow.putValue(IS_OCCUPIED_COLUMN, false);
+ records.add(departingRow);
+ }
+ }
+
+ SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+ segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
+ segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
+ segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
+
+ SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+ driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
+ driver.build();
+
+ ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
+ _indexSegment = immutableSegment;
+ _indexSegments = Arrays.asList(immutableSegment);
+ }
+
+ /**
+ * Verifies the layout of gapfilled rows, which must be ordered by time bucket:
+ * <ul>
+ * <li>rows come in groups of {@link #NUM_LOTS}, one group per time bucket;</li>
+ * <li>all rows of a group share the same time bucket;</li>
+ * <li>each group contains every lot exactly once;</li>
+ * <li>consecutive groups advance by one granularity step, starting at {@code startMillis}.</li>
+ * </ul>
+ *
+ * @param rows the result rows
+ * @param timeColumnIndex index of the gapfilled time bucket column
+ * @param timeColumnClass expected Java type of the time bucket values (e.g. String or Long)
+ * @param lotColumnIndex index of the lotId column
+ * @param dateTimeFormatter format used to convert time bucket values to epoch millis
+ * @param startMillis expected epoch millis of the first time bucket
+ * @param granularityMillis expected distance in millis between consecutive time buckets
+ */
+ private static void assertGapfilledRows(List<Object[]> rows, int timeColumnIndex, Class<?> timeColumnClass,
+ int lotColumnIndex, DateTimeFormatSpec dateTimeFormatter, long startMillis, long granularityMillis) {
+ long expectedBucket = startMillis;
+ for (int i = 0; i < rows.size(); i += NUM_LOTS) {
+ Object firstTimeCol = rows.get(i)[timeColumnIndex];
+ // cast() keeps the original type check on the time column value.
+ long timeStamp = dateTimeFormatter.fromFormatToMillis(timeColumnClass.cast(firstTimeCol).toString());
+ Assert.assertEquals(timeStamp, expectedBucket);
+ Set<String> lots = new HashSet<>();
+ lots.add((String) rows.get(i)[lotColumnIndex]);
+ for (int j = 1; j < NUM_LOTS; j++) {
+ Object[] row = rows.get(i + j);
+ Assert.assertEquals(row[timeColumnIndex], firstTimeCol);
+ Assert.assertTrue(lots.add((String) row[lotColumnIndex]), "Duplicate lot in time bucket: " + row[lotColumnIndex]);
+ }
+ expectedBucket += granularityMillis;
+ }
+ }
+
+ @Test
+ public void datetimeconvertGapfillTest() {
+ String dataTimeConvertQuery = "SELECT "
+ + "DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+ + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS') AS time_col, "
+ + "lotId, "
+ + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') "
+ + "FROM parkingData "
+ + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+ + "GROUP BY 1, 2 "
+ + "ORDER BY 1 "
+ + "LIMIT 200";
+
+ BrokerResponseNative dateTimeConvertBrokerResponse = getBrokerResponseForSqlQuery(dataTimeConvertQuery);
+
+ ResultTable dateTimeConvertResultTable = dateTimeConvertBrokerResponse.getResultTable();
+ Assert.assertEquals(dateTimeConvertResultTable.getRows().size(), 24);
+
+ String gapfillQuery = "SELECT "
+ + "PostAggregateGapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+ + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'), "
+ + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
+ + "'2021-11-07 3:00:00.000', '2021-11-07 12:00:00.000', '1:HOURS') AS time_col, "
+ + "lotId, "
+ + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status1, "
+ + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_DEFAULT_VALUE') as status2, "
+ + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 "
+ + "FROM parkingData "
+ + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+ + "GROUP BY 1, 2 "
+ + "ORDER BY 1 "
+ + "LIMIT 200";
+
+ DateTimeFormatSpec dateTimeFormatter
+ = new DateTimeFormatSpec("1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS");
+ DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+
+ BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery);
+
+ ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
+ Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
+ assertGapfilledRows(gapFillResultTable.getRows(), 0, String.class, 1, dateTimeFormatter,
+ dateTimeFormatter.fromFormatToMillis("2021-11-07 03:00:00.000"), dateTimeGranularity.granularityToMillis());
+ }
+
+ @Test
+ public void toEpochHoursGapfillTest() {
+ String dataTimeConvertQuery = "SELECT "
+ + "ToEpochHours(eventTime) AS time_col, "
+ + "lotId, "
+ + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') "
+ + "FROM parkingData "
+ + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+ + "GROUP BY 1, 2 "
+ + "ORDER BY 1 "
+ + "LIMIT 200";
+
+ BrokerResponseNative dateTimeConvertBrokerResponse = getBrokerResponseForSqlQuery(dataTimeConvertQuery);
+
+ ResultTable dateTimeConvertResultTable = dateTimeConvertBrokerResponse.getResultTable();
+ Assert.assertEquals(dateTimeConvertResultTable.getRows().size(), 24);
+
+ String gapfillQuery = "SELECT "
+ + "PostAggregateGapFill(ToEpochHours(eventTime), '1:HOURS:EPOCH', "
+ + "'454515', '454524', '1:HOURS') AS time_col, "
+ + "lotId, "
+ + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status1, "
+ + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_DEFAULT_VALUE') as status2, "
+ + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 "
+ + "FROM parkingData "
+ + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+ + "GROUP BY 1, 2 "
+ + "ORDER BY 1 "
+ + "LIMIT 200";
+
+ DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:HOURS:EPOCH");
+ DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+
+ BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery);
+
+ ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
+ Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
+ assertGapfilledRows(gapFillResultTable.getRows(), 0, Long.class, 1, dateTimeFormatter,
+ dateTimeFormatter.fromFormatToMillis("454515"), dateTimeGranularity.granularityToMillis());
+ }
+
+ @Test
+ public void toEpochMinutesRoundedHoursGapfillTest() {
+ String dataTimeConvertQuery = "SELECT "
+ + "ToEpochMinutesRounded(eventTime, 60) AS time_col, "
+ + "lotId, "
+ + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') "
+ + "FROM parkingData "
+ + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+ + "GROUP BY 1, 2 "
+ + "ORDER BY 1 "
+ + "LIMIT 200";
+
+ BrokerResponseNative dateTimeConvertBrokerResponse = getBrokerResponseForSqlQuery(dataTimeConvertQuery);
+
+ ResultTable dateTimeConvertResultTable = dateTimeConvertBrokerResponse.getResultTable();
+ Assert.assertEquals(dateTimeConvertResultTable.getRows().size(), 24);
+
+ // NOTE(review): ToEpochMinutesRounded presumably yields epoch *minutes* (rounded to 60), while the gapfill format
+ // below is '1:HOURS:EPOCH'. Confirm that real aggregated rows, not only generated gap rows, land in the buckets.
+ String gapfillQuery = "SELECT "
+ + "PostAggregateGapFill(ToEpochMinutesRounded(eventTime, 60), '1:HOURS:EPOCH', "
+ + "'454515', '454524', '1:HOURS') AS time_col, "
+ + "lotId, "
+ + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status1, "
+ + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_DEFAULT_VALUE') as status2, "
+ + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 "
+ + "FROM parkingData "
+ + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+ + "GROUP BY 1, 2 "
+ + "ORDER BY 1 "
+ + "LIMIT 200";
+
+ DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:HOURS:EPOCH");
+ DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+
+ BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery);
+
+ ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
+ Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
+ assertGapfilledRows(gapFillResultTable.getRows(), 0, Long.class, 1, dateTimeFormatter,
+ dateTimeFormatter.fromFormatToMillis("454515"), dateTimeGranularity.granularityToMillis());
+ }
+
+ @Test
+ public void toEpochMinutesBucketHoursGapfillTest() {
+ String dataTimeConvertQuery = "SELECT "
+ + "ToEpochMinutesBucket(eventTime, 60) AS time_col, "
+ + "lotId, "
+ + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') "
+ + "FROM parkingData "
+ + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+ + "GROUP BY 1, 2 "
+ + "ORDER BY 1 "
+ + "LIMIT 200";
+
+ BrokerResponseNative dateTimeConvertBrokerResponse = getBrokerResponseForSqlQuery(dataTimeConvertQuery);
+
+ ResultTable dateTimeConvertResultTable = dateTimeConvertBrokerResponse.getResultTable();
+ Assert.assertEquals(dateTimeConvertResultTable.getRows().size(), 24);
+
+ String gapfillQuery = "SELECT "
+ + "PostAggregateGapFill(ToEpochMinutesBucket(eventTime, 60), '1:HOURS:EPOCH', "
+ + "'454515', '454524', '1:HOURS') AS time_col, "
+ + "lotId, "
+ + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status1, "
+ + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_DEFAULT_VALUE') as status2, "
+ + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 "
+ + "FROM parkingData "
+ + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+ + "GROUP BY 1, 2 "
+ + "ORDER BY 1 "
+ + "LIMIT 200";
+
+ DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:HOURS:EPOCH");
+ DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+
+ BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery);
+
+ ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
+ Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
+ assertGapfilledRows(gapFillResultTable.getRows(), 0, Long.class, 1, dateTimeFormatter,
+ dateTimeFormatter.fromFormatToMillis("454515"), dateTimeGranularity.granularityToMillis());
+ }
+
+ @Test
+ public void dateTruncHoursGapfillTest() {
+ String dataTimeConvertQuery = "SELECT "
+ + "DATETRUNC('hour', eventTime, 'milliseconds') AS time_col, "
+ + "lotId, "
+ + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') "
+ + "FROM parkingData "
+ + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+ + "GROUP BY 1, 2 "
+ + "ORDER BY 1 "
+ + "LIMIT 200";
+
+ BrokerResponseNative dateTimeConvertBrokerResponse = getBrokerResponseForSqlQuery(dataTimeConvertQuery);
+
+ ResultTable dateTimeConvertResultTable = dateTimeConvertBrokerResponse.getResultTable();
+ Assert.assertEquals(dateTimeConvertResultTable.getRows().size(), 24);
+
+ // NOTE(review): DATETRUNC with 'milliseconds' input presumably returns epoch *millis*, while the gapfill format
+ // below is '1:HOURS:EPOCH'. Confirm that real aggregated rows, not only generated gap rows, land in the buckets.
+ String gapfillQuery = "SELECT "
+ + "PostAggregateGapFill(DATETRUNC('hour', eventTime, 'milliseconds'), '1:HOURS:EPOCH', "
+ + "'454515', '454524', '1:HOURS') AS time_col, "
+ + "lotId, "
+ + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status1, "
+ + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_DEFAULT_VALUE') as status2, "
+ + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 "
+ + "FROM parkingData "
+ + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+ + "GROUP BY 1, 2 "
+ + "ORDER BY 1 "
+ + "LIMIT 200";
+
+ DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:HOURS:EPOCH");
+ DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+
+ BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery);
+
+ ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
+ Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
+ assertGapfilledRows(gapFillResultTable.getRows(), 0, Long.class, 1, dateTimeFormatter,
+ dateTimeFormatter.fromFormatToMillis("454515"), dateTimeGranularity.granularityToMillis());
+ }
+
+ @Test
+ public void datetimeconvertGapfillTestWithoutTimeBucketOrdering() {
+ try {
+ String gapfillQuery = "SELECT "
+ + "PostAggregateGapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+ + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'), "
+ + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
+ + "'2021-11-07 3:00:00.000', '2021-11-07 12:00:00.000', '1:HOURS') AS time_col, "
+ + "lotId, "
+ + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status1, "
+ + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_DEFAULT_VALUE') as status2, "
+ + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 "
+ + "FROM parkingData "
+ + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+ + "GROUP BY 1, 2 "
+ + "LIMIT 200";
+
+ getBrokerResponseForSqlQuery(gapfillQuery);
+ Assert.fail();
+ } catch (IllegalArgumentException e) {
+ Assert.assertEquals(e.getMessage(), "PostAggregateGapFill does not work if the time bucket is not ordered.");
+ }
+ }
+
+ @Test
+ public void datetimeconvertGapfillTestWithHavingClause() {
+ String dataTimeConvertQueryWithUnoccupied = "SELECT "
+ + "DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+ + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS') AS time_col, "
+ + "lotId, "
+ + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status "
+ + "FROM parkingData "
+ + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+ + "GROUP BY 1, 2 "
+ + "HAVING status = 'false' "
+ + "ORDER BY 1 "
+ + "LIMIT 200";
+
+ BrokerResponseNative dateTimeConvertBrokerResponseWithUnoccupied
+ = getBrokerResponseForSqlQuery(dataTimeConvertQueryWithUnoccupied);
+
+ ResultTable dateTimeConvertResultTableWithUnoccupied = dateTimeConvertBrokerResponseWithUnoccupied.getResultTable();
+ Assert.assertEquals(dateTimeConvertResultTableWithUnoccupied.getRows().size(), 20);
+
+ String dataTimeConvertQueryWithOccupied = "SELECT "
+ + "DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+ + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS') AS time_col, "
+ + "lotId, "
+ + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status "
+ + "FROM parkingData "
+ + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+ + "GROUP BY 1, 2 "
+ + "HAVING status = 'true' "
+ + "ORDER BY 1 "
+ + "LIMIT 200";
+
+ BrokerResponseNative dateTimeConvertBrokerResponseWithOccupied
+ = getBrokerResponseForSqlQuery(dataTimeConvertQueryWithOccupied);
+
+ ResultTable dateTimeConvertResultTableWithOccupied = dateTimeConvertBrokerResponseWithOccupied.getResultTable();
+ Assert.assertEquals(dateTimeConvertResultTableWithOccupied.getRows().size(), 4);
+
+ String gapfillQueryWithOccupied = "SELECT "
+ + "PostAggregateGapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+ + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'), "
+ + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
+ + "'2021-11-07 3:00:00.000', '2021-11-07 12:00:00.000', '1:HOURS') AS time_col, "
+ + "lotId, "
+ + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status "
+ + "FROM parkingData "
+ + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+ + "GROUP BY 1, 2 "
+ + "HAVING status = 'true' "
+ + "ORDER BY 1 "
+ + "LIMIT 7";
+
+ BrokerResponseNative gapfillBrokerResponseWithOccupied = getBrokerResponseForSqlQuery(gapfillQueryWithOccupied);
+
+ ResultTable gapFillResultTableWithOccupied = gapfillBrokerResponseWithOccupied.getResultTable();
+ Assert.assertEquals(gapFillResultTableWithOccupied.getRows().size(), 7);
+
+ for (Object[] row : gapFillResultTableWithOccupied.getRows()) {
+ Assert.assertEquals(row[2], true);
+ }
+
+ String gapfillQueryWithUnoccupied = "SELECT "
+ + "PostAggregateGapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+ + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'), "
+ + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
+ + "'2021-11-07 3:00:00.000', '2021-11-07 12:00:00.000', '1:HOURS') AS time_col, "
+ + "lotId, "
+ + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status "
+ + "FROM parkingData "
+ + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+ + "GROUP BY 1, 2 "
+ + "HAVING status = 'false' "
+ + "ORDER BY 1 "
+ + "LIMIT 24";
+
+ BrokerResponseNative gapfillBrokerResponseWithUnoccupied = getBrokerResponseForSqlQuery(gapfillQueryWithUnoccupied);
+
+ ResultTable gapFillResultTableWithUnoccupied = gapfillBrokerResponseWithUnoccupied.getResultTable();
+ Assert.assertEquals(gapFillResultTableWithUnoccupied.getRows().size(), 24);
+ for (Object[] row : gapFillResultTableWithUnoccupied.getRows()) {
+ Assert.assertEquals(row[2], false);
+ }
+ }
+
+ @Test
+ public void datetimeconvertGapfillTestTimeBucketAsLastSelection() {
+ String gapfillQuery = "SELECT "
+ + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status1, "
+ + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_DEFAULT_VALUE') as status2, "
+ + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3, "
+ + "lotId, PostAggregateGapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+ + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'), "
+ + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
+ + "'2021-11-07 3:00:00.000', '2021-11-07 12:00:00.000', '1:HOURS') AS time_col "
+ + "FROM parkingData "
+ + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+ + "GROUP BY 4, 5 "
+ + "ORDER BY 5 "
+ + "LIMIT 200";
+
+ DateTimeFormatSpec dateTimeFormatter
+ = new DateTimeFormatSpec("1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS");
+ DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+
+ BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery);
+
+ ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
+ Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
+ // Time bucket is the 5th selection (index 4) and lotId the 4th (index 3).
+ assertGapfilledRows(gapFillResultTable.getRows(), 4, String.class, 3, dateTimeFormatter,
+ dateTimeFormatter.fromFormatToMillis("2021-11-07 03:00:00.000"), dateTimeGranularity.granularityToMillis());
+ }
+
+ @Test
+ public void datetimeconvertGapfillWithOrderingByTwoColumnsTest() {
+ String gapfillQuery = "SELECT "
+ + "PostAggregateGapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+ + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'), "
+ + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
+ + "'2021-11-07 3:00:00.000', '2021-11-07 12:00:00.000', '1:HOURS') AS time_col, "
+ + "lotId, "
+ + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status1, "
+ + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_DEFAULT_VALUE') as status2, "
+ + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 "
+ + "FROM parkingData "
+ + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+ + "GROUP BY 1, 2 "
+ + "ORDER BY 1, 2 "
+ + "LIMIT 200";
+
+ DateTimeFormatSpec dateTimeFormatter
+ = new DateTimeFormatSpec("1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS");
+ DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+
+ BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery);
+
+ ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
+ Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
+ assertGapfilledRows(gapFillResultTable.getRows(), 0, String.class, 1, dateTimeFormatter,
+ dateTimeFormatter.fromFormatToMillis("2021-11-07 03:00:00.000"), dateTimeGranularity.granularityToMillis());
+ }
+
+ @AfterClass
+ public void tearDown()
+ throws IOException {
+ _indexSegment.destroy();
+ FileUtils.deleteDirectory(INDEX_DIR);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org