Posted to commits@pinot.apache.org by ja...@apache.org on 2021/12/03 01:01:11 UTC

[pinot] branch master updated: Add Post-Aggregation Gapfilling functionality. (#7781)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5b9b836  Add Post-Aggregation Gapfilling functionality. (#7781)
5b9b836 is described below

commit 5b9b8364ef546113ff548b41504cbb0dc2aa2eda
Author: weixiangsun <91...@users.noreply.github.com>
AuthorDate: Thu Dec 2 17:00:51 2021 -0800

    Add Post-Aggregation Gapfilling functionality. (#7781)
---
 .../core/operator/transform/TransformOperator.java |   4 +-
 .../reduce/GapFillGroupByDataTableReducer.java     | 491 ++++++++++++++++
 .../core/query/reduce/PostAggregationHandler.java  |   4 +-
 .../core/query/reduce/ResultReducerFactory.java    |   3 +
 .../org/apache/pinot/core/util/GapfillUtils.java   | 122 ++++
 .../queries/PostAggregationGapfillQueriesTest.java | 616 +++++++++++++++++++++
 6 files changed, 1238 insertions(+), 2 deletions(-)
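
For reference, the syntax exercised by the new tests (trimmed from
PostAggregationGapfillQueriesTest below) looks like:

    SELECT PostAggregateGapFill(
        DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH',
            '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'),
        '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS',
        '2021-11-07 3:00:00.000', '2021-11-07 12:00:00.000', '1:HOURS') AS time_col,
      lotId,
      FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') AS status
    FROM parkingData
    GROUP BY 1, 2
    ORDER BY 1
    LIMIT 200

PostAggregateGapFill takes five arguments: the time-bucket expression, its format, the start
and end of the fill range, and the bucket size. FILL chooses how synthesized rows are
populated (FILL_PREVIOUS_VALUE or FILL_DEFAULT_VALUE).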

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java
index d3b752e..08bfeb8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/TransformOperator.java
@@ -36,6 +36,7 @@ import org.apache.pinot.core.operator.blocks.TransformBlock;
 import org.apache.pinot.core.operator.transform.function.TransformFunction;
 import org.apache.pinot.core.operator.transform.function.TransformFunctionFactory;
 import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.util.GapfillUtils;
 import org.apache.pinot.segment.spi.datasource.DataSource;
 import org.apache.pinot.segment.spi.index.reader.Dictionary;
 
@@ -62,7 +63,8 @@ public class TransformOperator extends BaseOperator<TransformBlock> {
     _projectionOperator = projectionOperator;
     _dataSourceMap = projectionOperator.getDataSourceMap();
     for (ExpressionContext expression : expressions) {
-      TransformFunction transformFunction = TransformFunctionFactory.get(queryContext, expression, _dataSourceMap);
+      TransformFunction transformFunction =
+          TransformFunctionFactory.get(queryContext, GapfillUtils.stripGapfill(expression), _dataSourceMap);
       _transformFunctionMap.put(expression, transformFunction);
     }
   }
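
Note that gapfill and FILL are resolved during the broker reduce phase rather than on the
servers, so the transform operator above resolves the inner expression instead: for example,
PostAggregateGapFill(DATETIMECONVERT(eventTime, ...), ...) is evaluated server-side as
DATETIMECONVERT(eventTime, ...). GapfillUtils.stripGapfill (added below) performs that
unwrapping.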
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapFillGroupByDataTableReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapFillGroupByDataTableReducer.java
new file mode 100644
index 0000000..9f739b0
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/GapFillGroupByDataTableReducer.java
@@ -0,0 +1,491 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.BrokerGauge;
+import org.apache.pinot.common.metrics.BrokerMeter;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FilterContext;
+import org.apache.pinot.common.request.context.OrderByExpressionContext;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.QueryProcessingException;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
+import org.apache.pinot.core.data.table.IndexedTable;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.data.table.SimpleIndexedTable;
+import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
+import org.apache.pinot.core.operator.combine.GroupByOrderByCombineOperator;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.core.util.GapfillUtils;
+import org.apache.pinot.core.util.GroupByUtils;
+import org.apache.pinot.core.util.trace.TraceCallable;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
+
+
+/**
+ * Helper class to reduce the data tables and set the gapfilled group-by results into the
+ * BrokerResponseNative.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class GapFillGroupByDataTableReducer implements DataTableReducer {
+  private static final int MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE = 2; // TBD, find a better value.
+
+  private final QueryContext _queryContext;
+  private final AggregationFunction[] _aggregationFunctions;
+  private final int _numAggregationFunctions;
+  private final List<ExpressionContext> _groupByExpressions;
+  private final int _numGroupByExpressions;
+  private final int _numColumns;
+  private final DateTimeGranularitySpec _dateTimeGranularity;
+  private final DateTimeFormatSpec _dateTimeFormatter;
+  private final long _startMs;
+  private final long _endMs;
+  private final Set<Key> _groupByKeys;
+  private final Map<Key, Object[]> _previousByGroupKey;
+  private final int _numOfGroupByKeys;
+  private final List<Integer> _groupByKeyIndexes;
+  private final boolean[] _isGroupBySelections;
+  private int _timeBucketIndex = -1;
+
+  GapFillGroupByDataTableReducer(QueryContext queryContext) {
+    Preconditions.checkArgument(
+        queryContext.getBrokerRequest().getPinotQuery() != null, "Gapfill can only be applied to SQL queries");
+    _queryContext = queryContext;
+    _aggregationFunctions = queryContext.getAggregationFunctions();
+    assert _aggregationFunctions != null;
+    _numAggregationFunctions = _aggregationFunctions.length;
+    _groupByExpressions = queryContext.getGroupByExpressions();
+    assert _groupByExpressions != null;
+    _numGroupByExpressions = _groupByExpressions.size();
+    _numColumns = _numAggregationFunctions + _numGroupByExpressions;
+
+    ExpressionContext gapFillSelection = null;
+    for (ExpressionContext expressionContext : _queryContext.getSelectExpressions()) {
+      if (GapfillUtils.isPostAggregateGapfill(expressionContext)) {
+        gapFillSelection = expressionContext;
+        break;
+      }
+    }
+
+    Preconditions.checkArgument(gapFillSelection != null && gapFillSelection.getFunction() != null,
+        "There is no gapfill expression in the select list.");
+    List<ExpressionContext> args = gapFillSelection.getFunction().getArguments();
+    Preconditions.checkArgument(
+        args.size() == 5, "PostAggregateGapFill does not have the correct number of arguments.");
+    Preconditions.checkArgument(
+        args.get(1).getLiteral() != null, "The second argument of PostAggregateGapFill should be the time format.");
+    Preconditions.checkArgument(
+        args.get(2).getLiteral() != null, "The third argument of PostAggregateGapFill should be the start time.");
+    Preconditions.checkArgument(
+        args.get(3).getLiteral() != null, "The fourth argument of PostAggregateGapFill should be the end time.");
+    Preconditions.checkArgument(
+        args.get(4).getLiteral() != null, "The fifth argument of PostAggregateGapFill should be the time bucket size.");
+
+    boolean orderByTimeBucket = false;
+    if (_queryContext.getOrderByExpressions() != null && !_queryContext.getOrderByExpressions().isEmpty()) {
+      OrderByExpressionContext firstOrderByExpression = _queryContext.getOrderByExpressions().get(0);
+      orderByTimeBucket =
+          firstOrderByExpression.isAsc() && firstOrderByExpression.getExpression().equals(gapFillSelection);
+    }
+
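+    // The gapfill pass below consumes the reduced rows through a sorted iterator, so the query
+    // must ORDER BY the gapfill time bucket ascending as its first order-by expression.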
+    Preconditions.checkArgument(
+        orderByTimeBucket, "PostAggregateGapFill does not work if the time bucket is not ordered.");
+
+    _dateTimeFormatter = new DateTimeFormatSpec(args.get(1).getLiteral());
+    _dateTimeGranularity = new DateTimeGranularitySpec(args.get(4).getLiteral());
+    String start = args.get(2).getLiteral();
+    String end = args.get(3).getLiteral();
+    _startMs = truncate(_dateTimeFormatter.fromFormatToMillis(start));
+    _endMs = truncate(_dateTimeFormatter.fromFormatToMillis(end));
+    _groupByKeys = new HashSet<>();
+    _previousByGroupKey = new HashMap<>();
+    _numOfGroupByKeys = _queryContext.getGroupByExpressions().size() - 1;
+    _groupByKeyIndexes = new ArrayList<>();
+    _isGroupBySelections = new boolean[_queryContext.getSelectExpressions().size()];
+
+    for (ExpressionContext expressionContext : _groupByExpressions) {
+      if (GapfillUtils.isPostAggregateGapfill(expressionContext)) {
+        for (int i = 0; i < _queryContext.getSelectExpressions().size(); i++) {
+          if (expressionContext.equals(_queryContext.getSelectExpressions().get(i))) {
+            _timeBucketIndex = i;
+            _isGroupBySelections[i] = true;
+            break;
+          }
+        }
+      } else {
+        for (int i = 0; i < _queryContext.getSelectExpressions().size(); i++) {
+          if (expressionContext.equals(_queryContext.getSelectExpressions().get(i))) {
+            _groupByKeyIndexes.add(i);
+            _isGroupBySelections[i] = true;
+            break;
+          }
+        }
+      }
+    }
+
+    Preconditions.checkArgument(_timeBucketIndex >= 0, "There is no time bucket.");
+  }
+
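+  /**
+   * Floors the given epoch value to a multiple of the granularity size (integer division,
+   * then multiplication back).
+   */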
+  private long truncate(long epoch) {
+    int sz = _dateTimeGranularity.getSize();
+    return epoch / sz * sz;
+  }
+
+  /**
+   * Reduces the data tables and sets the gapfilled group-by results into the ResultTable.
+   */
+  @Override
+  public void reduceAndSetResults(String tableName, DataSchema dataSchema,
+      Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative brokerResponseNative,
+      DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) {
+    assert dataSchema != null;
+    Collection<DataTable> dataTables = dataTableMap.values();
+
+    try {
+      setSQLGroupByInResultTable(brokerResponseNative, dataSchema, dataTables, reducerContext, tableName,
+          brokerMetrics);
+    } catch (TimeoutException e) {
+      brokerResponseNative.getProcessingExceptions()
+          .add(new QueryProcessingException(QueryException.BROKER_TIMEOUT_ERROR_CODE, e.getMessage()));
+    }
+    int resultSize = brokerResponseNative.getResultTable().getRows().size();
+
+    if (brokerMetrics != null && resultSize > 0) {
+      brokerMetrics.addMeteredTableValue(tableName, BrokerMeter.GROUP_BY_SIZE, resultSize);
+    }
+  }
+
+  private Key constructGroupKeys(Object[] row) {
+    Object[] groupKeys = new Object[_numOfGroupByKeys];
+    for (int i = 0; i < _numOfGroupByKeys; i++) {
+      groupKeys[i] = row[_groupByKeyIndexes.get(i)];
+    }
+    return new Key(groupKeys);
+  }
+
+  /**
+   * Extract group by order by results and set into {@link ResultTable}
+   * @param brokerResponseNative broker response
+   * @param dataSchema data schema
+   * @param dataTables Collection of data tables
+   * @param reducerContext DataTableReducer context
+   * @param rawTableName table name
+   * @param brokerMetrics broker metrics (meters)
+   * @throws TimeoutException If unable to complete within the timeout.
+   */
+  private void setSQLGroupByInResultTable(BrokerResponseNative brokerResponseNative, DataSchema dataSchema,
+      Collection<DataTable> dataTables, DataTableReducerContext reducerContext, String rawTableName,
+      BrokerMetrics brokerMetrics)
+      throws TimeoutException {
+    IndexedTable indexedTable = getIndexedTable(dataSchema, dataTables, reducerContext);
+    if (brokerMetrics != null) {
+      brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.NUM_RESIZES, indexedTable.getNumResizes());
+      brokerMetrics.addValueToTableGauge(rawTableName, BrokerGauge.RESIZE_TIME_MS, indexedTable.getResizeTimeMs());
+    }
+    DataSchema prePostAggregationDataSchema = getPrePostAggregationDataSchema(dataSchema);
+    ColumnDataType[] columnDataTypes = prePostAggregationDataSchema.getColumnDataTypes();
+
+    PostAggregationHandler postAggregationHandler =
+        new PostAggregationHandler(_queryContext, prePostAggregationDataSchema);
+    DataSchema resultDataSchema = postAggregationHandler.getResultDataSchema();
+    ColumnDataType[] resultColumnDataTypes = resultDataSchema.getColumnDataTypes();
+    Iterator<Record> sortedIterator = indexedTable.iterator();
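+    // First pass: finalize the aggregation results and collect every group key present in the
+    // reduced rows, so missing (time bucket, group key) combinations can be synthesized later.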
+    while (sortedIterator.hasNext()) {
+      Object[] row = sortedIterator.next().getValues();
+      extractFinalAggregationResults(row);
+      for (int i = 0; i < columnDataTypes.length; i++) {
+        row[i] = columnDataTypes[i].convert(row[i]);
+      }
+      Object[] resultRow = postAggregationHandler.getResult(row);
+      for (int i = 0; i < resultColumnDataTypes.length; i++) {
+        resultRow[i] = resultColumnDataTypes[i].format(resultRow[i]);
+      }
+
+      _groupByKeys.add(constructGroupKeys(resultRow));
+    }
+
+    List<Object[]> gapfillResultRows = gapFill(indexedTable.iterator(), postAggregationHandler);
+    brokerResponseNative.setResultTable(new ResultTable(resultDataSchema, gapfillResultRows));
+  }
+
+  List<Object[]> gapFill(Iterator<Record> sortedIterator, PostAggregationHandler postAggregationHandler) {
+    DataSchema resultDataSchema = postAggregationHandler.getResultDataSchema();
+    ColumnDataType[] resultColumnDataTypes = resultDataSchema.getColumnDataTypes();
+    int limit = _queryContext.getLimit();
+    int numResultColumns = resultColumnDataTypes.length;
+    List<Object[]> gapfillResultRows = new ArrayList<>(limit);
+    long step = _dateTimeGranularity.granularityToMillis();
+    FilterContext havingFilter = _queryContext.getHavingFilter();
+    HavingFilterHandler havingFilterHandler = null;
+    if (havingFilter != null) {
+      havingFilterHandler = new HavingFilterHandler(havingFilter, postAggregationHandler);
+    }
+    Record record = null;
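+    // Walk the time buckets in ascending order. For each bucket, emit the rows that actually
+    // exist, record them as the previous values of their group keys, and then synthesize a row
+    // for every group key that has no record in the bucket.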
+    for (long time = _startMs; time + 2 * step <= _endMs; time += step) {
+      Set<Key> keys = new HashSet<>(_groupByKeys);
+      if (record == null && sortedIterator.hasNext()) {
+        record = sortedIterator.next();
+      }
+
+      while (record != null) {
+        Object[] row = record.getValues();
+
+        Object[] resultRow = postAggregationHandler.getResult(row);
+        for (int i = 0; i < resultColumnDataTypes.length; i++) {
+          resultRow[i] = resultColumnDataTypes[i].format(resultRow[i]);
+        }
+
+        long timeCol = _dateTimeFormatter.fromFormatToMillis(String.valueOf(resultRow[_timeBucketIndex]));
+        if (timeCol > time) {
+          break;
+        }
+        if (timeCol == time) {
+          if (havingFilterHandler == null || havingFilterHandler.isMatch(row)) {
+            gapfillResultRows.add(resultRow);
+            if (gapfillResultRows.size() == limit) {
+              return gapfillResultRows;
+            }
+          }
+          Key key = constructGroupKeys(resultRow);
+          keys.remove(key);
+          _previousByGroupKey.put(key, resultRow);
+        }
+        if (sortedIterator.hasNext()) {
+          record = sortedIterator.next();
+        } else {
+          record = null;
+        }
+      }
+
+      for (Key key : keys) {
+        Object[] gapfillRow = new Object[numResultColumns];
+        int keyIndex = 0;
+        for (int i = 0; i < _isGroupBySelections.length; i++) {
+          if (_isGroupBySelections[i]) {
+            if (i == _timeBucketIndex) {
+              if (resultColumnDataTypes[i] == ColumnDataType.LONG) {
+                gapfillRow[_timeBucketIndex] = Long.valueOf(_dateTimeFormatter.fromMillisToFormat(time));
+              } else {
+                gapfillRow[_timeBucketIndex] = _dateTimeFormatter.fromMillisToFormat(time);
+              }
+            } else {
+              gapfillRow[i] = key.getValues()[keyIndex++];
+            }
+          } else {
+            gapfillRow[i] = getFillValue(i, key, resultColumnDataTypes[i]);
+          }
+        }
+
+        if (havingFilterHandler == null || havingFilterHandler.isMatch(gapfillRow)) {
+          gapfillResultRows.add(gapfillRow);
+          if (gapfillResultRows.size() == limit) {
+            return gapfillResultRows;
+          }
+        }
+      }
+    }
+    return gapfillResultRows;
+  }
+
+  Object getFillValue(int columnIndex, Object key, ColumnDataType dataType) {
+    ExpressionContext expressionContext = _queryContext.getSelectExpressions().get(columnIndex);
+    if (expressionContext.getFunction() != null && GapfillUtils.isFill(expressionContext)) {
+      List<ExpressionContext> args = expressionContext.getFunction().getArguments();
+      if (args.get(1).getLiteral() == null) {
+        throw new UnsupportedOperationException("The second argument of FILL should be a literal fill type.");
+      }
+      GapfillUtils.FillType fillType = GapfillUtils.FillType.valueOf(args.get(1).getLiteral());
+      if (fillType == GapfillUtils.FillType.FILL_DEFAULT_VALUE) {
+        // TODO: may fill the default value from sql in the future.
+        return GapfillUtils.getDefaultValue(dataType);
+      } else if (fillType == GapfillUtils.FillType.FILL_PREVIOUS_VALUE) {
+        Object[] row = _previousByGroupKey.get(key);
+        if (row != null) {
+          return row[columnIndex];
+        } else {
+          return GapfillUtils.getDefaultValue(dataType);
+        }
+      } else {
+        throw new UnsupportedOperationException("unsupported fill type.");
+      }
+    } else {
+      return GapfillUtils.getDefaultValue(dataType);
+    }
+  }
+
+  /**
+   * Helper method to extract the final aggregation results for the given row (in-place).
+   */
+  private void extractFinalAggregationResults(Object[] row) {
+    for (int i = 0; i < _numAggregationFunctions; i++) {
+      int valueIndex = i + _numGroupByExpressions;
+      row[valueIndex] = _aggregationFunctions[i].extractFinalResult(row[valueIndex]);
+    }
+  }
+
+  /**
+   * Constructs the DataSchema for the rows before the post-aggregation (SQL mode).
+   */
+  private DataSchema getPrePostAggregationDataSchema(DataSchema dataSchema) {
+    String[] columnNames = dataSchema.getColumnNames();
+    ColumnDataType[] columnDataTypes = new ColumnDataType[_numColumns];
+    System.arraycopy(dataSchema.getColumnDataTypes(), 0, columnDataTypes, 0, _numGroupByExpressions);
+    for (int i = 0; i < _numAggregationFunctions; i++) {
+      columnDataTypes[i + _numGroupByExpressions] = _aggregationFunctions[i].getFinalResultColumnType();
+    }
+    return new DataSchema(columnNames, columnDataTypes);
+  }
+
+  private IndexedTable getIndexedTable(DataSchema dataSchema, Collection<DataTable> dataTablesToReduce,
+      DataTableReducerContext reducerContext)
+      throws TimeoutException {
+    long start = System.currentTimeMillis();
+    int numDataTables = dataTablesToReduce.size();
+
+    // Get the number of threads to use for reducing.
+    // In case of single reduce thread, fall back to SimpleIndexedTable to avoid redundant locking/unlocking calls.
+    int numReduceThreadsToUse = getNumReduceThreadsToUse(numDataTables, reducerContext.getMaxReduceThreadsPerQuery());
+    int limit = _queryContext.getLimit();
+    // TODO: Make minTrimSize configurable
+    int trimSize = GroupByUtils.getTableCapacity(limit);
+    // NOTE: For query with HAVING clause, use trimSize as resultSize to ensure the result accuracy.
+    // TODO: Resolve the HAVING clause within the IndexedTable before returning the result
+    int resultSize = _queryContext.getHavingFilter() != null ? trimSize : limit;
+    int trimThreshold = reducerContext.getGroupByTrimThreshold();
+    IndexedTable indexedTable;
+    if (numReduceThreadsToUse <= 1) {
+      indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, resultSize, trimSize, trimThreshold);
+    } else {
+      if (trimThreshold >= GroupByOrderByCombineOperator.MAX_TRIM_THRESHOLD) {
+        // special case of trim threshold where it is set to max value.
+        // there won't be any trimming during upsert in this case.
+        // thus we can avoid the overhead of read-lock and write-lock
+        // in the upsert method.
+        indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, resultSize);
+      } else {
+        indexedTable = new ConcurrentIndexedTable(dataSchema, _queryContext, resultSize, trimSize, trimThreshold);
+      }
+    }
+
+    // Create groups of data tables that each thread can process concurrently.
+    // Given that numReduceThreads is <= numDataTables, each group will have at least one data table.
+    ArrayList<DataTable> dataTables = new ArrayList<>(dataTablesToReduce);
+    List<List<DataTable>> reduceGroups = new ArrayList<>(numReduceThreadsToUse);
+
+    for (int i = 0; i < numReduceThreadsToUse; i++) {
+      reduceGroups.add(new ArrayList<>());
+    }
+    for (int i = 0; i < numDataTables; i++) {
+      reduceGroups.get(i % numReduceThreadsToUse).add(dataTables.get(i));
+    }
+
+    ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes();
+    long timeOutMs = reducerContext.getReduceTimeOutMs() - (System.currentTimeMillis() - start);
+    try {
+      reducerContext.getExecutorService().invokeAll(reduceGroups.stream()
+          .map(reduceGroup -> new TraceCallable<Void>() {
+            @Override
+            public Void callJob() throws Exception {
+              for (DataTable dataTable : reduceGroup) {
+                int numRows = dataTable.getNumberOfRows();
+                for (int rowId = 0; rowId < numRows; rowId++) {
+                  Object[] values = new Object[_numColumns];
+                  for (int colId = 0; colId < _numColumns; colId++) {
+                    switch (storedColumnDataTypes[colId]) {
+                      case INT:
+                        values[colId] = dataTable.getInt(rowId, colId);
+                        break;
+                      case LONG:
+                        values[colId] = dataTable.getLong(rowId, colId);
+                        break;
+                      case FLOAT:
+                        values[colId] = dataTable.getFloat(rowId, colId);
+                        break;
+                      case DOUBLE:
+                        values[colId] = dataTable.getDouble(rowId, colId);
+                        break;
+                      case STRING:
+                        values[colId] = dataTable.getString(rowId, colId);
+                        break;
+                      case BYTES:
+                        values[colId] = dataTable.getBytes(rowId, colId);
+                        break;
+                      case OBJECT:
+                        values[colId] = dataTable.getObject(rowId, colId);
+                        break;
+                      // Add other aggregation intermediate result / group-by column type supports here
+                      default:
+                        throw new IllegalStateException();
+                    }
+                  }
+                  indexedTable.upsert(new Record(values));
+                }
+              }
+              return null;
+            }
+          }).collect(Collectors.toList()), timeOutMs, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      throw new TimeoutException("Timed out in broker reduce phase.");
+    }
+
+    indexedTable.finish(true);
+    return indexedTable;
+  }
+
+  /**
+   * Computes the number of reduce threads to use per query.
+   * <ul>
+   *   <li> Use single thread if number of data tables to reduce is less than
+   *   {@value #MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE}.</li>
+   *   <li> Else, use min of max allowed reduce threads per query, and number of data tables.</li>
+   * </ul>
+   *
+   * @param numDataTables Number of data tables to reduce
+   * @param maxReduceThreadsPerQuery Max allowed reduce threads per query
+   * @return Number of reduce threads to use for the query
+   */
+  private int getNumReduceThreadsToUse(int numDataTables, int maxReduceThreadsPerQuery) {
+    // Use single thread if number of data tables < MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE.
+    if (numDataTables < MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE) {
+      return Math.min(1, numDataTables); // Number of data tables can be zero.
+    }
+
+    return Math.min(maxReduceThreadsPerQuery, numDataTables);
+  }
+}
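
As a hypothetical illustration of the fill semantics implemented above, suppose LotId_0 has a
row in the 04:00 bucket but none in the 05:00 bucket:

    2021-11-07 04:00:00.000 | LotId_0 | true            (observed row)
    2021-11-07 05:00:00.000 | LotId_0 | true            (synthesized with FILL_PREVIOUS_VALUE)
    2021-11-07 05:00:00.000 | LotId_0 | false           (synthesized with FILL_DEFAULT_VALUE)

A bucket with no record for a group key carries the key's last observed value under
FILL_PREVIOUS_VALUE (falling back to the type default before any observation) and the
GapfillUtils.getDefaultValue result under FILL_DEFAULT_VALUE.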
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PostAggregationHandler.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PostAggregationHandler.java
index e190e2f..4705951 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PostAggregationHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PostAggregationHandler.java
@@ -28,6 +28,7 @@ import org.apache.pinot.common.utils.DataSchema;
 import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
 import org.apache.pinot.core.query.postaggregation.PostAggregationFunction;
 import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.util.GapfillUtils;
 
 
 /**
@@ -50,7 +51,7 @@ public class PostAggregationHandler {
       _numGroupByExpressions = groupByExpressions.size();
       _groupByExpressionIndexMap = new HashMap<>();
       for (int i = 0; i < _numGroupByExpressions; i++) {
-        _groupByExpressionIndexMap.put(groupByExpressions.get(i), i);
+        _groupByExpressionIndexMap.put(GapfillUtils.stripGapfill(groupByExpressions.get(i)), i);
       }
     } else {
       _numGroupByExpressions = 0;
@@ -98,6 +99,7 @@ public class PostAggregationHandler {
    * Returns a ValueExtractor based on the given expression.
    */
   public ValueExtractor getValueExtractor(ExpressionContext expression) {
+    expression = GapfillUtils.stripGapfill(expression);
     if (expression.getType() == ExpressionContext.Type.LITERAL) {
       // Literal
       return new LiteralValueExtractor(expression.getLiteral());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
index 7dcf05d..e5e9bf8 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/ResultReducerFactory.java
@@ -22,6 +22,7 @@ import org.apache.pinot.common.request.BrokerRequest;
 import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
 import org.apache.pinot.core.query.aggregation.function.DistinctAggregationFunction;
 import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.util.GapfillUtils;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 
 
@@ -56,6 +57,8 @@ public final class ResultReducerFactory {
         } else {
           return new AggregationDataTableReducer(queryContext);
         }
+      } else if (GapfillUtils.isPostAggregateGapfill(queryContext)) {
+        return new GapFillGroupByDataTableReducer(queryContext);
       } else {
         // Aggregation group-by query
         return new GroupByDataTableReducer(queryContext);
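
With this change, any SQL group-by query whose select list contains a PostAggregateGapFill
expression is reduced by the new GapFillGroupByDataTableReducer; all other group-by queries
continue to use GroupByDataTableReducer.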
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtils.java
new file mode 100644
index 0000000..55b618c
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/util/GapfillUtils.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.util;
+
+import java.io.Serializable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.request.context.QueryContext;
+
+
+/**
+ * Utility class encapsulating all the utilities required for gapfill.
+ */
+public class GapfillUtils {
+  private static final String POST_AGGREGATE_GAP_FILL = "postaggregategapfill";
+  private static final String FILL = "fill";
+
+  private GapfillUtils() {
+  }
+
+  public static ExpressionContext stripGapfill(ExpressionContext expression) {
+    if (expression.getType() != ExpressionContext.Type.FUNCTION) {
+      return expression;
+    }
+
+    FunctionContext function = expression.getFunction();
+    String functionName = canonicalizeFunctionName(function.getFunctionName());
+    if (functionName.equals(POST_AGGREGATE_GAP_FILL) || functionName.equals(FILL)) {
+      return function.getArguments().get(0);
+    }
+    return expression;
+  }
+
+  public static boolean isPostAggregateGapfill(ExpressionContext expressionContext) {
+    if (expressionContext.getType() != ExpressionContext.Type.FUNCTION) {
+      return false;
+    }
+
+    return POST_AGGREGATE_GAP_FILL.equals(canonicalizeFunctionName(expressionContext.getFunction().getFunctionName()));
+  }
+
+  public static boolean isPostAggregateGapfill(QueryContext queryContext) {
+    for (ExpressionContext expressionContext : queryContext.getSelectExpressions()) {
+      if (isPostAggregateGapfill(expressionContext)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  public static boolean isFill(ExpressionContext expressionContext) {
+    if (expressionContext.getType() != ExpressionContext.Type.FUNCTION) {
+      return false;
+    }
+
+    return FILL.equals(canonicalizeFunctionName(expressionContext.getFunction().getFunctionName()));
+  }
+
+  public enum FillType {
+    FILL_DEFAULT_VALUE,
+    FILL_PREVIOUS_VALUE,
+  }
+
+  /**
+   * The default gapfill value for each column type.
+   */
+  public static Serializable getDefaultValue(DataSchema.ColumnDataType dataType) {
+    switch (dataType) {
+      // Single-value column
+      case INT:
+      case LONG:
+      case FLOAT:
+      case DOUBLE:
+      case BOOLEAN:
+      case TIMESTAMP:
+        return dataType.convertAndFormat(0);
+      case STRING:
+      case JSON:
+      case BYTES:
+        return "";
+      case INT_ARRAY:
+        return new int[0];
+      case LONG_ARRAY:
+        return new long[0];
+      case FLOAT_ARRAY:
+        return new float[0];
+      case DOUBLE_ARRAY:
+        return new double[0];
+      case STRING_ARRAY:
+      case TIMESTAMP_ARRAY:
+        return new String[0];
+      case BOOLEAN_ARRAY:
+        return new boolean[0];
+      case BYTES_ARRAY:
+        return new byte[0][0];
+      default:
+        throw new IllegalStateException(String.format("Cannot provide the default value for the type: %s", dataType));
+    }
+  }
+
+  private static String canonicalizeFunctionName(String functionName) {
+    return StringUtils.remove(functionName, '_').toLowerCase();
+  }
+}
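
A note on name matching: canonicalizeFunctionName strips underscores and lower-cases, so
"PostAggregateGapFill", "post_aggregate_gap_fill" and "POSTAGGREGATEGAPFILL" all match the
"postaggregategapfill" constant (likewise for FILL). stripGapfill then returns the wrapped
first argument for either function and leaves every other expression untouched.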
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/PostAggregationGapfillQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/PostAggregationGapfillQueriesTest.java
new file mode 100644
index 0000000..bfd7baa
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/PostAggregationGapfillQueriesTest.java
@@ -0,0 +1,616 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Random;
+import java.util.Set;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Queries test for PostAggregationGapfill queries.
+ */
+@SuppressWarnings("rawtypes")
+public class PostAggregationGapfillQueriesTest extends BaseQueriesTest {
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "PostAggregationGapfillQueriesTest");
+  private static final String RAW_TABLE_NAME = "parkingData";
+  private static final String SEGMENT_NAME = "testSegment";
+  private static final Random RANDOM = new Random();
+
+  private static final int NUM_LOTS = 4;
+
+  private static final String IS_OCCUPIED_COLUMN = "isOccupied";
+  private static final String LOT_ID_COLUMN = "lotId";
+  private static final String EVENT_TIME_COLUMN = "eventTime";
+  private static final Schema SCHEMA = new Schema.SchemaBuilder()
+      .addSingleValueDimension(IS_OCCUPIED_COLUMN, DataType.BOOLEAN)
+      .addSingleValueDimension(LOT_ID_COLUMN, DataType.STRING)
+      .addSingleValueDimension(EVENT_TIME_COLUMN, DataType.LONG)
+      .setPrimaryKeyColumns(Arrays.asList(LOT_ID_COLUMN, EVENT_TIME_COLUMN))
+      .build();
+  private static final TableConfig TABLE_CONFIG = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+      .build();
+
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
+
+  @Override
+  protected String getFilter() {
+    // NOTE: Use a match all filter to switch between DictionaryBasedAggregationOperator and AggregationOperator
+    return " WHERE eventTime >= 0";
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteDirectory(INDEX_DIR);
+
+    long current = 1636286400000L; //November 7, 2021 12:00:00 PM
+    int duplicates = 16;
+    int interval = 1000 * 900; // 15 minutes
+    long start = current - duplicates * 2 * interval; //November 7, 2021 4:00:00 AM
+
+    List<GenericRow> records = new ArrayList<>(NUM_LOTS * 2);
+    for (int i = 0; i < NUM_LOTS; i++) {
+      for (int j = 0; j < duplicates; j++) {
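+        // Skip some intervals so the generated data has gaps for the gapfill logic to fill.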
+        if (j == 4 || j == 5 || j == 6 || j == 7 || j == 10 || j == 11) {
+          continue;
+        }
+        long parkingTime = start + interval * 2 * j + RANDOM.nextInt(interval);
+        long departingTime = j == 3 ? start + interval * (2 * j + 6) + RANDOM.nextInt(interval)
+            : start + interval * (2 * j + 1) + RANDOM.nextInt(interval);
+
+        GenericRow parkingRow = new GenericRow();
+        parkingRow.putValue(EVENT_TIME_COLUMN, parkingTime);
+        parkingRow.putValue(LOT_ID_COLUMN, "LotId_" + i);
+        parkingRow.putValue(IS_OCCUPIED_COLUMN, true);
+        records.add(parkingRow);
+
+        GenericRow departingRow = new GenericRow();
+        departingRow.putValue(EVENT_TIME_COLUMN, departingTime);
+        departingRow.putValue(LOT_ID_COLUMN, "LotId_" + i);
+        departingRow.putValue(IS_OCCUPIED_COLUMN, false);
+        records.add(departingRow);
+      }
+    }
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
+    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
+    segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
+
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
+    driver.build();
+
+    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
+    _indexSegment = immutableSegment;
+    _indexSegments = Arrays.asList(immutableSegment);
+  }
+
+  @Test
+  public void datetimeconvertGapfillTest() {
+    String dateTimeConvertQuery = "SELECT "
+        + "DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS') AS time_col, "
+        + "lotId, "
+        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN')"
+        + "FROM parkingData "
+        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+        + "GROUP BY 1, 2 "
+        + "ORDER BY 1 "
+        + "LIMIT 200";
+
+    BrokerResponseNative dateTimeConvertBrokerResponse = getBrokerResponseForSqlQuery(dateTimeConvertQuery);
+
+    ResultTable dateTimeConvertResultTable = dateTimeConvertBrokerResponse.getResultTable();
+    Assert.assertEquals(dateTimeConvertResultTable.getRows().size(), 24);
+
+    String gapfillQuery = "SELECT "
+        + "PostAggregateGapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'), "
+        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
+        + "'2021-11-07 3:00:00.000',  '2021-11-07 12:00:00.000', '1:HOURS') AS time_col, "
+        + "lotId, "
+        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status1, "
+        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_DEFAULT_VALUE') as status2, "
+        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 "
+        + "FROM parkingData "
+        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+        + "GROUP BY 1, 2 "
+        + "ORDER BY 1 "
+        + "LIMIT 200";
+
+    DateTimeFormatSpec dateTimeFormatter
+        = new DateTimeFormatSpec("1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS");
+    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+
+    BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery);
+
+    ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
+    Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
+    List<Object[]> gapFillRows = gapFillResultTable.getRows();
+    long start = dateTimeFormatter.fromFormatToMillis("2021-11-07 03:00:00.000");
+    for (int i = 0; i < 32; i += 4) {
+      String firstTimeCol = (String) gapFillRows.get(i)[0];
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      Set<String> lots = new HashSet<>();
+      lots.add((String) gapFillRows.get(i)[1]);
+      for (int j = 1; j < 4; j++) {
+        Assert.assertEquals(gapFillRows.get(i)[0], gapFillRows.get(i + j)[0]);
+        Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[1]));
+        lots.add((String) gapFillRows.get(i + j)[1]);
+      }
+      start += dateTimeGranularity.granularityToMillis();
+    }
+  }
+
+  @Test
+  public void toEpochHoursGapfillTest() {
+    String dateTimeConvertQuery = "SELECT "
+        + "ToEpochHours(eventTime) AS time_col, "
+        + "lotId, "
+        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN')"
+        + "FROM parkingData "
+        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+        + "GROUP BY 1, 2 "
+        + "ORDER BY 1 "
+        + "LIMIT 200";
+
+    BrokerResponseNative dateTimeConvertBrokerResponse = getBrokerResponseForSqlQuery(dateTimeConvertQuery);
+
+    ResultTable dateTimeConvertResultTable = dateTimeConvertBrokerResponse.getResultTable();
+    Assert.assertEquals(dateTimeConvertResultTable.getRows().size(), 24);
+
+    String gapfillQuery = "SELECT "
+        + "PostAggregateGapFill(ToEpochHours(eventTime), '1:HOURS:EPOCH', "
+        + "'454515',  '454524', '1:HOURS') AS time_col, "
+        + "lotId, "
+        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status1, "
+        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_DEFAULT_VALUE') as status2, "
+        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 "
+        + "FROM parkingData "
+        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+        + "GROUP BY 1, 2 "
+        + "ORDER BY 1 "
+        + "LIMIT 200";
+
+    DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:HOURS:EPOCH");
+    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+
+    BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery);
+
+    ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
+    Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
+    List<Object[]> gapFillRows = gapFillResultTable.getRows();
+    long start = dateTimeFormatter.fromFormatToMillis("454515");
+    for (int i = 0; i < 32; i += 4) {
+      Long firstTimeCol = (Long) gapFillRows.get(i)[0];
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol.toString());
+      Assert.assertEquals(timeStamp, start);
+      Set<String> lots = new HashSet<>();
+      lots.add((String) gapFillRows.get(i)[1]);
+      for (int j = 1; j < 4; j++) {
+        Assert.assertEquals(gapFillRows.get(i)[0], gapFillRows.get(i + j)[0]);
+        Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[1]));
+        lots.add((String) gapFillRows.get(i + j)[1]);
+      }
+      start += dateTimeGranularity.granularityToMillis();
+    }
+  }
+
+  @Test
+  public void toEpochMinutesRoundedHoursGapfillTest() {
+    String dateTimeConvertQuery = "SELECT "
+        + "ToEpochMinutesRounded(eventTime, 60) AS time_col, "
+        + "lotId, "
+        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN')"
+        + "FROM parkingData "
+        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+        + "GROUP BY 1, 2 "
+        + "ORDER BY 1 "
+        + "LIMIT 200";
+
+    BrokerResponseNative dateTimeConvertBrokerResponse = getBrokerResponseForSqlQuery(dateTimeConvertQuery);
+
+    ResultTable dateTimeConvertResultTable = dateTimeConvertBrokerResponse.getResultTable();
+    Assert.assertEquals(dateTimeConvertResultTable.getRows().size(), 24);
+
+    String gapfillQuery = "SELECT "
+        + "PostAggregateGapFill(ToEpochMinutesRounded(eventTime, 60), '1:HOURS:EPOCH', "
+        + "'454515',  '454524', '1:HOURS') AS time_col, "
+        + "lotId, "
+        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status1, "
+        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_DEFAULT_VALUE') as status2, "
+        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 "
+        + "FROM parkingData "
+        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+        + "GROUP BY 1, 2 "
+        + "ORDER BY 1 "
+        + "LIMIT 200";
+
+    DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:HOURS:EPOCH");
+    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+
+    BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery);
+
+    ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
+    Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
+    List<Object[]> gapFillRows = gapFillResultTable.getRows();
+    long start = dateTimeFormatter.fromFormatToMillis("454515");
+    for (int i = 0; i < 32; i += 4) {
+      Long firstTimeCol = (Long) gapFillRows.get(i)[0];
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol.toString());
+      Assert.assertEquals(timeStamp, start);
+      Set<String> lots = new HashSet<>();
+      lots.add((String) gapFillRows.get(i)[1]);
+      for (int j = 1; j < 4; j++) {
+        Assert.assertEquals(gapFillRows.get(i)[0], gapFillRows.get(i + j)[0]);
+        Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[1]));
+        lots.add((String) gapFillRows.get(i + j)[1]);
+      }
+      start += dateTimeGranularity.granularityToMillis();
+    }
+  }
+
+  @Test
+  public void toEpochMinutesBucketHoursGapfillTest() {
+    String dateTimeConvertQuery = "SELECT "
+        + "ToEpochMinutesBucket(eventTime, 60) AS time_col, "
+        + "lotId, "
+        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN')"
+        + "FROM parkingData "
+        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+        + "GROUP BY 1, 2 "
+        + "ORDER BY 1 "
+        + "LIMIT 200";
+
+    BrokerResponseNative dateTimeConvertBrokerResponse = getBrokerResponseForSqlQuery(dateTimeConvertQuery);
+
+    ResultTable dateTimeConvertResultTable = dateTimeConvertBrokerResponse.getResultTable();
+    Assert.assertEquals(dateTimeConvertResultTable.getRows().size(), 24);
+
+    String gapfillQuery = "SELECT "
+        + "PostAggregateGapFill(ToEpochMinutesBucket(eventTime, 60), '1:HOURS:EPOCH', "
+        + "'454515',  '454524', '1:HOURS') AS time_col, "
+        + "lotId, "
+        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status1, "
+        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_DEFAULT_VALUE') as status2, "
+        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 "
+        + "FROM parkingData "
+        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+        + "GROUP BY 1, 2 "
+        + "ORDER BY 1 "
+        + "LIMIT 200";
+
+    DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:HOURS:EPOCH");
+    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+
+    BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery);
+
+    ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
+    Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
+    List<Object[]> gapFillRows = gapFillResultTable.getRows();
+    long start = dateTimeFormatter.fromFormatToMillis("454515");
+    for (int i = 0; i < 32; i += 4) {
+      Long firstTimeCol = (Long) gapFillRows.get(i)[0];
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol.toString());
+      Assert.assertEquals(timeStamp, start);
+      Set<String> lots = new HashSet<>();
+      lots.add((String) gapFillRows.get(i)[1]);
+      for (int j = 1; j < 4; j++) {
+        Assert.assertEquals(gapFillRows.get(i)[0], gapFillRows.get(i + j)[0]);
+        Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[1]));
+        lots.add((String) gapFillRows.get(i + j)[1]);
+      }
+      start += dateTimeGranularity.granularityToMillis();
+    }
+  }
+
+  @Test
+  public void dateTruncHoursGapfillTest() {
+    String dateTimeConvertQuery = "SELECT "
+        + "DATETRUNC('hour', eventTime, 'milliseconds') AS time_col, "
+        + "lotId, "
+        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN')"
+        + "FROM parkingData "
+        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+        + "GROUP BY 1, 2 "
+        + "ORDER BY 1 "
+        + "LIMIT 200";
+
+    BrokerResponseNative dateTimeConvertBrokerResponse = getBrokerResponseForSqlQuery(dateTimeConvertQuery);
+
+    ResultTable dateTimeConvertResultTable = dateTimeConvertBrokerResponse.getResultTable();
+    Assert.assertEquals(dateTimeConvertResultTable.getRows().size(), 24);
+
+    String gapfillQuery = "SELECT "
+        + "PostAggregateGapFill(DATETRUNC('hour', eventTime, 'milliseconds'), '1:HOURS:EPOCH', "
+        + "'454515',  '454524', '1:HOURS') AS time_col, "
+        + "lotId, "
+        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status1, "
+        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_DEFAULT_VALUE') as status2, "
+        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 "
+        + "FROM parkingData "
+        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+        + "GROUP BY 1, 2 "
+        + "ORDER BY 1 "
+        + "LIMIT 200";
+
+    DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:HOURS:EPOCH");
+    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+
+    BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery);
+
+    ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
+    Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
+    List<Object[]> gapFillRows = gapFillResultTable.getRows();
+    long start = dateTimeFormatter.fromFormatToMillis("454515");
+    for (int i = 0; i < 32; i += 4) {
+      Long firstTimeCol = (Long) gapFillRows.get(i)[0];
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol.toString());
+      Assert.assertEquals(timeStamp, start);
+      Set<String> lots = new HashSet<>();
+      lots.add((String) gapFillRows.get(i)[1]);
+      for (int j = 1; j < 4; j++) {
+        Assert.assertEquals(gapFillRows.get(i)[0], gapFillRows.get(i + j)[0]);
+        Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[1]));
+        lots.add((String) gapFillRows.get(i + j)[1]);
+      }
+      start += dateTimeGranularity.granularityToMillis();
+    }
+  }
+
+  @Test
+  public void datetimeconvertGapfillTestWithoutTimeBucketOrdering() {
+    try {
+      String gapfillQuery = "SELECT "
+          + "PostAggregateGapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+          + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'), "
+          + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
+          + "'2021-11-07 3:00:00.000',  '2021-11-07 12:00:00.000', '1:HOURS') AS time_col, "
+          + "lotId, "
+          + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status1, "
+          + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_DEFAULT_VALUE') as status2, "
+          + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 "
+          + "FROM parkingData "
+          + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+          + "GROUP BY 1, 2 "
+          + "LIMIT 200";
+
+      getBrokerResponseForSqlQuery(gapfillQuery);
+      Assert.fail();
+    } catch (IllegalArgumentException e) {
+      Assert.assertEquals(e.getMessage(), "PostAggregateGapFill does not work if the time bucket is not ordered.");
+    }
+  }
+
+  @Test
+  public void datetimeconvertGapfillTestWithHavingClause() {
+    String dateTimeConvertQueryWithUnoccupied = "SELECT "
+        + "DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS') AS time_col, "
+        + "lotId, "
+        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status "
+        + "FROM parkingData "
+        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+        + "GROUP BY 1, 2 "
+        + "HAVING status = 'false' "
+        + "ORDER BY 1 "
+        + "LIMIT 200";
+
+    BrokerResponseNative dateTimeConvertBrokerResponseWithUnoccupied
+        = getBrokerResponseForSqlQuery(dateTimeConvertQueryWithUnoccupied);
+
+    ResultTable dateTimeConvertResultTableWithUnoccupied = dateTimeConvertBrokerResponseWithUnoccupied.getResultTable();
+    Assert.assertEquals(dateTimeConvertResultTableWithUnoccupied.getRows().size(), 20);
+
+    String dateTimeConvertQueryWithOccupied = "SELECT "
+        + "DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS') AS time_col, "
+        + "lotId, "
+        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status "
+        + "FROM parkingData "
+        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+        + "GROUP BY 1, 2 "
+        + "HAVING status = 'true' "
+        + "ORDER BY 1 "
+        + "LIMIT 200";
+
+    BrokerResponseNative dateTimeConvertBrokerResponseWithOccupied
+        = getBrokerResponseForSqlQuery(dateTimeConvertQueryWithOccupied);
+
+    ResultTable dateTimeConvertResultTableWithOccupied = dateTimeConvertBrokerResponseWithOccupied.getResultTable();
+    Assert.assertEquals(dateTimeConvertResultTableWithOccupied.getRows().size(), 4);
+
+    String gapfillQueryWithOccupied = "SELECT "
+        + "PostAggregateGapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'), "
+        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
+        + "'2021-11-07 3:00:00.000',  '2021-11-07 12:00:00.000', '1:HOURS') AS time_col, "
+        + "lotId, "
+        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status "
+        + "FROM parkingData "
+        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+        + "GROUP BY 1, 2 "
+        + "HAVING status = 'true' "
+        + "ORDER BY 1 "
+        + "LIMIT 7";
+
+    BrokerResponseNative gapfillBrokerResponseWithOccupied = getBrokerResponseForSqlQuery(gapfillQueryWithOccupied);
+
+    ResultTable gapFillResultTableWithOccupied = gapfillBrokerResponseWithOccupied.getResultTable();
+    Assert.assertEquals(gapFillResultTableWithOccupied.getRows().size(), 7);
+
+    for (Object[] row : gapFillResultTableWithOccupied.getRows()) {
+      Assert.assertEquals(row[2], true);
+    }
+
+    String gapfillQueryWithUnoccupied = "SELECT "
+        + "PostAggregateGapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'), "
+        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
+        + "'2021-11-07 3:00:00.000',  '2021-11-07 12:00:00.000', '1:HOURS') AS time_col, "
+        + "lotId, "
+        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status "
+        + "FROM parkingData "
+        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+        + "GROUP BY 1, 2 "
+        + "HAVING status = 'false' "
+        + "ORDER BY 1 "
+        + "LIMIT 24";
+
+    BrokerResponseNative gapfillBrokerResponseWithUnoccupied = getBrokerResponseForSqlQuery(gapfillQueryWithUnoccupied);
+
+    ResultTable gapFillResultTableWithUnoccupied = gapfillBrokerResponseWithUnoccupied.getResultTable();
+    Assert.assertEquals(gapFillResultTableWithUnoccupied.getRows().size(), 24);
+    for (Object[] row : gapFillResultTableWithUnoccupied.getRows()) {
+      Assert.assertEquals(row[2], false);
+    }
+  }
+
+  @Test
+  public void datetimeconvertGapfillTestTimeBucketAsLastSelection() {
+    String gapfillQuery = "SELECT "
+        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status1, "
+        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_DEFAULT_VALUE') as status2, "
+        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3, "
+        + "lotId, PostAggregateGapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'), "
+        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
+        + "'2021-11-07 3:00:00.000',  '2021-11-07 12:00:00.000', '1:HOURS') AS time_col "
+        + "FROM parkingData "
+        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+        + "GROUP BY 4, 5 "
+        + "ORDER BY 5 "
+        + "LIMIT 200";
+
+    DateTimeFormatSpec dateTimeFormatter
+        = new DateTimeFormatSpec("1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS");
+    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+
+    BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery);
+
+    ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
+    Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
+    List<Object[]> gapFillRows = gapFillResultTable.getRows();
+    long start = dateTimeFormatter.fromFormatToMillis("2021-11-07 03:00:00.000");
+    for (int i = 0; i < 32; i += 4) {
+      String timeCol = (String) gapFillRows.get(i)[4];
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(timeCol);
+      Assert.assertEquals(timeStamp, start);
+      Set<String> lots = new HashSet<>();
+      lots.add((String) gapFillRows.get(i)[3]);
+      for (int j = 1; j < 4; j++) {
+        Assert.assertEquals(gapFillRows.get(i)[4], gapFillRows.get(i + j)[4]);
+        Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[3]));
+        lots.add((String) gapFillRows.get(i + j)[3]);
+      }
+      start += dateTimeGranularity.granularityToMillis();
+    }
+  }
+
+  @Test
+  public void datetimeconvertGapfillWithOrderingByTwoColumnsTest() {
+    String gapfillQuery = "SELECT "
+        + "PostAggregateGapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'), "
+        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
+        + "'2021-11-07 3:00:00.000',  '2021-11-07 12:00:00.000', '1:HOURS') AS time_col, "
+        + "lotId, "
+        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_PREVIOUS_VALUE') as status1, "
+        + "FILL(lastWithTime(isOccupied, eventTime, 'BOOLEAN'), 'FILL_DEFAULT_VALUE') as status2, "
+        + "lastWithTime(isOccupied, eventTime, 'BOOLEAN') as status3 "
+        + "FROM parkingData "
+        + "WHERE eventTime >= 1635940800000 AND eventTime <= 1636286400000 "
+        + "GROUP BY 1, 2 "
+        + "ORDER BY 1, 2 "
+        + "LIMIT 200";
+
+    DateTimeFormatSpec dateTimeFormatter
+        = new DateTimeFormatSpec("1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS");
+    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+
+    BrokerResponseNative gapfillBrokerResponse = getBrokerResponseForSqlQuery(gapfillQuery);
+
+    ResultTable gapFillResultTable = gapfillBrokerResponse.getResultTable();
+    Assert.assertEquals(gapFillResultTable.getRows().size(), 32);
+    List<Object[]> gapFillRows = gapFillResultTable.getRows();
+    long start = dateTimeFormatter.fromFormatToMillis("2021-11-07 03:00:00.000");
+    for (int i = 0; i < 32; i += 4) {
+      String firstTimeCol = (String) gapFillRows.get(i)[0];
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      Set<String> lots = new HashSet<>();
+      lots.add((String) gapFillRows.get(i)[1]);
+      for (int j = 1; j < 4; j++) {
+        Assert.assertEquals(gapFillRows.get(i)[0], gapFillRows.get(i + j)[0]);
+        Assert.assertFalse(lots.contains(gapFillRows.get(i + j)[1]));
+        lots.add((String) gapFillRows.get(i + j)[1]);
+      }
+      start += dateTimeGranularity.granularityToMillis();
+    }
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    _indexSegment.destroy();
+    FileUtils.deleteDirectory(INDEX_DIR);
+  }
+}
