Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/02/22 01:02:08 UTC

[GitHub] [pinot] Jackie-Jiang commented on a change in pull request #8029: Add Pre-Aggregation Gapfilling functionality.

Jackie-Jiang commented on a change in pull request #8029:
URL: https://github.com/apache/pinot/pull/8029#discussion_r811396986



##########
File path: pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java
##########
@@ -129,8 +129,25 @@ public static PinotQuery compileToPinotQuery(String sql)
     if (!options.isEmpty()) {
       sql = removeOptionsFromSql(sql);
     }
+
+    SqlParser sqlParser = SqlParser.create(sql, PARSER_CONFIG);
+    SqlNode sqlNode;
+    try {
+      sqlNode = sqlParser.parseQuery();
+    } catch (SqlParseException e) {
+      throw new SqlCompilationException("Caught exception while parsing query: " + sql, e);
+    }
+
     // Compile Sql without OPTION statements.
-    PinotQuery pinotQuery = compileCalciteSqlToPinotQuery(sql);
+    PinotQuery pinotQuery = compileSqlNodeToPinotQuery(sqlNode);
+
+    SqlSelect sqlSelect = getSelectNode(sqlNode);
+    if (sqlSelect != null) {
+      SqlNode fromNode = sqlSelect.getFrom();
+      if (fromNode != null && (fromNode instanceof SqlSelect || fromNode instanceof SqlOrderBy)) {
+        pinotQuery.getDataSource().setSubquery(compileSqlNodeToPinotQuery(fromNode));
+      }
+    }

Review comment:
       Do we still need this part? I think this is already handled recursively within `compileSqlNodeToPinotQuery()`
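       For illustration, something along these lines inside `compileSqlNodeToPinotQuery()` would make the block above redundant (hypothetical sketch of the recursive handling, not the current implementation; table-name handling for a plain identifier is omitted):

```java
// Hypothetical sketch: if compileSqlNodeToPinotQuery() already walks its own
// FROM clause like this, the subquery wiring in compileToPinotQuery() is redundant.
private static PinotQuery compileSqlNodeToPinotQuery(SqlNode sqlNode) {
  PinotQuery pinotQuery = new PinotQuery();
  SqlSelect selectNode = getSelectNode(sqlNode);
  SqlNode fromNode = selectNode.getFrom();
  DataSource dataSource = new DataSource();
  if (fromNode instanceof SqlSelect || fromNode instanceof SqlOrderBy) {
    // Recurse into the nested query and attach it as a subquery.
    dataSource.setSubquery(compileSqlNodeToPinotQuery(fromNode));
  }
  // (table-name handling for a plain identifier FROM clause omitted here)
  pinotQuery.setDataSource(dataSource);
  // ... remaining compilation of select list, filter, group-by, etc.
  return pinotQuery;
}
```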

##########
File path: pinot-common/src/main/java/org/apache/pinot/sql/parsers/CalciteSqlParser.java
##########
@@ -325,32 +342,30 @@ private static void setOptions(PinotQuery pinotQuery, List<String> optionsStatem
     pinotQuery.setQueryOptions(options);
   }
 
-  private static PinotQuery compileCalciteSqlToPinotQuery(String sql) {
-    SqlParser sqlParser = SqlParser.create(sql, PARSER_CONFIG);
-    SqlNode sqlNode;
-    try {
-      sqlNode = sqlParser.parseQuery();
-    } catch (SqlParseException e) {
-      throw new SqlCompilationException("Caught exception while parsing query: " + sql, e);
-    }
-
-    PinotQuery pinotQuery = new PinotQuery();
-    if (sqlNode instanceof SqlExplain) {
-      // Extract sql node for the query
-      sqlNode = ((SqlExplain) sqlNode).getExplicandum();
-      pinotQuery.setExplain(true);
-    }
-    SqlSelect selectNode;
+  private static SqlSelect getSelectNode(SqlNode sqlNode) {
+    SqlSelect selectNode = null;
     if (sqlNode instanceof SqlOrderBy) {
       // Store order-by info into the select sql node
       SqlOrderBy orderByNode = (SqlOrderBy) sqlNode;
       selectNode = (SqlSelect) orderByNode.query;
       selectNode.setOrderBy(orderByNode.orderList);
       selectNode.setFetch(orderByNode.fetch);
       selectNode.setOffset(orderByNode.offset);
-    } else {
+    } else if (sqlNode instanceof SqlSelect) {

Review comment:
       We probably want a `Preconditions.checkArgument()` instead of the `if` check. If `sqlNode` is neither `SqlOrderBy` nor `SqlSelect`, we will get an `NPE` later.
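       For example, something along these lines (a minimal sketch, assuming Guava's `com.google.common.base.Preconditions` is available, as it is elsewhere in Pinot):

```java
private static SqlSelect getSelectNode(SqlNode sqlNode) {
  // Fail fast with a clear message instead of an NPE further down.
  Preconditions.checkArgument(sqlNode instanceof SqlSelect || sqlNode instanceof SqlOrderBy,
      "Unsupported SQL node: %s", sqlNode);
  if (sqlNode instanceof SqlOrderBy) {
    // Store order-by info into the select sql node
    SqlOrderBy orderByNode = (SqlOrderBy) sqlNode;
    SqlSelect selectNode = (SqlSelect) orderByNode.query;
    selectNode.setOrderBy(orderByNode.orderList);
    selectNode.setFetch(orderByNode.fetch);
    selectNode.setOffset(orderByNode.offset);
    return selectNode;
  }
  return (SqlSelect) sqlNode;
}
```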

##########
File path: pinot-core/src/test/java/org/apache/pinot/queries/PreAggregationGapfillQueriesTest.java
##########
@@ -0,0 +1,3277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+/**
+ * Queries test for Gapfill queries.
+ */
+// TODO: Item 1. table alias for subquery in next PR
+// TODO: Item 2. Deprecate PostAggregateGapfill implementation in next PR
+@SuppressWarnings("rawtypes")
+public class PreAggregationGapfillQueriesTest extends BaseQueriesTest {
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "PreAggregationGapfillQueriesTest");
+  private static final String RAW_TABLE_NAME = "parkingData";
+  private static final String SEGMENT_NAME = "testSegment";
+
+  private static final int NUM_LOTS = 4;
+
+  private static final String IS_OCCUPIED_COLUMN = "isOccupied";
+  private static final String LEVEL_ID_COLUMN = "levelId";
+  private static final String LOT_ID_COLUMN = "lotId";
+  private static final String EVENT_TIME_COLUMN = "eventTime";
+  private static final Schema SCHEMA = new Schema.SchemaBuilder()
+      .addSingleValueDimension(IS_OCCUPIED_COLUMN, DataType.INT)
+      .addSingleValueDimension(LOT_ID_COLUMN, DataType.STRING)
+      .addSingleValueDimension(LEVEL_ID_COLUMN, DataType.STRING)
+      .addSingleValueDimension(EVENT_TIME_COLUMN, DataType.LONG)
+      .setPrimaryKeyColumns(Arrays.asList(LOT_ID_COLUMN, EVENT_TIME_COLUMN))
+      .build();
+  private static final TableConfig TABLE_CONFIG = new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME)
+      .build();
+
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
+
+  @Override
+  protected String getFilter() {
+    // NOTE: Use a match all filter to switch between DictionaryBasedAggregationOperator and AggregationOperator
+    return " WHERE eventTime >= 0";
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  GenericRow createRow(String time, int levelId, int lotId, boolean isOccupied) {
+    DateTimeFormatSpec dateTimeFormatter
+        = new DateTimeFormatSpec("1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS");
+    GenericRow parkingRow = new GenericRow();
+    parkingRow.putValue(EVENT_TIME_COLUMN, dateTimeFormatter.fromFormatToMillis(time));
+    parkingRow.putValue(LEVEL_ID_COLUMN, "Level_" + levelId);
+    parkingRow.putValue(LOT_ID_COLUMN, "LotId_" + lotId);
+    parkingRow.putValue(IS_OCCUPIED_COLUMN, isOccupied);
+    return parkingRow;
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteDirectory(INDEX_DIR);
+
+    List<GenericRow> records = new ArrayList<>(NUM_LOTS * 2);
+    records.add(createRow("2021-11-07 04:11:00.000", 0, 0, true));
+    records.add(createRow("2021-11-07 04:21:00.000", 0, 0, true));
+    records.add(createRow("2021-11-07 04:31:00.000", 1, 0, true));
+    records.add(createRow("2021-11-07 05:17:00.000", 0, 1, true));
+    records.add(createRow("2021-11-07 05:37:00.000", 0, 1, true));
+    records.add(createRow("2021-11-07 05:47:00.000", 1, 2, true));
+    records.add(createRow("2021-11-07 06:25:00.000", 0, 2, true));
+    records.add(createRow("2021-11-07 06:35:00.000", 0, 2, true));
+    records.add(createRow("2021-11-07 06:36:00.000", 1, 1, true));
+    records.add(createRow("2021-11-07 07:44:00.000", 0, 3, true));
+    records.add(createRow("2021-11-07 07:46:00.000", 1, 3, true));
+    records.add(createRow("2021-11-07 07:54:00.000", 1, 3, true));
+    records.add(createRow("2021-11-07 08:44:00.000", 0, 2, false));
+    records.add(createRow("2021-11-07 08:44:00.000", 1, 2, false));
+    records.add(createRow("2021-11-07 09:31:00.000", 0, 3, false));
+    records.add(createRow("2021-11-07 09:31:00.000", 1, 3, false));
+    records.add(createRow("2021-11-07 10:17:00.000", 0, 0, false));
+    records.add(createRow("2021-11-07 10:33:00.000", 0, 0, false));
+    records.add(createRow("2021-11-07 10:33:00.000", 1, 0, false));
+    records.add(createRow("2021-11-07 11:54:00.000", 0, 1, false));
+    records.add(createRow("2021-11-07 11:57:00.000", 1, 1, false));
+
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
+    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
+    segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
+
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig, new GenericRowRecordReader(records));
+    driver.build();
+
+    ImmutableSegment immutableSegment = ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), ReadMode.mmap);
+    _indexSegment = immutableSegment;
+    _indexSegments = Arrays.asList(immutableSegment);
+  }
+
+  @Test
+  public void datetimeconvertGapfillTestSelectSelect() {
+    DateTimeFormatSpec dateTimeFormatter
+        = new DateTimeFormatSpec("1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS");
+    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+    long start;
+
+    String gapfillQuery1 = "SELECT "
+        + "  GapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+        + "    '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'), "
+        + "    '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
+        + "    '2021-11-07 4:00:00.000',  '2021-11-07 12:00:00.000', '1:HOURS',"
+        + "     FILL(isOccupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "     levelId, lotId, isOccupied "
+        + "FROM parkingData "
+        + "WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse1 = getBrokerResponseForSqlQuery(gapfillQuery1);
+
+    int [][] expectedOccupiedSlotsCounts1 =
+        new int [][] {{6, 6}, {8, 4}, {10, 2}, {12, 0}, {6, 4}, {4, 6}, {2, 10}, {0, 10}};
+    ResultTable gapFillResultTable1 = gapfillBrokerResponse1.getResultTable();
+    List<Object[]> gapFillRows1 = gapFillResultTable1.getRows();
+    start = dateTimeFormatter.fromFormatToMillis("2021-11-07 04:00:00.000");
+    int index = 0;
+    for (int i = 0; i < expectedOccupiedSlotsCounts1.length; i++) {
+      int ones = expectedOccupiedSlotsCounts1[i][0];
+      int zeros = expectedOccupiedSlotsCounts1[i][1];
+      int total = ones + zeros;
+      for (int k = 0; k < total; k++) {
+        String firstTimeCol = (String) gapFillRows1.get(index)[0];
+        long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+        Assert.assertEquals(timeStamp, start);
+        if (gapFillRows1.get(index)[3].equals(1)) {
+          ones--;
+        } else {
+          zeros--;
+        }
+        index++;
+      }
+      Assert.assertEquals(ones, 0);
+      Assert.assertEquals(zeros, 0);
+      start += dateTimeGranularity.granularityToMillis();
+    }
+    Assert.assertEquals(gapFillRows1.size(), index);
+
+    String gapfillQuery2 = "SELECT "
+        + "time_col, levelId, lotId, isOccupied, time_col "
+        + "FROM ("
+        + "  SELECT GapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+        + "    '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'), "
+        + "    '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
+        + "    '2021-11-07 4:00:00.000',  '2021-11-07 12:00:00.000', '1:HOURS',"
+        + "     FILL(isOccupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "     isOccupied, lotId, levelId"
+        + "  FROM parkingData "
+        + "  WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "  LIMIT 200 "
+        + ") "
+        + " WHERE isOccupied = 1 "
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse2 = getBrokerResponseForSqlQuery(gapfillQuery2);
+
+    int [] expectedOccupiedSlotsCounts2 = new int [] {6, 8, 10, 12, 6, 4, 2, 0};
+    ResultTable gapFillResultTable2 = gapfillBrokerResponse2.getResultTable();
+    List<Object[]> gapFillRows2 = gapFillResultTable2.getRows();
+    start = dateTimeFormatter.fromFormatToMillis("2021-11-07 04:00:00.000");
+    index = 0;
+    for (int i = 0; i < expectedOccupiedSlotsCounts2.length; i++) {
+      for (int k = 0; k < expectedOccupiedSlotsCounts2[i]; k++) {
+        String firstTimeCol = (String) gapFillRows2.get(index)[0];
+        long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+        Assert.assertEquals(timeStamp, start);
+        Assert.assertEquals(gapFillRows2.get(index)[3], 1);
+        index++;
+      }
+      start += dateTimeGranularity.granularityToMillis();
+    }
+    Assert.assertEquals(gapFillRows2.size(), index);
+  }
+
+  @Test
+  public void datetimeconvertGapfillTestAggregateSelect() {
+    DateTimeFormatSpec dateTimeFormatter
+        = new DateTimeFormatSpec("1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS");
+    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+    long start;
+
+    String gapfillQuery1 = "SELECT "
+        + "GapFill(time_col, "
+        + "    '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
+        + "    '2021-11-07 4:00:00.000',  '2021-11-07 12:00:00.000', '1:HOURS',"
+        + "    FILL(occupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)), levelId, lotId, occupied "
+        + "FROM ("
+        + "  SELECT DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+        + "    '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS') AS time_col,"
+        + "     lastWithTime(isOccupied, eventTime, 'INT') as occupied, lotId, levelId"
+        + "  FROM parkingData "
+        + "  WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "  GROUP BY time_col, levelId, lotId "
+        + "  LIMIT 200 "
+        + ") "
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse1 = getBrokerResponseForSqlQuery(gapfillQuery1);
+
+    int [][] expectedOccupiedSlotsCounts1 =
+        new int [][] {{2, 6}, {4, 4}, {6, 2}, {8, 0}, {6, 2}, {4, 4}, {2, 6}, {0, 8}};
+    ResultTable gapFillResultTable1 = gapfillBrokerResponse1.getResultTable();
+    List<Object[]> gapFillRows1 = gapFillResultTable1.getRows();
+    start = dateTimeFormatter.fromFormatToMillis("2021-11-07 04:00:00.000");
+    int index = 0;
+    for (int i = 0; i < expectedOccupiedSlotsCounts1.length; i++) {
+      int ones = expectedOccupiedSlotsCounts1[i][0];
+      int zeros = expectedOccupiedSlotsCounts1[i][1];
+      int total = ones + zeros;
+      for (int k = 0; k < total; k++) {
+        String firstTimeCol = (String) gapFillRows1.get(index)[0];
+        long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+        Assert.assertEquals(timeStamp, start);
+        if (gapFillRows1.get(index)[3].equals(1)) {
+          ones--;
+        } else {
+          zeros--;
+        }
+        index++;
+      }
+      Assert.assertEquals(ones, 0);
+      Assert.assertEquals(zeros, 0);
+      start += dateTimeGranularity.granularityToMillis();
+    }
+    Assert.assertEquals(gapFillRows1.size(), index);
+
+    String gapfillQuery2 = "SELECT "
+        + "GapFill(time_col, "
+        + "    '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
+        + "    '2021-11-07 4:00:00.000',  '2021-11-07 12:00:00.000', '1:HOURS',"
+        + "    FILL(occupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)), levelId, lotId, occupied "
+        + "FROM ("
+        + "  SELECT DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+        + "    '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS') AS time_col,"
+        + "     lastWithTime(isOccupied, eventTime, 'INT') as occupied, lotId, levelId"
+        + "  FROM parkingData "
+        + "  WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "  GROUP BY time_col, levelId, lotId "
+        + "  LIMIT 200 "
+        + ") "
+        + " WHERE occupied = 1 "
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse2 = getBrokerResponseForSqlQuery(gapfillQuery2);
+
+    int [] expectedOccupiedSlotsCounts2 = new int [] {2, 4, 6, 8, 6, 4, 2};
+    ResultTable gapFillResultTable2 = gapfillBrokerResponse2.getResultTable();
+    List<Object[]> gapFillRows2 = gapFillResultTable2.getRows();
+    start = dateTimeFormatter.fromFormatToMillis("2021-11-07 04:00:00.000");
+    index = 0;
+    for (int i = 0; i < expectedOccupiedSlotsCounts2.length; i++) {
+      for (int k = 0; k < expectedOccupiedSlotsCounts2[i]; k++) {
+        String firstTimeCol = (String) gapFillRows2.get(index)[0];
+        long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+        Assert.assertEquals(timeStamp, start);
+        Assert.assertEquals(gapFillRows2.get(index)[3], 1);
+        index++;
+      }
+      start += dateTimeGranularity.granularityToMillis();
+    }
+    Assert.assertEquals(gapFillRows2.size(), index);
+  }
+
+  @Test
+  public void datetimeconvertGapfillTestGapfillAggregate() {
+    DateTimeFormatSpec dateTimeFormatter
+        = new DateTimeFormatSpec("1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS");
+    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+    long start;
+
+    String dataTimeConvertQuery = "SELECT "
+        + "DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+        + "'1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS') AS time_col, "
+        + "SUM(isOccupied) "
+        + "FROM parkingData "
+        + "WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "GROUP BY 1 "
+        + "ORDER BY 1 "
+        + "LIMIT 200";
+
+    BrokerResponseNative dateTimeConvertBrokerResponse = getBrokerResponseForSqlQuery(dataTimeConvertQuery);
+
+    ResultTable dateTimeConvertResultTable = dateTimeConvertBrokerResponse.getResultTable();
+    Assert.assertEquals(dateTimeConvertResultTable.getRows().size(), 8);
+
+    String gapfillQuery1 = "SELECT "
+        + "time_col, SUM(isOccupied) as occupied_slots_count, time_col "
+        + "FROM ("
+        + "  SELECT  GapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+        + "    '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'), "
+        + "    '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
+        + "    '2021-11-07 4:00:00.000',  '2021-11-07 12:00:00.000', '1:HOURS',"
+        + "     FILL(isOccupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "     isOccupied, lotId, levelId"
+        + "  FROM parkingData "
+        + "  WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "  LIMIT 200 "
+        + ") "
+        + " GROUP BY time_col "
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse1 = getBrokerResponseForSqlQuery(gapfillQuery1);
+
+    double [] expectedOccupiedSlotsCounts1 = new double [] {6, 8, 10, 12, 6, 4, 2, 0};
+    ResultTable gapFillResultTable1 = gapfillBrokerResponse1.getResultTable();
+    List<Object[]> gapFillRows1 = gapFillResultTable1.getRows();
+    Assert.assertEquals(gapFillRows1.size(), expectedOccupiedSlotsCounts1.length);
+    start = dateTimeFormatter.fromFormatToMillis("2021-11-07 04:00:00.000");
+    for (int i = 0; i < expectedOccupiedSlotsCounts1.length; i++) {
+      String firstTimeCol = (String) gapFillRows1.get(i)[0];
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      Assert.assertEquals(expectedOccupiedSlotsCounts1[i], gapFillRows1.get(i)[1]);
+      start += dateTimeGranularity.granularityToMillis();
+    }
+
+    String gapfillQuery2 = "SELECT "
+        + "time_col, SUM(isOccupied) as occupied_slots_count, time_col "
+        + "FROM ("
+        + "  SELECT  GapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+        + "    '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'), "
+        + "    '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
+        + "    '2021-11-07 4:00:00.000',  '2021-11-07 12:00:00.000', '1:HOURS',"
+        + "     FILL(isOccupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "     isOccupied, lotId, levelId"
+        + "  FROM parkingData "
+        + "  WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "  LIMIT 200 "
+        + ") "
+        + " WHERE isOccupied = 1 "
+        + " GROUP BY time_col "
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse2 = getBrokerResponseForSqlQuery(gapfillQuery2);
+
+    double [] expectedOccupiedSlotsCounts2 = new double [] {6, 8, 10, 12, 6, 4, 2};
+    ResultTable gapFillResultTable2 = gapfillBrokerResponse2.getResultTable();
+    List<Object[]> gapFillRows2 = gapFillResultTable2.getRows();
+    Assert.assertEquals(gapFillRows2.size(), expectedOccupiedSlotsCounts2.length);
+    start = dateTimeFormatter.fromFormatToMillis("2021-11-07 04:00:00.000");
+    for (int i = 0; i < expectedOccupiedSlotsCounts2.length; i++) {
+      String firstTimeCol = (String) gapFillRows2.get(i)[0];
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      Assert.assertEquals(expectedOccupiedSlotsCounts2[i], gapFillRows2.get(i)[1]);
+      start += dateTimeGranularity.granularityToMillis();
+    }
+  }
+
+  @Test
+  public void datetimeconvertGapfillTestGapfillAggregateWithOptionalGroupBy() {
+    DateTimeFormatSpec dateTimeFormatter
+        = new DateTimeFormatSpec("1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS");
+    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+    long start;
+
+    String gapfillQuery1 = "SELECT "
+        + "time_col, levelId, SUM(isOccupied) as occupied_slots_count, time_col "
+        + "FROM ("
+        + "  SELECT  GapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+        + "    '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'), "
+        + "    '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
+        + "    '2021-11-07 4:00:00.000',  '2021-11-07 12:00:00.000', '1:HOURS',"
+        + "     FILL(isOccupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "     isOccupied, lotId, levelId"
+        + "  FROM parkingData "
+        + "  WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "  LIMIT 200 "
+        + ") "
+        + " GROUP BY time_col, levelId "
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse1 = getBrokerResponseForSqlQuery(gapfillQuery1);
+
+    double [] expectedOccupiedSlotsCountsForLevel11 = new double [] {4, 5, 6, 5, 3, 2, 1, 0};
+    double [] expectedOccupiedSlotsCountsForLevel21 = new double [] {2, 3, 4, 7, 3, 2, 1, 0};
+    ResultTable gapFillResultTable1 = gapfillBrokerResponse1.getResultTable();
+    List<Object[]> gapFillRows1 = gapFillResultTable1.getRows();
+    Assert.assertEquals(gapFillRows1.size(), expectedOccupiedSlotsCountsForLevel11.length * 2);
+    start = dateTimeFormatter.fromFormatToMillis("2021-11-07 04:00:00.000");
+    for (int i = 0; i < expectedOccupiedSlotsCountsForLevel11.length * 2; i += 2) {
+      String firstTimeCol = (String) gapFillRows1.get(i)[0];
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      if ("Level_0".equals(gapFillRows1.get(i)[1])) {
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel11[i / 2], gapFillRows1.get(i)[2]);
+      } else {
+        Assert.assertEquals("Level_1", gapFillRows1.get(i)[1]);
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel21[i / 2], gapFillRows1.get(i)[2]);
+      }
+      firstTimeCol = (String) gapFillRows1.get(i + 1)[0];
+      timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      if ("Level_0".equals(gapFillRows1.get(i + 1)[1])) {
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel11[i / 2], gapFillRows1.get(i + 1)[2]);
+      } else {
+        Assert.assertEquals("Level_1", gapFillRows1.get(i + 1)[1]);
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel21[i / 2], gapFillRows1.get(i + 1)[2]);
+      }
+      start += dateTimeGranularity.granularityToMillis();
+    }
+
+    String gapfillQuery2 = "SELECT "
+        + "time_col, levelId, SUM(isOccupied) as occupied_slots_count, time_col "
+        + "FROM ("
+        + "  SELECT  GapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+        + "    '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'), "
+        + "    '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
+        + "    '2021-11-07 4:00:00.000',  '2021-11-07 12:00:00.000', '1:HOURS',"
+        + "     FILL(isOccupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "     isOccupied, lotId, levelId"
+        + "  FROM parkingData "
+        + "  WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "  LIMIT 200 "
+        + ") "
+        + " WHERE isOccupied = 1 "
+        + " GROUP BY time_col, levelId "
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse2 = getBrokerResponseForSqlQuery(gapfillQuery2);
+
+    double [] expectedOccupiedSlotsCountsForLevel12 = new double [] {4, 5, 6, 5, 3, 2, 1};
+    double [] expectedOccupiedSlotsCountsForLevel22 = new double [] {2, 3, 4, 7, 3, 2, 1};
+    ResultTable gapFillResultTable2 = gapfillBrokerResponse2.getResultTable();
+    List<Object[]> gapFillRows2 = gapFillResultTable2.getRows();
+    Assert.assertEquals(gapFillRows2.size(), expectedOccupiedSlotsCountsForLevel12.length * 2);
+    start = dateTimeFormatter.fromFormatToMillis("2021-11-07 04:00:00.000");
+    for (int i = 0; i < expectedOccupiedSlotsCountsForLevel12.length * 2; i += 2) {
+      String firstTimeCol = (String) gapFillRows2.get(i)[0];
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      if ("Level_0".equals(gapFillRows2.get(i)[1])) {
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel12[i / 2], gapFillRows2.get(i)[2]);
+      } else {
+        Assert.assertEquals("Level_1", gapFillRows2.get(i)[1]);
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel22[i / 2], gapFillRows2.get(i)[2]);
+      }
+      firstTimeCol = (String) gapFillRows2.get(i + 1)[0];
+      timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      if ("Level_0".equals(gapFillRows2.get(i + 1)[1])) {
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel12[i / 2], gapFillRows2.get(i + 1)[2]);
+      } else {
+        Assert.assertEquals("Level_1", gapFillRows2.get(i + 1)[1]);
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel22[i / 2], gapFillRows2.get(i + 1)[2]);
+      }
+      start += dateTimeGranularity.granularityToMillis();
+    }
+  }
+
+  @Test
+  public void datetimeconvertGapfillTestGapfillAggregateWithHavingClause() {
+    DateTimeFormatSpec dateTimeFormatter
+        = new DateTimeFormatSpec("1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS");
+    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+    long start;
+
+    String gapfillQuery1 = "SELECT "
+        + "time_col, levelId, SUM(isOccupied) as occupied_slots_count, time_col "
+        + "FROM ("
+        + "  SELECT  GapFill(DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+        + "    '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS'), "
+        + "    '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
+        + "    '2021-11-07 4:00:00.000',  '2021-11-07 12:00:00.000', '1:HOURS',"
+        + "     FILL(isOccupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "     isOccupied, lotId, levelId"
+        + "  FROM parkingData "
+        + "  WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "  LIMIT 200 "
+        + ") "
+        + " GROUP BY time_col, levelId "
+        + " HAVING occupied_slots_count > 0"
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse1 = getBrokerResponseForSqlQuery(gapfillQuery1);
+
+    double [] expectedOccupiedSlotsCountsForLevel11 = new double [] {4, 5, 6, 5, 3, 2, 1};
+    double [] expectedOccupiedSlotsCountsForLevel21 = new double [] {2, 3, 4, 7, 3, 2, 1};
+    ResultTable gapFillResultTable1 = gapfillBrokerResponse1.getResultTable();
+    List<Object[]> gapFillRows1 = gapFillResultTable1.getRows();
+    Assert.assertEquals(gapFillRows1.size(), expectedOccupiedSlotsCountsForLevel11.length * 2);
+    start = dateTimeFormatter.fromFormatToMillis("2021-11-07 04:00:00.000");
+    for (int i = 0; i < expectedOccupiedSlotsCountsForLevel11.length * 2; i += 2) {
+      String firstTimeCol = (String) gapFillRows1.get(i)[0];
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      if ("Level_0".equals(gapFillRows1.get(i)[1])) {
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel11[i / 2], gapFillRows1.get(i)[2]);
+      } else {
+        Assert.assertEquals("Level_1", gapFillRows1.get(i)[1]);
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel21[i / 2], gapFillRows1.get(i)[2]);
+      }
+      firstTimeCol = (String) gapFillRows1.get(i + 1)[0];
+      timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      if ("Level_0".equals(gapFillRows1.get(i + 1)[1])) {
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel11[i / 2], gapFillRows1.get(i + 1)[2]);
+      } else {
+        Assert.assertEquals("Level_1", gapFillRows1.get(i + 1)[1]);
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel21[i / 2], gapFillRows1.get(i + 1)[2]);
+      }
+      start += dateTimeGranularity.granularityToMillis();
+    }
+  }
+
+  @Test
+  public void datetimeconvertGapfillTestAggregateAggregate() {
+    DateTimeFormatSpec dateTimeFormatter
+        = new DateTimeFormatSpec("1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS");
+    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+    long start;
+
+    String gapfillQuery1 = "SELECT "
+        + "time_col, SUM(occupied) as occupied_slots_count, time_col "
+        + "FROM ("
+        + "  SELECT GapFill(time_col, "
+        + "    '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
+        + "    '2021-11-07 4:00:00.000',  '2021-11-07 12:00:00.000', '1:HOURS',"
+        + "     FILL(occupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "     occupied, lotId, levelId"
+        + "  FROM ("
+        + "    SELECT DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+        + "      '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS') AS time_col,"
+        + "       lastWithTime(isOccupied, eventTime, 'INT') as occupied, lotId, levelId"
+        + "    FROM parkingData "
+        + "    WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "    GROUP BY time_col, levelId, lotId "
+        + "    LIMIT 200 "
+        + "  ) "
+        + "  LIMIT 200 "
+        + ") "
+        + " GROUP BY time_col "
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse1 = getBrokerResponseForSqlQuery(gapfillQuery1);
+
+    double [] expectedOccupiedSlotsCounts1 = new double [] {2, 4, 6, 8, 6, 4, 2, 0};
+    ResultTable gapFillResultTable1 = gapfillBrokerResponse1.getResultTable();
+    List<Object[]> gapFillRows1 = gapFillResultTable1.getRows();
+    Assert.assertEquals(gapFillRows1.size(), expectedOccupiedSlotsCounts1.length);
+    start = dateTimeFormatter.fromFormatToMillis("2021-11-07 04:00:00.000");
+    for (int i = 0; i < expectedOccupiedSlotsCounts1.length; i++) {
+      String firstTimeCol = (String) gapFillRows1.get(i)[0];
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      Assert.assertEquals(expectedOccupiedSlotsCounts1[i], gapFillRows1.get(i)[1]);
+      start += dateTimeGranularity.granularityToMillis();
+    }
+
+    String gapfillQuery2 = "SELECT "
+        + "time_col, SUM(occupied) as occupied_slots_count, time_col "
+        + "FROM ("
+        + "  SELECT GapFill(time_col, "
+        + "    '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
+        + "    '2021-11-07 4:00:00.000',  '2021-11-07 12:00:00.000', '1:HOURS',"
+        + "     FILL(occupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "     occupied, lotId, levelId"
+        + "  FROM ("
+        + "    SELECT DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+        + "      '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS') AS time_col,"
+        + "       lastWithTime(isOccupied, eventTime, 'INT') as occupied, lotId, levelId"
+        + "    FROM parkingData "
+        + "    WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "    GROUP BY time_col, levelId, lotId "
+        + "    LIMIT 200 "
+        + "  ) "
+        + "  LIMIT 200 "
+        + ") "
+        + " WHERE occupied = 1 "
+        + " GROUP BY time_col "
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse2 = getBrokerResponseForSqlQuery(gapfillQuery2);
+
+    double [] expectedOccupiedSlotsCounts2 = new double [] {2, 4, 6, 8, 6, 4, 2};
+    ResultTable gapFillResultTable2 = gapfillBrokerResponse2.getResultTable();
+    List<Object[]> gapFillRows2 = gapFillResultTable2.getRows();
+    Assert.assertEquals(gapFillRows2.size(), expectedOccupiedSlotsCounts2.length);
+    start = dateTimeFormatter.fromFormatToMillis("2021-11-07 04:00:00.000");
+    for (int i = 0; i < expectedOccupiedSlotsCounts2.length; i++) {
+      String firstTimeCol = (String) gapFillRows2.get(i)[0];
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      Assert.assertEquals(expectedOccupiedSlotsCounts2[i], gapFillRows2.get(i)[1]);
+      start += dateTimeGranularity.granularityToMillis();
+    }
+  }
+
+  @Test
+  public void datetimeconvertGapfillTestAggregateAggregateWithOptionalGroupBy() {
+    DateTimeFormatSpec dateTimeFormatter
+        = new DateTimeFormatSpec("1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS");
+    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+    long start;
+
+    String gapfillQuery1 = "SELECT "
+        + "time_col, levelId, SUM(occupied) as occupied_slots_count, time_col "
+        + "FROM ("
+        + "  SELECT GapFill(time_col, "
+        + "    '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
+        + "    '2021-11-07 4:00:00.000',  '2021-11-07 12:00:00.000', '1:HOURS',"
+        + "     FILL(occupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "     occupied, lotId, levelId"
+        + "  FROM ("
+        + "    SELECT DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+        + "      '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS') AS time_col,"
+        + "       lastWithTime(isOccupied, eventTime, 'INT') as occupied, lotId, levelId"
+        + "    FROM parkingData "
+        + "    WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "    GROUP BY time_col, levelId, lotId "
+        + "    LIMIT 200 "
+        + "  ) "
+        + "  LIMIT 200 "
+        + ") "
+        + " GROUP BY time_col, levelId "
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse1 = getBrokerResponseForSqlQuery(gapfillQuery1);
+
+    double [] expectedOccupiedSlotsCounts1 = new double [] {1, 2, 3, 4, 3, 2, 1, 0};
+    ResultTable gapFillResultTable1 = gapfillBrokerResponse1.getResultTable();
+    List<Object[]> gapFillRows1 = gapFillResultTable1.getRows();
+    Assert.assertEquals(gapFillRows1.size(), expectedOccupiedSlotsCounts1.length * 2);
+    start = dateTimeFormatter.fromFormatToMillis("2021-11-07 04:00:00.000");
+    for (int i = 0; i < expectedOccupiedSlotsCounts1.length * 2; i += 2) {
+      String firstTimeCol = (String) gapFillRows1.get(i)[0];
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      Assert.assertEquals(expectedOccupiedSlotsCounts1[i / 2], gapFillRows1.get(i)[2]);
+      firstTimeCol = (String) gapFillRows1.get(i + 1)[0];
+      timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      Assert.assertEquals(expectedOccupiedSlotsCounts1[i / 2], gapFillRows1.get(i + 1)[2]);
+      start += dateTimeGranularity.granularityToMillis();
+    }
+
+
+    String gapfillQuery2 = "SELECT "
+        + "time_col, levelId, SUM(occupied) as occupied_slots_count, time_col "
+        + "FROM ("
+        + "  SELECT GapFill(time_col, "
+        + "    '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
+        + "    '2021-11-07 4:00:00.000',  '2021-11-07 12:00:00.000', '1:HOURS',"
+        + "     FILL(occupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "     occupied, lotId, levelId"
+        + "  FROM ("
+        + "    SELECT DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+        + "      '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS') AS time_col,"
+        + "       lastWithTime(isOccupied, eventTime, 'INT') as occupied, lotId, levelId"
+        + "    FROM parkingData "
+        + "    WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "    GROUP BY time_col, levelId, lotId "
+        + "    LIMIT 200 "
+        + "  ) "
+        + "  LIMIT 200 "
+        + ") "
+        + " WHERE occupied = 1 "
+        + " GROUP BY time_col, levelId "
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse2 = getBrokerResponseForSqlQuery(gapfillQuery2);
+
+    double [] expectedOccupiedSlotsCounts2 = new double [] {1, 2, 3, 4, 3, 2, 1};
+    ResultTable gapFillResultTable2 = gapfillBrokerResponse2.getResultTable();
+    List<Object[]> gapFillRows2 = gapFillResultTable2.getRows();
+    Assert.assertEquals(gapFillRows2.size(), expectedOccupiedSlotsCounts2.length * 2);
+    start = dateTimeFormatter.fromFormatToMillis("2021-11-07 04:00:00.000");
+    for (int i = 0; i < expectedOccupiedSlotsCounts2.length * 2; i += 2) {
+      String firstTimeCol = (String) gapFillRows2.get(i)[0];
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      Assert.assertEquals(expectedOccupiedSlotsCounts2[i / 2], gapFillRows2.get(i)[2]);
+      firstTimeCol = (String) gapFillRows2.get(i + 1)[0];
+      timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      Assert.assertEquals(expectedOccupiedSlotsCounts2[i / 2], gapFillRows2.get(i + 1)[2]);
+      start += dateTimeGranularity.granularityToMillis();
+    }
+  }
+
+  @Test
+  public void datetimeconvertGapfillTestAggregateAggregateWithHavingClause() {
+    String gapfillQuery1 = "SELECT "
+        + "time_col, levelId, SUM(occupied) as occupied_slots_count, time_col "
+        + "FROM ("
+        + "  SELECT GapFill(time_col, "
+        + "    '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', "
+        + "    '2021-11-07 4:00:00.000',  '2021-11-07 12:00:00.000', '1:HOURS',"
+        + "     FILL(occupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)),"
+        + "     occupied, lotId, levelId"
+        + "  FROM ("
+        + "    SELECT DATETIMECONVERT(eventTime, '1:MILLISECONDS:EPOCH', "
+        + "      '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:HOURS') AS time_col,"
+        + "       lastWithTime(isOccupied, eventTime, 'INT') as occupied, lotId, levelId"
+        + "    FROM parkingData "
+        + "    WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "    GROUP BY time_col, levelId, lotId "
+        + "    LIMIT 200 "
+        + "  ) "
+        + "  LIMIT 200 "
+        + ") "
+        + " GROUP BY time_col, levelId "
+        + " HAVING occupied_slots_count > 0"
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse1 = getBrokerResponseForSqlQuery(gapfillQuery1);
+
+    double [] expectedOccupiedSlotsCounts1 = new double [] {1, 2, 3, 4, 3, 2, 1};
+    ResultTable gapFillResultTable1 = gapfillBrokerResponse1.getResultTable();
+    List<Object[]> gapFillRows1 = gapFillResultTable1.getRows();
+    Assert.assertEquals(gapFillRows1.size(), expectedOccupiedSlotsCounts1.length * 2);
+    DateTimeFormatSpec dateTimeFormatter
+        = new DateTimeFormatSpec("1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS");
+    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+
+    long start = dateTimeFormatter.fromFormatToMillis("2021-11-07 04:00:00.000");
+    for (int i = 0; i < expectedOccupiedSlotsCounts1.length * 2; i += 2) {
+      String firstTimeCol = (String) gapFillRows1.get(i)[0];
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      Assert.assertEquals(expectedOccupiedSlotsCounts1[i / 2], gapFillRows1.get(i)[2]);
+      firstTimeCol = (String) gapFillRows1.get(i + 1)[0];
+      timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      Assert.assertEquals(expectedOccupiedSlotsCounts1[i / 2], gapFillRows1.get(i + 1)[2]);
+      start += dateTimeGranularity.granularityToMillis();
+    }
+  }
+
+  @Test
+  public void toEpochHoursGapfillTestSelectSelect() {
+    DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:HOURS:EPOCH");
+    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+    long start;
+
+    String gapfillQuery1 = "SELECT "
+        + "  GapFill(ToEpochHours(eventTime), '1:HOURS:EPOCH', "
+        + "    '454516',  '454524', '1:HOURS',"
+        + "     FILL(isOccupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "     isOccupied, lotId, levelId "
+        + "FROM parkingData "
+        + "WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse1 = getBrokerResponseForSqlQuery(gapfillQuery1);
+
+    int [][] expectedOccupiedSlotsCounts1 =
+        new int [][] {{6, 6}, {8, 4}, {10, 2}, {12, 0}, {6, 4}, {4, 6}, {2, 10}, {0, 10}};
+    ResultTable gapFillResultTable1 = gapfillBrokerResponse1.getResultTable();
+    List<Object[]> gapFillRows1 = gapFillResultTable1.getRows();
+    start = dateTimeFormatter.fromFormatToMillis("454516");
+    int index = 0;
+    for (int i = 0; i < expectedOccupiedSlotsCounts1.length; i++) {
+      int ones = expectedOccupiedSlotsCounts1[i][0];
+      int zeros = expectedOccupiedSlotsCounts1[i][1];
+      int total = ones + zeros;
+      for (int k = 0; k < total; k++) {
+        String firstTimeCol = ((Long) (gapFillRows1.get(index)[0])).toString();
+        long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+        Assert.assertEquals(timeStamp, start);
+        if (gapFillRows1.get(index)[3].equals(1)) {
+          ones--;
+        } else {
+          zeros--;
+        }
+        index++;
+      }
+      Assert.assertEquals(ones, 0);
+      Assert.assertEquals(zeros, 0);
+      start += dateTimeGranularity.granularityToMillis();
+    }
+    Assert.assertEquals(gapFillRows1.size(), index);
+
+    String gapfillQuery2 = "SELECT "
+        + "time_col, levelId, lotId, isOccupied, time_col "
+        + "FROM ("
+        + "  SELECT GapFill(ToEpochHours(eventTime), '1:HOURS:EPOCH', "
+        + "    '454516',  '454524', '1:HOURS',"
+        + "     FILL(isOccupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "     isOccupied, lotId, levelId"
+        + "  FROM parkingData "
+        + "  WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "  LIMIT 200 "
+        + ") "
+        + " WHERE isOccupied = 1 "
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse2 = getBrokerResponseForSqlQuery(gapfillQuery2);
+
+    int [] expectedOccupiedSlotsCounts2 = new int [] {6, 8, 10, 12, 6, 4, 2, 0};
+    ResultTable gapFillResultTable2 = gapfillBrokerResponse2.getResultTable();
+    List<Object[]> gapFillRows2 = gapFillResultTable2.getRows();
+    start = dateTimeFormatter.fromFormatToMillis("454516");
+    index = 0;
+    for (int i = 0; i < expectedOccupiedSlotsCounts2.length; i++) {
+      for (int k = 0; k < expectedOccupiedSlotsCounts2[i]; k++) {
+        String firstTimeCol = ((Long) gapFillRows2.get(index)[0]).toString();
+        long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+        Assert.assertEquals(timeStamp, start);
+        Assert.assertEquals(gapFillRows2.get(index)[3], 1);
+        index++;
+      }
+      start += dateTimeGranularity.granularityToMillis();
+    }
+    Assert.assertEquals(gapFillRows2.size(), index);
+  }
+
+  @Test
+  public void toEpochHoursGapfillTestAggregateSelect() {
+    DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:HOURS:EPOCH");
+    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+    long start;
+
+    String gapfillQuery1 = "SELECT "
+        + "  GapFill(time_col, '1:HOURS:EPOCH', "
+        + "    '454516',  '454524', '1:HOURS',"
+        + "     FILL(occupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)),"
+        + "  levelId, lotId, occupied "
+        + "FROM ("
+        + "  SELECT ToEpochHours(eventTime) AS time_col,"
+        + "     lastWithTime(isOccupied, eventTime, 'INT') as occupied, lotId, levelId"
+        + "  FROM parkingData "
+        + "  WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "  GROUP BY time_col, levelId, lotId "
+        + "  LIMIT 200 "
+        + ") "
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse1 = getBrokerResponseForSqlQuery(gapfillQuery1);
+
+    int [][] expectedOccupiedSlotsCounts1 =
+        new int [][] {{2, 6}, {4, 4}, {6, 2}, {8, 0}, {6, 2}, {4, 4}, {2, 6}, {0, 8}};
+    ResultTable gapFillResultTable1 = gapfillBrokerResponse1.getResultTable();
+    List<Object[]> gapFillRows1 = gapFillResultTable1.getRows();
+    start = dateTimeFormatter.fromFormatToMillis("454516");
+    int index = 0;
+    for (int i = 0; i < expectedOccupiedSlotsCounts1.length; i++) {
+      int ones = expectedOccupiedSlotsCounts1[i][0];
+      int zeros = expectedOccupiedSlotsCounts1[i][1];
+      int total = ones + zeros;
+      for (int k = 0; k < total; k++) {
+        String firstTimeCol = ((Long) gapFillRows1.get(index)[0]).toString();
+        long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+        Assert.assertEquals(timeStamp, start);
+        if (gapFillRows1.get(index)[3].equals(1)) {
+          ones--;
+        } else {
+          zeros--;
+        }
+        index++;
+      }
+      Assert.assertEquals(ones, 0);
+      Assert.assertEquals(zeros, 0);
+      start += dateTimeGranularity.granularityToMillis();
+    }
+    Assert.assertEquals(gapFillRows1.size(), index);
+
+    String gapfillQuery2 = "SELECT "
+        + "  GapFill(time_col, '1:HOURS:EPOCH', "
+        + "    '454516',  '454524', '1:HOURS',"
+        + "     FILL(occupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)),"
+        + " levelId, lotId, occupied "
+        + "FROM ("
+        + "  SELECT ToEpochHours(eventTime) AS time_col,"
+        + "     lastWithTime(isOccupied, eventTime, 'INT') as occupied, lotId, levelId"
+        + "  FROM parkingData "
+        + "  WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "  GROUP BY time_col, levelId, lotId "
+        + "  LIMIT 200 "
+        + ") "
+        + " WHERE occupied = 1 "
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse2 = getBrokerResponseForSqlQuery(gapfillQuery2);
+
+    int [] expectedOccupiedSlotsCounts2 = new int [] {2, 4, 6, 8, 6, 4, 2};
+    ResultTable gapFillResultTable2 = gapfillBrokerResponse2.getResultTable();
+    List<Object[]> gapFillRows2 = gapFillResultTable2.getRows();
+    start = dateTimeFormatter.fromFormatToMillis("454516");
+    index = 0;
+    for (int i = 0; i < expectedOccupiedSlotsCounts2.length; i++) {
+      for (int k = 0; k < expectedOccupiedSlotsCounts2[i]; k++) {
+        String firstTimeCol = ((Long) gapFillRows2.get(index)[0]).toString();
+        long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+        Assert.assertEquals(timeStamp, start);
+        Assert.assertEquals(gapFillRows2.get(index)[3], 1);
+        index++;
+      }
+      start += dateTimeGranularity.granularityToMillis();
+    }
+    Assert.assertEquals(gapFillRows2.size(), index);
+  }
+
+  @Test
+  public void toEpochHoursGapfillTestGapfillAggregate() {
+    DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:HOURS:EPOCH");
+    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+    long start;
+
+    String gapfillQuery1 = "SELECT "
+        + "time_col, SUM(isOccupied) as occupied_slots_count, time_col "
+        + "FROM ("
+        + "  SELECT GapFill(ToEpochHours(eventTime), '1:HOURS:EPOCH', "
+        + "    '454516',  '454524', '1:HOURS',"
+        + "     FILL(isOccupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "    isOccupied, lotId, levelId"
+        + "  FROM parkingData "
+        + "  WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "  LIMIT 200 "
+        + ") "
+        + " GROUP BY time_col "
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse1 = getBrokerResponseForSqlQuery(gapfillQuery1);
+
+    double [] expectedOccupiedSlotsCounts1 = new double [] {6, 8, 10, 12, 6, 4, 2, 0};
+    ResultTable gapFillResultTable1 = gapfillBrokerResponse1.getResultTable();
+    List<Object[]> gapFillRows1 = gapFillResultTable1.getRows();
+    Assert.assertEquals(gapFillRows1.size(), expectedOccupiedSlotsCounts1.length);
+    start = dateTimeFormatter.fromFormatToMillis("454516");
+    for (int i = 0; i < expectedOccupiedSlotsCounts1.length; i++) {
+      String firstTimeCol = ((Long) gapFillRows1.get(i)[0]).toString();
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      Assert.assertEquals(expectedOccupiedSlotsCounts1[i], gapFillRows1.get(i)[1]);
+      start += dateTimeGranularity.granularityToMillis();
+    }
+
+    String gapfillQuery2 = "SELECT "
+        + "time_col, SUM(isOccupied) as occupied_slots_count, time_col "
+        + "FROM ("
+        + "  SELECT GapFill(ToEpochHours(eventTime), '1:HOURS:EPOCH', "
+        + "    '454516',  '454524', '1:HOURS',"
+        + "     FILL(isOccupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "     isOccupied, lotId, levelId"
+        + "  FROM parkingData "
+        + "  WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "  LIMIT 200 "
+        + ") "
+        + " WHERE isOccupied = 1 "
+        + " GROUP BY time_col "
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse2 = getBrokerResponseForSqlQuery(gapfillQuery2);
+
+    double [] expectedOccupiedSlotsCounts2 = new double [] {6, 8, 10, 12, 6, 4, 2};
+    ResultTable gapFillResultTable2 = gapfillBrokerResponse2.getResultTable();
+    List<Object[]> gapFillRows2 = gapFillResultTable2.getRows();
+    Assert.assertEquals(gapFillRows2.size(), expectedOccupiedSlotsCounts2.length);
+    start = dateTimeFormatter.fromFormatToMillis("454516");
+    for (int i = 0; i < expectedOccupiedSlotsCounts2.length; i++) {
+      String firstTimeCol = ((Long) gapFillRows2.get(i)[0]).toString();
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      Assert.assertEquals(expectedOccupiedSlotsCounts2[i], gapFillRows2.get(i)[1]);
+      start += dateTimeGranularity.granularityToMillis();
+    }
+  }
+
+  @Test
+  public void toEpochHoursGapfillTestGapfillAggregateWithOptionalGroupBy() {
+    DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:HOURS:EPOCH");
+    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+    long start;
+
+    String gapfillQuery1 = "SELECT "
+        + "time_col, levelId, SUM(isOccupied) as occupied_slots_count, time_col "
+        + "FROM ("
+        + "  SELECT GapFill(ToEpochHours(eventTime), '1:HOURS:EPOCH', "
+        + "    '454516',  '454524', '1:HOURS',"
+        + "     FILL(isOccupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "     isOccupied, lotId, levelId"
+        + "  FROM parkingData "
+        + "  WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "  LIMIT 200 "
+        + ") "
+        + " GROUP BY time_col, levelId "
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse1 = getBrokerResponseForSqlQuery(gapfillQuery1);
+
+    double [] expectedOccupiedSlotsCountsForLevel11 = new double [] {4, 5, 6, 5, 3, 2, 1, 0};
+    double [] expectedOccupiedSlotsCountsForLevel21 = new double [] {2, 3, 4, 7, 3, 2, 1, 0};
+    ResultTable gapFillResultTable1 = gapfillBrokerResponse1.getResultTable();
+    List<Object[]> gapFillRows1 = gapFillResultTable1.getRows();
+    Assert.assertEquals(gapFillRows1.size(), expectedOccupiedSlotsCountsForLevel11.length * 2);
+    start = dateTimeFormatter.fromFormatToMillis("454516");
+    for (int i = 0; i < expectedOccupiedSlotsCountsForLevel11.length * 2; i += 2) {
+      String firstTimeCol = ((Long) gapFillRows1.get(i)[0]).toString();
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      if ("Level_0".equals(gapFillRows1.get(i)[1])) {
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel11[i / 2], gapFillRows1.get(i)[2]);
+      } else {
+        Assert.assertEquals("Level_1", gapFillRows1.get(i)[1]);
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel21[i / 2], gapFillRows1.get(i)[2]);
+      }
+      firstTimeCol = ((Long) gapFillRows1.get(i + 1)[0]).toString();
+      timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      if ("Level_0".equals(gapFillRows1.get(i + 1)[1])) {
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel11[i / 2], gapFillRows1.get(i + 1)[2]);
+      } else {
+        Assert.assertEquals("Level_1", gapFillRows1.get(i + 1)[1]);
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel21[i / 2], gapFillRows1.get(i + 1)[2]);
+      }
+      start += dateTimeGranularity.granularityToMillis();
+    }
+
+    String gapfillQuery2 = "SELECT "
+        + "time_col, levelId, SUM(isOccupied) as occupied_slots_count, time_col "
+        + "FROM ("
+        + "  SELECT GapFill(ToEpochHours(eventTime), '1:HOURS:EPOCH', "
+        + "    '454516',  '454524', '1:HOURS',"
+        + "     FILL(isOccupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "     isOccupied, lotId, levelId"
+        + "  FROM parkingData "
+        + "  WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "  LIMIT 200 "
+        + ") "
+        + " WHERE isOccupied = 1 "
+        + " GROUP BY time_col, levelId "
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse2 = getBrokerResponseForSqlQuery(gapfillQuery2);
+
+    double [] expectedOccupiedSlotsCountsForLevel12 = new double [] {4, 5, 6, 5, 3, 2, 1};
+    double [] expectedOccupiedSlotsCountsForLevel22 = new double [] {2, 3, 4, 7, 3, 2, 1};
+    ResultTable gapFillResultTable2 = gapfillBrokerResponse2.getResultTable();
+    List<Object[]> gapFillRows2 = gapFillResultTable2.getRows();
+    Assert.assertEquals(gapFillRows2.size(), expectedOccupiedSlotsCountsForLevel12.length * 2);
+    start = dateTimeFormatter.fromFormatToMillis("454516");
+    for (int i = 0; i < expectedOccupiedSlotsCountsForLevel12.length * 2; i += 2) {
+      String firstTimeCol = ((Long) gapFillRows2.get(i)[0]).toString();
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      if ("Level_0".equals(gapFillRows2.get(i)[1])) {
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel12[i / 2], gapFillRows2.get(i)[2]);
+      } else {
+        Assert.assertEquals("Level_1", gapFillRows2.get(i)[1]);
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel22[i / 2], gapFillRows2.get(i)[2]);
+      }
+      firstTimeCol = ((Long) gapFillRows2.get(i + 1)[0]).toString();
+      timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      if ("Level_0".equals(gapFillRows2.get(i + 1)[1])) {
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel12[i / 2], gapFillRows2.get(i + 1)[2]);
+      } else {
+        Assert.assertEquals("Level_1", gapFillRows2.get(i + 1)[1]);
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel22[i / 2], gapFillRows2.get(i + 1)[2]);
+      }
+      start += dateTimeGranularity.granularityToMillis();
+    }
+  }
+
+  @Test
+  public void toEpochHoursGapfillTestGapfillAggregateWithHavingClause() {
+    DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:HOURS:EPOCH");
+    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+    long start;
+
+    String gapfillQuery1 = "SELECT "
+        + "time_col, levelId, SUM(isOccupied) as occupied_slots_count, time_col "
+        + "FROM ("
+        + "  SELECT GapFill(ToEpochHours(eventTime), '1:HOURS:EPOCH', "
+        + "    '454516',  '454524', '1:HOURS',"
+        + "     FILL(isOccupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "    isOccupied, lotId, levelId"
+        + "  FROM parkingData "
+        + "  WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "  LIMIT 200 "
+        + ") "
+        + " GROUP BY time_col, levelId "
+        + " HAVING occupied_slots_count > 0"
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse1 = getBrokerResponseForSqlQuery(gapfillQuery1);
+
+    double [] expectedOccupiedSlotsCountsForLevel11 = new double [] {4, 5, 6, 5, 3, 2, 1};
+    double [] expectedOccupiedSlotsCountsForLevel21 = new double [] {2, 3, 4, 7, 3, 2, 1};
+    ResultTable gapFillResultTable1 = gapfillBrokerResponse1.getResultTable();
+    List<Object[]> gapFillRows1 = gapFillResultTable1.getRows();
+    Assert.assertEquals(gapFillRows1.size(), expectedOccupiedSlotsCountsForLevel11.length * 2);
+    start = dateTimeFormatter.fromFormatToMillis("454516");
+    for (int i = 0; i < expectedOccupiedSlotsCountsForLevel11.length * 2; i += 2) {
+      String firstTimeCol = ((Long) gapFillRows1.get(i)[0]).toString();
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      if ("Level_0".equals(gapFillRows1.get(i)[1])) {
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel11[i / 2], gapFillRows1.get(i)[2]);
+      } else {
+        Assert.assertEquals("Level_1", gapFillRows1.get(i)[1]);
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel21[i / 2], gapFillRows1.get(i)[2]);
+      }
+      firstTimeCol = ((Long) gapFillRows1.get(i + 1)[0]).toString();
+      timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      if ("Level_0".equals(gapFillRows1.get(i + 1)[1])) {
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel11[i / 2], gapFillRows1.get(i + 1)[2]);
+      } else {
+        Assert.assertEquals("Level_1", gapFillRows1.get(i + 1)[1]);
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel21[i / 2], gapFillRows1.get(i + 1)[2]);
+      }
+      start += dateTimeGranularity.granularityToMillis();
+    }
+  }
+
+  @Test
+  public void toEpochHoursGapfillTestAggregateAggregate() {
+    DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:HOURS:EPOCH");
+    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+    long start;
+
+    String gapfillQuery1 = "SELECT "
+        + "time_col, SUM(occupied) as occupied_slots_count, time_col "
+        + "FROM ("
+        + "  SELECT GapFill(time_col, '1:HOURS:EPOCH', "
+        + "    '454516',  '454524', '1:HOURS',"
+        + "     FILL(occupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)),"
+        + "     occupied, lotId, levelId"
+        + "  FROM ("
+        + "    SELECT ToEpochHours(eventTime) AS time_col,"
+        + "       lastWithTime(isOccupied, eventTime, 'INT') as occupied, lotId, levelId"
+        + "    FROM parkingData "
+        + "    WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "    GROUP BY time_col, levelId, lotId "
+        + "    LIMIT 200 "
+        + "  ) "
+        + "  LIMIT 200 "
+        + ") "
+        + " GROUP BY time_col "
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse1 = getBrokerResponseForSqlQuery(gapfillQuery1);
+
+    double [] expectedOccupiedSlotsCounts1 = new double [] {2, 4, 6, 8, 6, 4, 2, 0};
+    ResultTable gapFillResultTable1 = gapfillBrokerResponse1.getResultTable();
+    List<Object[]> gapFillRows1 = gapFillResultTable1.getRows();
+    Assert.assertEquals(gapFillRows1.size(), expectedOccupiedSlotsCounts1.length);
+    start = dateTimeFormatter.fromFormatToMillis("454516");
+    for (int i = 0; i < expectedOccupiedSlotsCounts1.length; i++) {
+      String firstTimeCol = ((Long) gapFillRows1.get(i)[0]).toString();
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      Assert.assertEquals(expectedOccupiedSlotsCounts1[i], gapFillRows1.get(i)[1]);
+      start += dateTimeGranularity.granularityToMillis();
+    }
+
+    String gapfillQuery2 = "SELECT "
+        + "time_col, SUM(occupied) as occupied_slots_count, time_col "
+        + "FROM ("
+        + "  SELECT GapFill(time_col, '1:HOURS:EPOCH', "
+        + "    '454516',  '454524', '1:HOURS',"
+        + "     FILL(occupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "     occupied, lotId, levelId"
+        + "  FROM ("
+        + "    SELECT ToEpochHours(eventTime) AS time_col,"
+        + "       lastWithTime(isOccupied, eventTime, 'INT') as occupied, lotId, levelId"
+        + "    FROM parkingData "
+        + "    WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "    GROUP BY time_col, levelId, lotId "
+        + "    LIMIT 200 "
+        + "  ) "
+        + "  LIMIT 200 "
+        + ") "
+        + " WHERE occupied = 1 "
+        + " GROUP BY time_col "
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse2 = getBrokerResponseForSqlQuery(gapfillQuery2);
+
+    double [] expectedOccupiedSlotsCounts2 = new double [] {2, 4, 6, 8, 6, 4, 2};
+    ResultTable gapFillResultTable2 = gapfillBrokerResponse2.getResultTable();
+    List<Object[]> gapFillRows2 = gapFillResultTable2.getRows();
+    Assert.assertEquals(gapFillRows2.size(), expectedOccupiedSlotsCounts2.length);
+    start = dateTimeFormatter.fromFormatToMillis("454516");
+    for (int i = 0; i < expectedOccupiedSlotsCounts2.length; i++) {
+      String firstTimeCol = ((Long) gapFillRows2.get(i)[0]).toString();
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      Assert.assertEquals(expectedOccupiedSlotsCounts2[i], gapFillRows2.get(i)[1]);
+      start += dateTimeGranularity.granularityToMillis();
+    }
+  }
+
+  @Test
+  public void toEpochHoursGapfillTestAggregateAggregateWithOptionalGroupBy() {
+    DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:HOURS:EPOCH");
+    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+    long start;
+
+    String gapfillQuery1 = "SELECT "
+        + "time_col, levelId, SUM(occupied) as occupied_slots_count, time_col "
+        + "FROM ("
+        + "  SELECT GapFill(time_col, '1:HOURS:EPOCH', "
+        + "    '454516',  '454524', '1:HOURS',"
+        + "     FILL(occupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "     occupied, lotId, levelId"
+        + " FROM ("
+        + "  SELECT ToEpochHours(eventTime) AS time_col,"
+        + "     lastWithTime(isOccupied, eventTime, 'INT') as occupied, lotId, levelId"
+        + "  FROM parkingData "
+        + "  WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "  GROUP BY time_col, levelId, lotId "
+        + "  LIMIT 200 "
+        + ") "
+        + "  LIMIT 200 "
+        + ") "
+        + " GROUP BY time_col, levelId "
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse1 = getBrokerResponseForSqlQuery(gapfillQuery1);
+
+    double [] expectedOccupiedSlotsCounts1 = new double [] {1, 2, 3, 4, 3, 2, 1, 0};
+    ResultTable gapFillResultTable1 = gapfillBrokerResponse1.getResultTable();
+    List<Object[]> gapFillRows1 = gapFillResultTable1.getRows();
+    Assert.assertEquals(gapFillRows1.size(), expectedOccupiedSlotsCounts1.length * 2);
+    start = dateTimeFormatter.fromFormatToMillis("454516");
+    for (int i = 0; i < expectedOccupiedSlotsCounts1.length * 2; i += 2) {
+      String firstTimeCol = ((Long) gapFillRows1.get(i)[0]).toString();
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      Assert.assertEquals(expectedOccupiedSlotsCounts1[i / 2], gapFillRows1.get(i)[2]);
+      firstTimeCol = ((Long) gapFillRows1.get(i + 1)[0]).toString();
+      timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      Assert.assertEquals(expectedOccupiedSlotsCounts1[i / 2], gapFillRows1.get(i + 1)[2]);
+      start += dateTimeGranularity.granularityToMillis();
+    }
+
+    String gapfillQuery2 = "SELECT "
+        + "time_col, SUM(occupied) as occupied_slots_count, time_col "
+        + "FROM ("
+        + "  SELECT GapFill(time_col, '1:HOURS:EPOCH', "
+        + "    '454516',  '454524', '1:HOURS',"
+        + "     FILL(occupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "     occupied, lotId, levelId"
+        + "  FROM ("
+        + "    SELECT ToEpochHours(eventTime) AS time_col,"
+        + "       lastWithTime(isOccupied, eventTime, 'INT') as occupied, lotId, levelId"
+        + "    FROM parkingData "
+        + "    WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "    GROUP BY time_col, levelId, lotId "
+        + "    LIMIT 200 "
+        + "  ) "
+        + "  LIMIT 200 "
+        + ") "
+        + " WHERE occupied = 1 "
+        + " GROUP BY time_col, levelId "
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse2 = getBrokerResponseForSqlQuery(gapfillQuery2);
+
+    double [] expectedOccupiedSlotsCounts2 = new double [] {1, 2, 3, 4, 3, 2, 1};
+    ResultTable gapFillResultTable2 = gapfillBrokerResponse2.getResultTable();
+    List<Object[]> gapFillRows2 = gapFillResultTable2.getRows();
+    Assert.assertEquals(gapFillRows2.size(), expectedOccupiedSlotsCounts2.length * 2);
+    start = dateTimeFormatter.fromFormatToMillis("454516");
+    for (int i = 0; i < expectedOccupiedSlotsCounts2.length * 2; i += 2) {
+      String firstTimeCol = ((Long) gapFillRows2.get(i)[0]).toString();
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      Assert.assertEquals(expectedOccupiedSlotsCounts2[i / 2], gapFillRows2.get(i)[1]);
+      firstTimeCol = ((Long) gapFillRows2.get(i + 1)[0]).toString();
+      timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      Assert.assertEquals(expectedOccupiedSlotsCounts2[i / 2], gapFillRows2.get(i + 1)[1]);
+      start += dateTimeGranularity.granularityToMillis();
+    }
+  }
+
+  @Test
+  public void toEpochHoursGapfillTestAggregateAggregateWithHavingClause() {
+    String gapfillQuery1 = "SELECT "
+        + "time_col, levelId, SUM(occupied) as occupied_slots_count, time_col "
+        + "FROM ("
+        + "  SELECT GapFill(time_col, '1:HOURS:EPOCH', "
+        + "    '454516',  '454524', '1:HOURS',"
+        + "     FILL(occupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "     occupied, lotId, levelId"
+        + "  FROM ("
+        + "    SELECT ToEpochHours(eventTime) AS time_col,"
+        + "       lastWithTime(isOccupied, eventTime, 'INT') as occupied, lotId, levelId"
+        + "    FROM parkingData "
+        + "    WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "    GROUP BY time_col, levelId, lotId "
+        + "    LIMIT 200 "
+        + ") "
+        + "  LIMIT 200 "
+        + ") "
+        + " GROUP BY time_col, levelId "
+        + " HAVING occupied_slots_count > 0"
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse1 = getBrokerResponseForSqlQuery(gapfillQuery1);
+
+    double [] expectedOccupiedSlotsCounts1 = new double [] {1, 2, 3, 4, 3, 2, 1};
+    ResultTable gapFillResultTable1 = gapfillBrokerResponse1.getResultTable();
+    List<Object[]> gapFillRows1 = gapFillResultTable1.getRows();
+    Assert.assertEquals(gapFillRows1.size(), expectedOccupiedSlotsCounts1.length * 2);
+    DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:HOURS:EPOCH");
+    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+
+    long start = dateTimeFormatter.fromFormatToMillis("454516");
+    for (int i = 0; i < expectedOccupiedSlotsCounts1.length * 2; i += 2) {
+      String firstTimeCol = ((Long) gapFillRows1.get(i)[0]).toString();
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      Assert.assertEquals(expectedOccupiedSlotsCounts1[i / 2], gapFillRows1.get(i)[2]);
+      firstTimeCol = ((Long) gapFillRows1.get(i + 1)[0]).toString();
+      timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      Assert.assertEquals(expectedOccupiedSlotsCounts1[i / 2], gapFillRows1.get(i + 1)[2]);
+      start += dateTimeGranularity.granularityToMillis();
+    }
+  }
+
+  @Test
+  public void toEpochMinutesRoundedHoursGapfillTestSelectSelect() {
+    DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:MINUTES:EPOCH");
+    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+    long start;
+
+    String gapfillQuery1 = "SELECT GapFill(ToEpochMinutesRounded(eventTime, 60), '1:MINUTES:EPOCH', "
+        + "    '27270960',  '27271440', '1:HOURS',"
+        + "     FILL(isOccupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "     isOccupied, lotId, levelId"
+        + "  FROM parkingData "
+        + "  WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "  LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse1 = getBrokerResponseForSqlQuery(gapfillQuery1);
+
+    int [][] expectedOccupiedSlotsCounts1 =
+        new int [][] {{6, 6}, {8, 4}, {10, 2}, {12, 0}, {6, 4}, {4, 6}, {2, 10}, {0, 10}};
+    ResultTable gapFillResultTable1 = gapfillBrokerResponse1.getResultTable();
+    List<Object[]> gapFillRows1 = gapFillResultTable1.getRows();
+    start = dateTimeFormatter.fromFormatToMillis("27270960");
+    int index = 0;
+    for (int i = 0; i < expectedOccupiedSlotsCounts1.length; i++) {
+      int ones = expectedOccupiedSlotsCounts1[i][0];
+      int zeros = expectedOccupiedSlotsCounts1[i][1];
+      int total = ones + zeros;
+      for (int k = 0; k < total; k++) {
+        String firstTimeCol = ((Long) (gapFillRows1.get(index)[0])).toString();
+        long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+        Assert.assertEquals(timeStamp, start);
+        if (gapFillRows1.get(index)[3].equals(1)) {
+          ones--;
+        } else {
+          zeros--;
+        }
+        index++;
+      }
+      Assert.assertEquals(ones, 0);
+      Assert.assertEquals(zeros, 0);
+      start += dateTimeGranularity.granularityToMillis();
+    }
+    Assert.assertEquals(gapFillRows1.size(), index);
+
+    String gapfillQuery2 = "SELECT "
+        + "time_col, levelId, lotId, isOccupied, time_col "
+        + "FROM ("
+        + "  SELECT  GapFill(ToEpochMinutesRounded(eventTime, 60), '1:MINUTES:EPOCH', "
+        + "    '27270960',  '27271440', '1:HOURS',"
+        + "     FILL(isOccupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "     isOccupied, lotId, levelId"
+        + "  FROM parkingData "
+        + "  WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "  LIMIT 200 "
+        + ") "
+        + " WHERE isOccupied = 1 "
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse2 = getBrokerResponseForSqlQuery(gapfillQuery2);
+
+    int [] expectedOccupiedSlotsCounts2 = new int [] {6, 8, 10, 12, 6, 4, 2, 0};
+    ResultTable gapFillResultTable2 = gapfillBrokerResponse2.getResultTable();
+    List<Object[]> gapFillRows2 = gapFillResultTable2.getRows();
+    start = dateTimeFormatter.fromFormatToMillis("27270960");
+    index = 0;
+    for (int i = 0; i < expectedOccupiedSlotsCounts2.length; i++) {
+      for (int k = 0; k < expectedOccupiedSlotsCounts2[i]; k++) {
+        String firstTimeCol = ((Long) gapFillRows2.get(index)[0]).toString();
+        long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+        Assert.assertEquals(timeStamp, start);
+        Assert.assertEquals(gapFillRows2.get(index)[3], 1);
+        index++;
+      }
+      start += dateTimeGranularity.granularityToMillis();
+    }
+    Assert.assertEquals(gapFillRows2.size(), index);
+  }
+
+  @Test
+  public void toEpochMinutesRoundedHoursGapfillTestAggregateSelect() {
+    DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:MINUTES:EPOCH");
+    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+    long start;
+
+    String gapfillQuery1 = "SELECT "
+        + "GapFill(time_col, '1:MINUTES:EPOCH', "
+        + "   '27270960',  '27271440', '1:HOURS',"
+        + "   FILL(occupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)), levelId, lotId, occupied "
+        + "FROM ("
+        + "  SELECT ToEpochMinutesRounded(eventTime, 60) AS time_col,"
+        + "     lastWithTime(isOccupied, eventTime, 'INT') as occupied, lotId, levelId"
+        + "  FROM parkingData "
+        + "  WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "  GROUP BY time_col, levelId, lotId "
+        + "  LIMIT 200 "
+        + ") "
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse1 = getBrokerResponseForSqlQuery(gapfillQuery1);
+
+    int [][] expectedOccupiedSlotsCounts1 =
+        new int [][] {{2, 6}, {4, 4}, {6, 2}, {8, 0}, {6, 2}, {4, 4}, {2, 6}, {0, 8}};
+    ResultTable gapFillResultTable1 = gapfillBrokerResponse1.getResultTable();
+    List<Object[]> gapFillRows1 = gapFillResultTable1.getRows();
+    start = dateTimeFormatter.fromFormatToMillis("27270960");
+    int index = 0;
+    for (int i = 0; i < expectedOccupiedSlotsCounts1.length; i++) {
+      int ones = expectedOccupiedSlotsCounts1[i][0];
+      int zeros = expectedOccupiedSlotsCounts1[i][1];
+      int total = ones + zeros;
+      for (int k = 0; k < total; k++) {
+        String firstTimeCol = ((Long) gapFillRows1.get(index)[0]).toString();
+        long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+        Assert.assertEquals(timeStamp, start);
+        if (gapFillRows1.get(index)[3].equals(1)) {
+          ones--;
+        } else {
+          zeros--;
+        }
+        index++;
+      }
+      Assert.assertEquals(ones, 0);
+      Assert.assertEquals(zeros, 0);
+      start += dateTimeGranularity.granularityToMillis();
+    }
+    Assert.assertEquals(gapFillRows1.size(), index);
+
+    String gapfillQuery2 = "SELECT "
+        + "GapFill(time_col, '1:MINUTES:EPOCH', "
+        + "   '27270960',  '27271440', '1:HOURS',"
+        + "   FILL(occupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)), levelId, lotId, occupied "
+        + "FROM ("
+        + "  SELECT  ToEpochMinutesRounded(eventTime, 60) AS time_col,"
+        + "     lastWithTime(isOccupied, eventTime, 'INT') as occupied, lotId, levelId"
+        + "  FROM parkingData "
+        + "  WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "  GROUP BY time_col, levelId, lotId "
+        + "  LIMIT 200 "
+        + ") "
+        + " WHERE occupied = 1 "
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse2 = getBrokerResponseForSqlQuery(gapfillQuery2);
+
+    int [] expectedOccupiedSlotsCounts2 = new int [] {2, 4, 6, 8, 6, 4, 2};
+    ResultTable gapFillResultTable2 = gapfillBrokerResponse2.getResultTable();
+    List<Object[]> gapFillRows2 = gapFillResultTable2.getRows();
+    start = dateTimeFormatter.fromFormatToMillis("27270960");
+    index = 0;
+    for (int i = 0; i < expectedOccupiedSlotsCounts2.length; i++) {
+      for (int k = 0; k < expectedOccupiedSlotsCounts2[i]; k++) {
+        String firstTimeCol = ((Long) gapFillRows2.get(index)[0]).toString();
+        long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+        Assert.assertEquals(timeStamp, start);
+        Assert.assertEquals(gapFillRows2.get(index)[3], 1);
+        index++;
+      }
+      start += dateTimeGranularity.granularityToMillis();
+    }
+    Assert.assertEquals(gapFillRows2.size(), index);
+  }
+
+  @Test
+  public void toEpochMinutesRoundedHoursGapfillTestGapfillAggregate() {
+    DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:MINUTES:EPOCH");
+    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+    long start;
+
+    String gapfillQuery1 = "SELECT "
+        + "time_col, SUM(isOccupied) as occupied_slots_count, time_col "
+        + "FROM ("
+        + "  SELECT  GapFill(ToEpochMinutesRounded(eventTime, 60), '1:MINUTES:EPOCH', "
+        + "    '27270960',  '27271440', '1:HOURS',"
+        + "     FILL(isOccupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "     isOccupied, lotId, levelId"
+        + "  FROM parkingData "
+        + "  WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "  LIMIT 200 "
+        + ") "
+        + " GROUP BY time_col "
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse1 = getBrokerResponseForSqlQuery(gapfillQuery1);
+
+    double [] expectedOccupiedSlotsCounts1 = new double [] {6, 8, 10, 12, 6, 4, 2, 0};
+    ResultTable gapFillResultTable1 = gapfillBrokerResponse1.getResultTable();
+    List<Object[]> gapFillRows1 = gapFillResultTable1.getRows();
+    Assert.assertEquals(gapFillRows1.size(), expectedOccupiedSlotsCounts1.length);
+    start = dateTimeFormatter.fromFormatToMillis("27270960");
+    for (int i = 0; i < expectedOccupiedSlotsCounts1.length; i++) {
+      String firstTimeCol = ((Long) gapFillRows1.get(i)[0]).toString();
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      Assert.assertEquals(expectedOccupiedSlotsCounts1[i], gapFillRows1.get(i)[1]);
+      start += dateTimeGranularity.granularityToMillis();
+    }
+
+    String gapfillQuery2 = "SELECT "
+        + "time_col, SUM(isOccupied) as occupied_slots_count, time_col "
+        + "FROM ("
+        + "  SELECT  GapFill(ToEpochMinutesRounded(eventTime, 60), '1:MINUTES:EPOCH', "
+        + "    '27270960',  '27271440', '1:HOURS',"
+        + "     FILL(isOccupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "     isOccupied, lotId, levelId"
+        + "  FROM parkingData "
+        + "  WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "  LIMIT 200 "
+        + ") "
+        + " WHERE isOccupied = 1 "
+        + " GROUP BY time_col "
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse2 = getBrokerResponseForSqlQuery(gapfillQuery2);
+
+    double [] expectedOccupiedSlotsCounts2 = new double [] {6, 8, 10, 12, 6, 4, 2};
+    ResultTable gapFillResultTable2 = gapfillBrokerResponse2.getResultTable();
+    List<Object[]> gapFillRows2 = gapFillResultTable2.getRows();
+    Assert.assertEquals(gapFillRows2.size(), expectedOccupiedSlotsCounts2.length);
+    start = dateTimeFormatter.fromFormatToMillis("27270960");
+    for (int i = 0; i < expectedOccupiedSlotsCounts2.length; i++) {
+      String firstTimeCol = ((Long) gapFillRows2.get(i)[0]).toString();
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      Assert.assertEquals(expectedOccupiedSlotsCounts2[i], gapFillRows2.get(i)[1]);
+      start += dateTimeGranularity.granularityToMillis();
+    }
+  }
+
+  @Test
+  public void toEpochMinutesRoundedHoursGapfillTestGapfillAggregateWithOptionalGroupBy() {
+    DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:MINUTES:EPOCH");
+    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+    long start;
+
+    String gapfillQuery1 = "SELECT "
+        + "time_col, levelId, SUM(isOccupied) as occupied_slots_count, time_col "
+        + "FROM ("
+        + "  SELECT  GapFill(ToEpochMinutesRounded(eventTime, 60), '1:MINUTES:EPOCH', "
+        + "    '27270960',  '27271440', '1:HOURS',"
+        + "     FILL(isOccupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "     isOccupied, lotId, levelId"
+        + "  FROM parkingData "
+        + "  WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "  LIMIT 200 "
+        + ") "
+        + " GROUP BY time_col, levelId "
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse1 = getBrokerResponseForSqlQuery(gapfillQuery1);
+
+    double [] expectedOccupiedSlotsCountsForLevel11 = new double [] {4, 5, 6, 5, 3, 2, 1, 0};
+    double [] expectedOccupiedSlotsCountsForLevel21 = new double [] {2, 3, 4, 7, 3, 2, 1, 0};
+    ResultTable gapFillResultTable1 = gapfillBrokerResponse1.getResultTable();
+    List<Object[]> gapFillRows1 = gapFillResultTable1.getRows();
+    Assert.assertEquals(gapFillRows1.size(), expectedOccupiedSlotsCountsForLevel11.length * 2);
+    start = dateTimeFormatter.fromFormatToMillis("27270960");
+    for (int i = 0; i < expectedOccupiedSlotsCountsForLevel11.length * 2; i += 2) {
+      String firstTimeCol = ((Long) gapFillRows1.get(i)[0]).toString();
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      if ("Level_0".equals(gapFillRows1.get(i)[1])) {
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel11[i / 2], gapFillRows1.get(i)[2]);
+      } else {
+        Assert.assertEquals("Level_1", gapFillRows1.get(i)[1]);
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel21[i / 2], gapFillRows1.get(i)[2]);
+      }
+      firstTimeCol = ((Long) gapFillRows1.get(i + 1)[0]).toString();
+      timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      if ("Level_0".equals(gapFillRows1.get(i + 1)[1])) {
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel11[i / 2], gapFillRows1.get(i + 1)[2]);
+      } else {
+        Assert.assertEquals("Level_1", gapFillRows1.get(i + 1)[1]);
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel21[i / 2], gapFillRows1.get(i + 1)[2]);
+      }
+      start += dateTimeGranularity.granularityToMillis();
+    }
+
+    String gapfillQuery2 = "SELECT "
+        + "time_col, levelId, SUM(isOccupied) as occupied_slots_count, time_col "
+        + "FROM ("
+        + "  SELECT  GapFill(ToEpochMinutesRounded(eventTime, 60), '1:MINUTES:EPOCH', "
+        + "    '27270960',  '27271440', '1:HOURS',"
+        + "     FILL(isOccupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "     isOccupied, lotId, levelId"
+        + "  FROM parkingData "
+        + "  WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "  LIMIT 200 "
+        + ") "
+        + " WHERE isOccupied = 1 "
+        + " GROUP BY time_col, levelId "
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse2 = getBrokerResponseForSqlQuery(gapfillQuery2);
+
+    double [] expectedOccupiedSlotsCountsForLevel12 = new double [] {4, 5, 6, 5, 3, 2, 1};
+    double [] expectedOccupiedSlotsCountsForLevel22 = new double [] {2, 3, 4, 7, 3, 2, 1};
+    ResultTable gapFillResultTable2 = gapfillBrokerResponse2.getResultTable();
+    List<Object[]> gapFillRows2 = gapFillResultTable2.getRows();
+    Assert.assertEquals(gapFillRows2.size(), expectedOccupiedSlotsCountsForLevel12.length * 2);
+    start = dateTimeFormatter.fromFormatToMillis("27270960");
+    for (int i = 0; i < expectedOccupiedSlotsCountsForLevel12.length * 2; i += 2) {
+      String firstTimeCol = ((Long) gapFillRows2.get(i)[0]).toString();
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      if ("Level_0".equals(gapFillRows2.get(i)[1])) {
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel12[i / 2], gapFillRows2.get(i)[2]);
+      } else {
+        Assert.assertEquals("Level_1", gapFillRows2.get(i)[1]);
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel22[i / 2], gapFillRows2.get(i)[2]);
+      }
+      firstTimeCol = ((Long) gapFillRows2.get(i + 1)[0]).toString();
+      timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      if ("Level_0".equals(gapFillRows2.get(i + 1)[1])) {
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel12[i / 2], gapFillRows2.get(i + 1)[2]);
+      } else {
+        Assert.assertEquals("Level_1", gapFillRows2.get(i + 1)[1]);
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel22[i / 2], gapFillRows2.get(i + 1)[2]);
+      }
+      start += dateTimeGranularity.granularityToMillis();
+    }
+  }
+
+  @Test
+  public void toEpochMinutesRoundedHoursGapfillTestGapfillAggregateWithHavingClause() {
+    DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:MINUTES:EPOCH");
+    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+    long start;
+
+    String gapfillQuery1 = "SELECT "
+        + "time_col, levelId, SUM(isOccupied) as occupied_slots_count, time_col "
+        + "FROM ("
+        + "  SELECT  GapFill(ToEpochMinutesRounded(eventTime, 60), '1:MINUTES:EPOCH', "
+        + "    '27270960',  '27271440', '1:HOURS',"
+        + "     FILL(isOccupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "     isOccupied, lotId, levelId"
+        + "  FROM parkingData "
+        + "  WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "  LIMIT 200 "
+        + ") "
+        + " GROUP BY time_col, levelId "
+        + " HAVING occupied_slots_count > 0"
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse1 = getBrokerResponseForSqlQuery(gapfillQuery1);
+
+    double [] expectedOccupiedSlotsCountsForLevel11 = new double [] {4, 5, 6, 5, 3, 2, 1};
+    double [] expectedOccupiedSlotsCountsForLevel21 = new double [] {2, 3, 4, 7, 3, 2, 1};
+    ResultTable gapFillResultTable1 = gapfillBrokerResponse1.getResultTable();
+    List<Object[]> gapFillRows1 = gapFillResultTable1.getRows();
+    Assert.assertEquals(gapFillRows1.size(), expectedOccupiedSlotsCountsForLevel11.length * 2);
+    start = dateTimeFormatter.fromFormatToMillis("27270960");
+    for (int i = 0; i < expectedOccupiedSlotsCountsForLevel11.length * 2; i += 2) {
+      String firstTimeCol = ((Long) gapFillRows1.get(i)[0]).toString();
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      if ("Level_0".equals(gapFillRows1.get(i)[1])) {
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel11[i / 2], gapFillRows1.get(i)[2]);
+      } else {
+        Assert.assertEquals("Level_1", gapFillRows1.get(i)[1]);
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel21[i / 2], gapFillRows1.get(i)[2]);
+      }
+      firstTimeCol = ((Long) gapFillRows1.get(i + 1)[0]).toString();
+      timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      if ("Level_0".equals(gapFillRows1.get(i + 1)[1])) {
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel11[i / 2], gapFillRows1.get(i + 1)[2]);
+      } else {
+        Assert.assertEquals("Level_1", gapFillRows1.get(i + 1)[1]);
+        Assert.assertEquals(expectedOccupiedSlotsCountsForLevel21[i / 2], gapFillRows1.get(i + 1)[2]);
+      }
+      start += dateTimeGranularity.granularityToMillis();
+    }
+  }
+
+  @Test
+  public void toEpochMinutesRoundedHoursGapfillTestAggregateAggregate() {
+    DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:MINUTES:EPOCH");
+    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+    long start;
+
+    String gapfillQuery1 = "SELECT "
+        + "time_col, SUM(occupied) as occupied_slots_count, time_col "
+        + "FROM ("
+        + "  SELECT  GapFill(time_col, '1:MINUTES:EPOCH', "
+        + "    '27270960',  '27271440', '1:HOURS',"
+        + "     FILL(occupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "     occupied, lotId, levelId"
+        + "  FROM ("
+        + "    SELECT ToEpochMinutesRounded(eventTime, 60) AS time_col,"
+        + "       lastWithTime(isOccupied, eventTime, 'INT') as occupied, lotId, levelId"
+        + "    FROM parkingData "
+        + "    WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "    GROUP BY time_col, levelId, lotId "
+        + "    LIMIT 200 "
+        + "  ) "
+        + "  LIMIT 200 "
+        + ") "
+        + " GROUP BY time_col "
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse1 = getBrokerResponseForSqlQuery(gapfillQuery1);
+
+    double [] expectedOccupiedSlotsCounts1 = new double [] {2, 4, 6, 8, 6, 4, 2, 0};
+    ResultTable gapFillResultTable1 = gapfillBrokerResponse1.getResultTable();
+    List<Object[]> gapFillRows1 = gapFillResultTable1.getRows();
+    Assert.assertEquals(gapFillRows1.size(), expectedOccupiedSlotsCounts1.length);
+    start = dateTimeFormatter.fromFormatToMillis("27270960");
+    for (int i = 0; i < expectedOccupiedSlotsCounts1.length; i++) {
+      String firstTimeCol = ((Long) gapFillRows1.get(i)[0]).toString();
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      Assert.assertEquals(expectedOccupiedSlotsCounts1[i], gapFillRows1.get(i)[1]);
+      start += dateTimeGranularity.granularityToMillis();
+    }
+
+    String gapfillQuery2 = "SELECT "
+        + "time_col, SUM(occupied) as occupied_slots_count, time_col "
+        + "FROM ("
+        + "  SELECT  GapFill(time_col, '1:MINUTES:EPOCH', "
+        + "    '27270960',  '27271440', '1:HOURS',"
+        + "     FILL(occupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "     occupied, lotId, levelId"
+        + "  FROM ("
+        + "    SELECT ToEpochMinutesRounded(eventTime, 60) time_col,"
+        + "       lastWithTime(isOccupied, eventTime, 'INT') as occupied, lotId, levelId"
+        + "    FROM parkingData "
+        + "    WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "    GROUP BY time_col, levelId, lotId "
+        + "    LIMIT 200 "
+        + "  ) "
+        + "  LIMIT 200 "
+        + ") "
+        + " WHERE occupied = 1 "
+        + " GROUP BY time_col "
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse2 = getBrokerResponseForSqlQuery(gapfillQuery2);
+
+    double [] expectedOccupiedSlotsCounts2 = new double [] {2, 4, 6, 8, 6, 4, 2};
+    ResultTable gapFillResultTable2 = gapfillBrokerResponse2.getResultTable();
+    List<Object[]> gapFillRows2 = gapFillResultTable2.getRows();
+    Assert.assertEquals(gapFillRows2.size(), expectedOccupiedSlotsCounts2.length);
+    start = dateTimeFormatter.fromFormatToMillis("27270960");
+    for (int i = 0; i < expectedOccupiedSlotsCounts2.length; i++) {
+      String firstTimeCol = ((Long) gapFillRows2.get(i)[0]).toString();
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      Assert.assertEquals(expectedOccupiedSlotsCounts2[i], gapFillRows2.get(i)[1]);
+      start += dateTimeGranularity.granularityToMillis();
+    }
+  }
+
+  @Test
+  public void toEpochMinutesRoundedHoursGapfillTestAggregateAggregateWithOptionalGroupBy() {
+    DateTimeFormatSpec dateTimeFormatter = new DateTimeFormatSpec("1:MINUTES:EPOCH");
+    DateTimeGranularitySpec dateTimeGranularity = new DateTimeGranularitySpec("1:HOURS");
+    long start;
+
+    String gapfillQuery1 = "SELECT "
+        + "time_col, levelId, SUM(occupied) as occupied_slots_count, time_col "
+        + "FROM ("
+        + "  SELECT  GapFill(time_col, '1:MINUTES:EPOCH', "
+        + "    '27270960',  '27271440', '1:HOURS',"
+        + "     FILL(occupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "     occupied, lotId, levelId"
+        + "  FROM ("
+        + "    SELECT ToEpochMinutesRounded(eventTime, 60) AS time_col,"
+        + "     lastWithTime(isOccupied, eventTime, 'INT') as occupied, lotId, levelId"
+        + "    FROM parkingData "
+        + "    WHERE eventTime >= 1636257600000 AND eventTime <= 1636286400000 "
+        + "    GROUP BY time_col, levelId, lotId "
+        + "    LIMIT 200 "
+        + "  ) "
+        + "  LIMIT 200 "
+        + ") "
+        + " GROUP BY time_col, levelId "
+        + " LIMIT 200 ";
+
+    BrokerResponseNative gapfillBrokerResponse1 = getBrokerResponseForSqlQuery(gapfillQuery1);
+
+    double [] expectedOccupiedSlotsCounts1 = new double [] {1, 2, 3, 4, 3, 2, 1, 0};
+    ResultTable gapFillResultTable1 = gapfillBrokerResponse1.getResultTable();
+    List<Object[]> gapFillRows1 = gapFillResultTable1.getRows();
+    Assert.assertEquals(gapFillRows1.size(), expectedOccupiedSlotsCounts1.length * 2);
+    start = dateTimeFormatter.fromFormatToMillis("27270960");
+    for (int i = 0; i < expectedOccupiedSlotsCounts1.length * 2; i += 2) {
+      String firstTimeCol = ((Long) gapFillRows1.get(i)[0]).toString();
+      long timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      Assert.assertEquals(expectedOccupiedSlotsCounts1[i / 2], gapFillRows1.get(i)[2]);
+      firstTimeCol = ((Long) gapFillRows1.get(i + 1)[0]).toString();
+      timeStamp = dateTimeFormatter.fromFormatToMillis(firstTimeCol);
+      Assert.assertEquals(timeStamp, start);
+      Assert.assertEquals(expectedOccupiedSlotsCounts1[i / 2], gapFillRows1.get(i + 1)[2]);
+      start += dateTimeGranularity.granularityToMillis();
+    }
+
+    String gapfillQuery2 = "SELECT "
+        + "time_col, SUM(occupied) as occupied_slots_count, time_col "
+        + "FROM ("
+        + "  SELECT  GapFill(time_col, '1:MINUTES:EPOCH', "
+        + "    '27270960',  '27271440', '1:HOURS',"
+        + "     FILL(occupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
+        + "     occupied, lotId, levelId"
+        + "  FROM ("

Review comment:
       Nice, so we can take arbitrary levels of nested queries?
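
       For reference, the shape in question (condensed from the `toEpochHoursGapfillTestAggregateAggregate` case above) nests three levels: an inner aggregation, a GapFill projection over it, and an outer aggregation on top. This is only a condensed restatement of the test query, not new functionality:

```java
// Condensed from the test above; aggregate -> GapFill -> aggregate, three query levels.
String nestedGapfillQuery = "SELECT time_col, SUM(occupied) AS occupied_slots_count FROM ("
    + "  SELECT GapFill(time_col, '1:HOURS:EPOCH', '454516', '454524', '1:HOURS',"
    + "    FILL(occupied, 'FILL_PREVIOUS_VALUE'), TIMESERIESON(levelId, lotId)) AS time_col,"
    + "    occupied, lotId, levelId"
    + "  FROM ("
    + "    SELECT ToEpochHours(eventTime) AS time_col,"
    + "      lastWithTime(isOccupied, eventTime, 'INT') AS occupied, lotId, levelId"
    + "    FROM parkingData GROUP BY time_col, levelId, lotId LIMIT 200"
    + "  ) LIMIT 200"
    + ") GROUP BY time_col LIMIT 200";
```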

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/plan/GapfillSelectionPlanNode.java
##########
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.plan;
+
+import com.google.common.base.Preconditions;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.blocks.IntermediateResultsBlock;
+import org.apache.pinot.core.operator.query.SelectionOnlyOperator;
+import org.apache.pinot.core.operator.transform.TransformOperator;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.core.util.GapfillUtils;
+import org.apache.pinot.segment.spi.IndexSegment;
+
+
+/**
+ * The <code>GapfillSelectionPlanNode</code> class provides the execution
+ * plan for a pre-aggregate gapfill query on a single segment.
+ */
+public class GapfillSelectionPlanNode implements PlanNode {
+  private final IndexSegment _indexSegment;
+  private final QueryContext _queryContext;
+
+  public GapfillSelectionPlanNode(IndexSegment indexSegment, QueryContext queryContext) {
+    _indexSegment = indexSegment;
+    _queryContext = queryContext;
+  }
+
+  @Override
+  public Operator<IntermediateResultsBlock> run() {
+    int limit = _queryContext.getLimit();
+
+    QueryContext queryContext = getSelectQueryContext();
+    Preconditions.checkArgument(queryContext.getOrderByExpressions() == null,
+        "The gapfill query should not have orderby expression.");

Review comment:
       Why do we have this limitation?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/filter/RowMatcher.java
##########
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce.filter;
+
+import org.apache.pinot.common.request.context.FilterContext;
+
+
+/**
+ * Filter matcher for the rows.
+ */
+public interface RowMatcher {
+  /**
+   * Returns {@code true} if the given row matches the filter, {@code false} otherwise.
+   */
+  boolean isMatch(Object[] row);
+
+  /**
+   * Helper method to construct a RowMatcher based on the given filter.
+   */
+  public static RowMatcher getRowMatcher(FilterContext filter, ValueExtractorFactory valueExtractorFactory) {

Review comment:
       Let's add a `RowMatcherFactory` and move this method there
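
       A minimal sketch of that refactor, assuming the factory lives in the same `org.apache.pinot.core.query.reduce.filter` package and simply hosts the construction logic (delegation is shown only so the sketch stays compilable against the diff above; the real change would move the method body itself):

```java
package org.apache.pinot.core.query.reduce.filter;

import org.apache.pinot.common.request.context.FilterContext;

/**
 * Factory for RowMatcher instances, keeping construction logic out of the RowMatcher interface.
 */
public final class RowMatcherFactory {
  private RowMatcherFactory() {
  }

  /**
   * Constructs a RowMatcher based on the given filter. In the actual refactor the body of
   * RowMatcher.getRowMatcher() would move here verbatim; delegating is only for illustration.
   */
  public static RowMatcher getRowMatcher(FilterContext filter, ValueExtractorFactory valueExtractorFactory) {
    return RowMatcher.getRowMatcher(filter, valueExtractorFactory);
  }
}
```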

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
##########
@@ -65,10 +66,15 @@ protected IndexedTable(DataSchema dataSchema, QueryContext queryContext, int res
     _lookupMap = lookupMap;
     _resultSize = resultSize;
 
-    List<ExpressionContext> groupByExpressions = queryContext.getGroupByExpressions();
+    List<ExpressionContext> groupByExpressions;
+    if (queryContext.getGapfillType() != GapfillUtils.GapfillType.NONE) {

Review comment:
       Suggest using `null` to represent no gap-fill
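
       Sketched against the hunk above, assuming `getGapfillType()` would then return `null` for non-gapfill queries, so the enum no longer needs a `NONE` member:

```java
// Fragment mirroring the hunk above; null from getGapfillType() means "no gap-fill".
List<ExpressionContext> groupByExpressions;
if (queryContext.getGapfillType() != null) {
  groupByExpressions = GapfillUtils.getGroupByExpressions(queryContext);
} else {
  groupByExpressions = queryContext.getGroupByExpressions();
}
```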

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/data/table/IndexedTable.java
##########
@@ -65,10 +66,15 @@ protected IndexedTable(DataSchema dataSchema, QueryContext queryContext, int res
     _lookupMap = lookupMap;
     _resultSize = resultSize;
 
-    List<ExpressionContext> groupByExpressions = queryContext.getGroupByExpressions();
+    List<ExpressionContext> groupByExpressions;
+    if (queryContext.getGapfillType() != GapfillUtils.GapfillType.NONE) {
+      groupByExpressions = GapfillUtils.getGroupByExpressions(queryContext);
+    } else {
+      groupByExpressions = queryContext.getGroupByExpressions();
+    }
+    _aggregationFunctions = queryContext.getAggregationFunctions();

Review comment:
       (minor) Don't move this; we want to keep the logic related to `groupByExpressions` together

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PreAggregateGapfillFilterHandler.java
##########
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FilterContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.reduce.filter.ColumnValueExtractor;
+import org.apache.pinot.core.query.reduce.filter.LiteralValueExtractor;
+import org.apache.pinot.core.query.reduce.filter.RowMatcher;
+import org.apache.pinot.core.query.reduce.filter.ValueExtractor;
+import org.apache.pinot.core.query.reduce.filter.ValueExtractorFactory;
+import org.apache.pinot.core.util.GapfillUtils;
+
+/**
+ * Handler for Filter clause of PreAggregateGapFill.
+ */
+public class PreAggregateGapfillFilterHandler implements ValueExtractorFactory {
+  private final RowMatcher _rowMatcher;
+  private final DataSchema _dataSchema;
+  private final Map<String, Integer> _indexes;
+
+  public PreAggregateGapfillFilterHandler(FilterContext filter, DataSchema dataSchema) {
+    _dataSchema = dataSchema;
+    _indexes = new HashMap<>();
+    for (int i = 0; i < _dataSchema.size(); i++) {
+      _indexes.put(_dataSchema.getColumnName(i), i);
+    }
+    _rowMatcher = RowMatcher.getRowMatcher(filter, this);
+  }
+
+  /**
+   * Returns {@code true} if the given row matches the HAVING clause, {@code false} otherwise.
+   */
+  public boolean isMatch(Object[] row) {
+    return _rowMatcher.isMatch(row);
+  }
+
+  /**
+   * Returns a ValueExtractor based on the given expression.
+   */
+  @Override
+  public ValueExtractor getValueExtractor(ExpressionContext expression) {
+    expression = GapfillUtils.stripGapfill(expression);
+    if (expression.getType() == ExpressionContext.Type.LITERAL) {
+      // Literal
+      return new LiteralValueExtractor(expression.getLiteral());
+    }
+
+    if (expression.getType() == ExpressionContext.Type.IDENTIFIER) {
+      return new ColumnValueExtractor(_indexes.get(expression.getIdentifier()), _dataSchema);
+    } else {
+      return new ColumnValueExtractor(_indexes.get(expression.getFunction().toString()), _dataSchema);

Review comment:
       This does not always work for aggregation functions because the column name in the data schema might not match `FunctionContext.toString()`. You may refer to `PostAggregationHandler`, which does not rely on the column name to find the index.
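
       One possible shape of that fix, sketched with illustrative names only (the accessor for the subquery's select expressions is an assumption, not the actual `PostAggregationHandler` code):

```java
// Illustrative fragment: key the lookup by ExpressionContext instead of the rendered
// column name, so aggregation columns resolve even when the data-schema column name
// does not match FunctionContext.toString().
Map<ExpressionContext, Integer> expressionIndexMap = new HashMap<>();
List<ExpressionContext> selectExpressions = subqueryContext.getSelectExpressions(); // assumed accessor
for (int i = 0; i < selectExpressions.size(); i++) {
  expressionIndexMap.put(GapfillUtils.stripGapfill(selectExpressions.get(i)), i);
}

// Later, in getValueExtractor():
Integer index = expressionIndexMap.get(expression);
return new ColumnValueExtractor(index, _dataSchema);
```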

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/request/context/QueryContext.java
##########
@@ -85,6 +86,7 @@
   // Keep the BrokerRequest to make incremental changes
   // TODO: Remove it once the whole query engine is using the QueryContext
   private final BrokerRequest _brokerRequest;
+  private QueryContext _subqueryContext;

Review comment:
       Shall we add `_subqueryContext` and `_gapfillType` as final fields, and set their values in the builder similar to the other fields?
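
       A hedged sketch of that change (the builder field and setter names below are assumptions, mirroring how the existing builder-set fields work):

```java
// On QueryContext: both fields become final and are assigned once in the private constructor.
private final QueryContext _subqueryContext;
private final GapfillUtils.GapfillType _gapfillType;

// Inside QueryContext.Builder (hypothetical additions):
private QueryContext _subqueryContext;
private GapfillUtils.GapfillType _gapfillType;

public Builder setSubqueryContext(QueryContext subqueryContext) {
  _subqueryContext = subqueryContext;
  return this;
}

public Builder setGapfillType(GapfillUtils.GapfillType gapfillType) {
  _gapfillType = gapfillType;
  return this;
}
// build() would then pass both values into the private QueryContext constructor.
```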

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
##########
@@ -161,8 +162,18 @@ public BaseCombineOperator run() {
       // Streaming query (only support selection only)
       return new StreamingSelectionOnlyCombineOperator(operators, _queryContext, _executorService, _streamObserver);
     }
+    GapfillUtils.GapfillType gapfillType = _queryContext.getGapfillType();
     if (QueryContextUtils.isAggregationQuery(_queryContext)) {
-      if (_queryContext.getGroupByExpressions() == null) {
+      if (gapfillType == GapfillUtils.GapfillType.AGGREGATE_GAP_FILL_AGGREGATE) {
+        _queryContext.getSubQueryContext().getSubQueryContext().setEndTimeMs(_queryContext.getEndTimeMs());
+        return new GroupByOrderByCombineOperator(

Review comment:
       (code format) Can you please apply the latest [Pinot Style](https://docs.pinot.apache.org/developers/developers-and-contributors/code-setup#intellij) and reformat the changed files?

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/plan/CombinePlanNode.java
##########
@@ -180,6 +191,8 @@ public BaseCombineOperator run() {
         // Selection order-by
         return new SelectionOrderByCombineOperator(operators, _queryContext, _executorService);
       }
+    } else if (gapfillType != GapfillUtils.GapfillType.NONE) {
+        return new SelectionOnlyCombineOperator(operators, _queryContext, _executorService);

Review comment:
       Is this correct? Should we construct the operator based on whether the subquery has order-by?
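
       A hedged sketch of the alternative this question points at, using only the constructors already shown in this diff (the null handling and the exact context to inspect are assumptions):

```java
// Fragment: pick the combine operator from the order-by of the query that carries the
// selection, instead of always using selection-only for gap-fill queries.
} else if (gapfillType != GapfillUtils.GapfillType.NONE) {
  QueryContext selectionContext =
      _queryContext.getSubQueryContext() != null ? _queryContext.getSubQueryContext() : _queryContext;
  if (selectionContext.getOrderByExpressions() == null) {
    return new SelectionOnlyCombineOperator(operators, _queryContext, _executorService);
  }
  return new SelectionOrderByCombineOperator(operators, _queryContext, _executorService);
}
```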

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PreAggregateGapfillFilterHandler.java
##########
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FilterContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.reduce.filter.ColumnValueExtractor;
+import org.apache.pinot.core.query.reduce.filter.LiteralValueExtractor;
+import org.apache.pinot.core.query.reduce.filter.RowMatcher;
+import org.apache.pinot.core.query.reduce.filter.ValueExtractor;
+import org.apache.pinot.core.query.reduce.filter.ValueExtractorFactory;
+import org.apache.pinot.core.util.GapfillUtils;
+
+/**
+ * Handler for Filter clause of PreAggregateGapFill.
+ */
+public class PreAggregateGapfillFilterHandler implements ValueExtractorFactory {

Review comment:
       Since we are going to deprecate `PostAggregateGapfill` and only keep this, let's rename it to `GapfillFilterHandler`? Same for other classes with `PreAggregate`

##########
File path: pinot-core/src/main/java/org/apache/pinot/core/query/reduce/PreAggregationGapFillDataTableReducer.java
##########
@@ -0,0 +1,775 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.reduce;
+
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import org.apache.pinot.common.exception.QueryException;
+import org.apache.pinot.common.metrics.BrokerMetrics;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.FunctionContext;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.response.broker.QueryProcessingException;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.common.utils.DataSchema.ColumnDataType;
+import org.apache.pinot.common.utils.DataTable;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.data.table.ConcurrentIndexedTable;
+import org.apache.pinot.core.data.table.IndexedTable;
+import org.apache.pinot.core.data.table.Key;
+import org.apache.pinot.core.data.table.Record;
+import org.apache.pinot.core.data.table.SimpleIndexedTable;
+import org.apache.pinot.core.data.table.UnboundedConcurrentIndexedTable;
+import org.apache.pinot.core.operator.combine.GroupByOrderByCombineOperator;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunctionFactory;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.request.context.QueryContext;
+import org.apache.pinot.core.query.selection.SelectionOperatorUtils;
+import org.apache.pinot.core.transport.ServerRoutingInstance;
+import org.apache.pinot.core.util.GapfillUtils;
+import org.apache.pinot.core.util.GroupByUtils;
+import org.apache.pinot.core.util.trace.TraceRunnable;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
+
+/**
+ * Helper class to reduce data tables, gap-fill the results, and set them into the BrokerResponseNative
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class PreAggregationGapFillDataTableReducer implements DataTableReducer {
+  private static final int MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE = 2; // TBD, find a better value.
+
+  private final QueryContext _queryContext;
+
+  private final int _limitForAggregatedResult;
+  private int _limitForGapfilledResult;
+
+  private final DateTimeGranularitySpec _dateTimeGranularity;
+  private final DateTimeFormatSpec _dateTimeFormatter;
+  private final long _startMs;
+  private final long _endMs;
+  private final long _timeBucketSize;
+
+  private final List<Integer> _groupByKeyIndexes;
+  private boolean [] _isGroupBySelections;
+  private final Set<Key> _groupByKeys;
+  private final Map<Key, Object[]> _previousByGroupKey;
+  private final Map<String, ExpressionContext> _fillExpressions;
+  private final List<ExpressionContext> _timeSeries;
+  private final GapfillUtils.GapfillType _gapfillType;
+
+  PreAggregationGapFillDataTableReducer(QueryContext queryContext) {
+    _queryContext = queryContext;
+    _gapfillType = queryContext.getGapfillType();
+    _limitForAggregatedResult = queryContext.getLimit();
+    if (_gapfillType == GapfillUtils.GapfillType.AGGREGATE_GAP_FILL
+        || _gapfillType == GapfillUtils.GapfillType.GAP_FILL) {
+      _limitForGapfilledResult = queryContext.getLimit();
+    } else {
+      _limitForGapfilledResult = queryContext.getSubQueryContext().getLimit();
+    }
+
+    ExpressionContext gapFillSelection = GapfillUtils.getGapfillExpressionContext(queryContext);
+
+    Preconditions.checkArgument(
+        gapFillSelection != null && gapFillSelection.getFunction() != null, "Gapfill expression should be a function.");
+    List<ExpressionContext> args = gapFillSelection.getFunction().getArguments();
+    Preconditions.checkArgument(
+        args.size() > 5, "PreAggregateGapFill does not have the correct number of arguments.");
+    Preconditions.checkArgument(
+        args.get(1).getLiteral() != null, "The second argument of PreAggregateGapFill should be the time format.");
+    Preconditions.checkArgument(
+        args.get(2).getLiteral() != null, "The third argument of PreAggregateGapFill should be the start time.");
+    Preconditions.checkArgument(
+        args.get(3).getLiteral() != null, "The fourth argument of PreAggregateGapFill should be the end time.");
+    Preconditions.checkArgument(
+        args.get(4).getLiteral() != null, "The fifth argument of PreAggregateGapFill should be the time bucket size.");
+
+    _dateTimeFormatter = new DateTimeFormatSpec(args.get(1).getLiteral());
+    _dateTimeGranularity = new DateTimeGranularitySpec(args.get(4).getLiteral());
+    String start = args.get(2).getLiteral();
+    _startMs = truncate(_dateTimeFormatter.fromFormatToMillis(start));
+    String end = args.get(3).getLiteral();
+    _endMs = truncate(_dateTimeFormatter.fromFormatToMillis(end));
+    _timeBucketSize = _dateTimeGranularity.granularityToMillis();
+
+    _fillExpressions = GapfillUtils.getFillExpressions(gapFillSelection);
+
+    _previousByGroupKey = new HashMap<>();
+    _groupByKeyIndexes = new ArrayList<>();
+    _groupByKeys = new HashSet<>();
+
+    ExpressionContext timeseriesOn = GapfillUtils.getTimeSeriesOnExpressionContext(gapFillSelection);
+    Preconditions.checkArgument(timeseriesOn != null, "The TimeSeriesOn expression should be specified.");
+    _timeSeries = timeseriesOn.getFunction().getArguments();
+  }
+
+  private void replaceColumnNameWithAlias(DataSchema dataSchema) {
+    QueryContext queryContext;
+    if (_gapfillType == GapfillUtils.GapfillType.AGGREGATE_GAP_FILL_AGGREGATE) {
+      queryContext = _queryContext.getSubQueryContext().getSubQueryContext();
+    } else if (_gapfillType == GapfillUtils.GapfillType.GAP_FILL) {
+      queryContext = _queryContext;
+    } else {
+      queryContext = _queryContext.getSubQueryContext();
+    }
+    List<String> aliasList = queryContext.getAliasList();
+    Map<String, String> columnNameToAliasMap = new HashMap<>();
+    for (int i = 0; i < aliasList.size(); i++) {
+      if (aliasList.get(i) != null) {
+        ExpressionContext selection = queryContext.getSelectExpressions().get(i);
+        if (GapfillUtils.isGapfill(selection)) {
+          selection = selection.getFunction().getArguments().get(0);
+        }
+        columnNameToAliasMap.put(selection.toString(), aliasList.get(i));
+      }
+    }
+    for (int i = 0; i < dataSchema.getColumnNames().length; i++) {
+      if (columnNameToAliasMap.containsKey(dataSchema.getColumnNames()[i])) {
+        dataSchema.getColumnNames()[i] = columnNameToAliasMap.get(dataSchema.getColumnNames()[i]);
+      }
+    }
+  }
+
+  /**
+   * Computes the number of reduce threads to use per query.
+   * <ul>
+   *   <li> Use single thread if number of data tables to reduce is less than
+   *   {@value #MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE}.</li>
+   *   <li> Else, use min of max allowed reduce threads per query, and number of data tables.</li>
+   * </ul>
+   *
+   * @param numDataTables Number of data tables to reduce
+   * @param maxReduceThreadsPerQuery Max allowed reduce threads per query
+   * @return Number of reduce threads to use for the query
+   */
+  private int getNumReduceThreadsToUse(int numDataTables, int maxReduceThreadsPerQuery) {
+    // Use single thread if number of data tables < MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE.
+    if (numDataTables < MIN_DATA_TABLES_FOR_CONCURRENT_REDUCE) {
+      return Math.min(1, numDataTables); // Number of data tables can be zero.
+    }
+
+    return Math.min(maxReduceThreadsPerQuery, numDataTables);
+  }
+
+  private IndexedTable getIndexedTable(DataSchema dataSchema, Collection<DataTable> dataTablesToReduce,
+      DataTableReducerContext reducerContext)
+      throws TimeoutException {
+    long start = System.currentTimeMillis();
+    int numDataTables = dataTablesToReduce.size();
+
+    // Get the number of threads to use for reducing.
+    // In case of single reduce thread, fall back to SimpleIndexedTable to avoid redundant locking/unlocking calls.
+    int numReduceThreadsToUse = getNumReduceThreadsToUse(numDataTables, reducerContext.getMaxReduceThreadsPerQuery());
+    int limit = _queryContext.getLimit();
+    // TODO: Make minTrimSize configurable
+    int trimSize = GroupByUtils.getTableCapacity(limit);
+    // NOTE: For query with HAVING clause, use trimSize as resultSize to ensure the result accuracy.
+    // TODO: Resolve the HAVING clause within the IndexedTable before returning the result
+    int resultSize = _queryContext.getHavingFilter() != null ? trimSize : limit;
+    int trimThreshold = reducerContext.getGroupByTrimThreshold();
+    IndexedTable indexedTable;
+    if (numReduceThreadsToUse <= 1) {
+      indexedTable = new SimpleIndexedTable(dataSchema, _queryContext, resultSize, trimSize, trimThreshold);
+    } else {
+      if (trimThreshold >= GroupByOrderByCombineOperator.MAX_TRIM_THRESHOLD) {
+        // special case of trim threshold where it is set to max value.
+        // there won't be any trimming during upsert in this case.
+        // thus we can avoid the overhead of read-lock and write-lock
+        // in the upsert method.
+        indexedTable = new UnboundedConcurrentIndexedTable(dataSchema, _queryContext, resultSize);
+      } else {
+        indexedTable = new ConcurrentIndexedTable(dataSchema, _queryContext, resultSize, trimSize, trimThreshold);
+      }
+    }
+
+    Future[] futures = new Future[numDataTables];
+    CountDownLatch countDownLatch = new CountDownLatch(numDataTables);
+
+    // Create groups of data tables that each thread can process concurrently.
+    // Given that numReduceThreads is <= numDataTables, each group will have at least one data table.
+    ArrayList<DataTable> dataTables = new ArrayList<>(dataTablesToReduce);
+    List<List<DataTable>> reduceGroups = new ArrayList<>(numReduceThreadsToUse);
+
+    for (int i = 0; i < numReduceThreadsToUse; i++) {
+      reduceGroups.add(new ArrayList<>());
+    }
+    for (int i = 0; i < numDataTables; i++) {
+      reduceGroups.get(i % numReduceThreadsToUse).add(dataTables.get(i));
+    }
+
+    int cnt = 0;
+    ColumnDataType[] storedColumnDataTypes = dataSchema.getStoredColumnDataTypes();
+    int numColumns = storedColumnDataTypes.length;
+    for (List<DataTable> reduceGroup : reduceGroups) {
+      futures[cnt++] = reducerContext.getExecutorService().submit(new TraceRunnable() {
+        @Override
+        public void runJob() {
+          for (DataTable dataTable : reduceGroup) {
+            int numRows = dataTable.getNumberOfRows();
+
+            try {
+              for (int rowId = 0; rowId < numRows; rowId++) {
+                Object[] values = new Object[numColumns];
+                for (int colId = 0; colId < numColumns; colId++) {
+                  switch (storedColumnDataTypes[colId]) {
+                    case INT:
+                      values[colId] = dataTable.getInt(rowId, colId);
+                      break;
+                    case LONG:
+                      values[colId] = dataTable.getLong(rowId, colId);
+                      break;
+                    case FLOAT:
+                      values[colId] = dataTable.getFloat(rowId, colId);
+                      break;
+                    case DOUBLE:
+                      values[colId] = dataTable.getDouble(rowId, colId);
+                      break;
+                    case STRING:
+                      values[colId] = dataTable.getString(rowId, colId);
+                      break;
+                    case BYTES:
+                      values[colId] = dataTable.getBytes(rowId, colId);
+                      break;
+                    case OBJECT:
+                      values[colId] = dataTable.getObject(rowId, colId);
+                      break;
+                    // Add other aggregation intermediate result / group-by column type supports here
+                    default:
+                      throw new IllegalStateException();
+                  }
+                }
+                indexedTable.upsert(new Record(values));
+              }
+            } finally {
+              countDownLatch.countDown();
+            }
+          }
+        }
+      });
+    }
+
+    try {
+      long timeOutMs = reducerContext.getReduceTimeOutMs() - (System.currentTimeMillis() - start);
+      countDownLatch.await(timeOutMs, TimeUnit.MILLISECONDS);
+    } catch (InterruptedException e) {
+      for (Future future : futures) {
+        if (!future.isDone()) {
+          future.cancel(true);
+        }
+      }
+      throw new TimeoutException("Timed out in broker reduce phase.");
+    }
+
+    indexedTable.finish(true);
+    return indexedTable;
+  }
+
+  /**
+   * Three things happen here:
+   * 1. Sort the result sets from all Pinot servers by timestamp.
+   * 2. Gap-fill the data for missing entities per time bucket.
+   * 3. Aggregate the data per time bucket.
+   */
+  @Override
+  public void reduceAndSetResults(String tableName, DataSchema dataSchema,
+      Map<ServerRoutingInstance, DataTable> dataTableMap, BrokerResponseNative brokerResponseNative,
+      DataTableReducerContext reducerContext, BrokerMetrics brokerMetrics) {
+    DataSchema resultTableSchema = getResultTableDataSchema(dataSchema);
+    if (dataTableMap.isEmpty()) {
+      brokerResponseNative.setResultTable(new ResultTable(resultTableSchema, Collections.emptyList()));
+      return;
+    }
+
+    String[] columns = dataSchema.getColumnNames();
+
+    Map<String, Integer> indexes = new HashMap<>();
+    for (int i = 0; i < columns.length; i++) {
+      indexes.put(columns[i], i);
+    }
+
+    _isGroupBySelections = new boolean[dataSchema.getColumnDataTypes().length];
+
+    // The first argument of timeSeries is the time column. The remaining arguments define the entity.
+    for (ExpressionContext entityColumn : _timeSeries) {
+      int index = indexes.get(entityColumn.getIdentifier());
+      _isGroupBySelections[index] = true;
+      _groupByKeyIndexes.add(index);
+    }
+
+    List<Object[]> sortedRawRows;
+    if (_gapfillType == GapfillUtils.GapfillType.GAP_FILL_AGGREGATE
+        || _gapfillType == GapfillUtils.GapfillType.GAP_FILL
+        || _gapfillType == GapfillUtils.GapfillType.GAP_FILL_SELECT) {
+      sortedRawRows = mergeAndSort(dataTableMap.values(), dataSchema);
+    } else {
+      try {
+        IndexedTable indexedTable = getIndexedTable(dataSchema, dataTableMap.values(), reducerContext);
+        sortedRawRows = mergeAndSort(indexedTable, dataSchema);
+      } catch (TimeoutException e) {
+        brokerResponseNative.getProcessingExceptions()
+            .add(new QueryProcessingException(QueryException.BROKER_TIMEOUT_ERROR_CODE, e.getMessage()));
+        return;
+      }
+    }
+    List<Object[]> resultRows;
+    replaceColumnNameWithAlias(dataSchema);
+    if (_queryContext.getAggregationFunctions() != null) {
+      validateGroupByForOuterQuery();
+    }
+
+    if (_gapfillType == GapfillUtils.GapfillType.GAP_FILL_AGGREGATE
+        || _gapfillType == GapfillUtils.GapfillType.GAP_FILL
+        || _gapfillType == GapfillUtils.GapfillType.GAP_FILL_SELECT) {
+      List<Object[]> gapfilledRows = gapFillAndAggregate(sortedRawRows, resultTableSchema, dataSchema);
+      if (_gapfillType == GapfillUtils.GapfillType.GAP_FILL_SELECT) {
+        List<String> selectionColumns = SelectionOperatorUtils.getSelectionColumns(_queryContext, dataSchema);
+        resultRows = new ArrayList<>(gapfilledRows.size());
+
+        Map<String, Integer> columnNameToIndexMap = new HashMap<>(dataSchema.getColumnNames().length);
+        String[] columnNames = dataSchema.getColumnNames();
+        for (int i = 0; i < columnNames.length; i++) {
+          columnNameToIndexMap.put(columnNames[i], i);
+        }
+
+        ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+        ColumnDataType[] resultColumnDataTypes = new ColumnDataType[selectionColumns.size()];
+        for (int i = 0; i < resultColumnDataTypes.length; i++) {
+          String name = selectionColumns.get(i);
+          int index = columnNameToIndexMap.get(name);
+          resultColumnDataTypes[i] = columnDataTypes[index];
+        }
+
+        for (Object[] row : gapfilledRows) {
+          Object[] resultRow = new Object[selectionColumns.size()];
+          for (int i = 0; i < selectionColumns.size(); i++) {
+            int index = columnNameToIndexMap.get(selectionColumns.get(i));
+            resultRow[i] = resultColumnDataTypes[i].convertAndFormat(row[index]);
+          }
+          resultRows.add(resultRow);
+        }
+      } else {
+        resultRows = gapfilledRows;
+      }
+    } else {
+      this.setupColumnTypeForAggregatedColumn(dataSchema.getColumnDataTypes());
+      ColumnDataType[] columnDataTypes = dataSchema.getColumnDataTypes();
+      for (Object[] row : sortedRawRows) {
+        extractFinalAggregationResults(row);
+        for (int i = 0; i < columnDataTypes.length; i++) {
+          row[i] = columnDataTypes[i].convert(row[i]);
+        }
+      }
+      resultRows = gapFillAndAggregate(sortedRawRows, resultTableSchema, dataSchema);
+    }
+    brokerResponseNative.setResultTable(new ResultTable(resultTableSchema, resultRows));
+  }
+
+  private void extractFinalAggregationResults(Object[] row) {
+    AggregationFunction[] aggregationFunctions;
+    if (_gapfillType == GapfillUtils.GapfillType.AGGREGATE_GAP_FILL) {
+      aggregationFunctions = _queryContext.getSubQueryContext().getAggregationFunctions();
+    } else {
+      aggregationFunctions = _queryContext.getSubQueryContext().getSubQueryContext().getAggregationFunctions();
+    }
+    int numAggregationFunctionsForInnerQuery = aggregationFunctions == null ? 0 : aggregationFunctions.length;
+    for (int i = 0; i < numAggregationFunctionsForInnerQuery; i++) {
+      int valueIndex = _timeSeries.size() + 1 + i;
+      row[valueIndex] = aggregationFunctions[i].extractFinalResult(row[valueIndex]);
+    }
+  }
+
+  private void setupColumnTypeForAggregatedColumn(ColumnDataType[] columnDataTypes) {
+    AggregationFunction[] aggregationFunctions;
+    if (_gapfillType == GapfillUtils.GapfillType.AGGREGATE_GAP_FILL) {
+      aggregationFunctions = _queryContext.getSubQueryContext().getAggregationFunctions();
+    } else {
+      aggregationFunctions = _queryContext.getSubQueryContext().getSubQueryContext().getAggregationFunctions();
+    }
+    int numAggregationFunctionsForInnerQuery = aggregationFunctions == null ? 0 : aggregationFunctions.length;
+    for (int i = 0; i < numAggregationFunctionsForInnerQuery; i++) {
+      columnDataTypes[_timeSeries.size() + 1 + i] = aggregationFunctions[i].getFinalResultColumnType();
+    }
+  }
+
+  /**
+   * Constructs the DataSchema for the ResultTable.
+   */
+  private DataSchema getResultTableDataSchema(DataSchema dataSchema) {
+    if (_gapfillType == GapfillUtils.GapfillType.GAP_FILL) {
+      return dataSchema;
+    }
+
+    int numOfColumns = _queryContext.getSelectExpressions().size();
+    String [] columnNames = new String[numOfColumns];
+    ColumnDataType [] columnDataTypes = new ColumnDataType[numOfColumns];
+    for (int i = 0; i < numOfColumns; i++) {
+      ExpressionContext expressionContext = _queryContext.getSelectExpressions().get(i);
+      if (GapfillUtils.isGapfill(expressionContext)) {
+        expressionContext = expressionContext.getFunction().getArguments().get(0);
+      }
+      if (expressionContext.getType() != ExpressionContext.Type.FUNCTION) {
+        columnNames[i] = expressionContext.getIdentifier();
+        columnDataTypes[i] = ColumnDataType.STRING;
+      } else {
+        FunctionContext functionContext = expressionContext.getFunction();
+        AggregationFunction aggregationFunction =
+            AggregationFunctionFactory.getAggregationFunction(functionContext, _queryContext);
+        columnDataTypes[i] = aggregationFunction.getFinalResultColumnType();
+        columnNames[i] = functionContext.toString();
+      }
+    }
+    return new DataSchema(columnNames, columnDataTypes);
+  }
+
+  private Key constructGroupKeys(Object[] row) {
+    Object [] groupKeys = new Object[_groupByKeyIndexes.size()];
+    for (int i = 0; i < _groupByKeyIndexes.size(); i++) {
+      groupKeys[i] = row[_groupByKeyIndexes.get(i)];
+    }
+    return new Key(groupKeys);
+  }
+
+  private long truncate(long epoch) {
+    int sz = _dateTimeGranularity.getSize();
+    return epoch / sz * sz;
+  }
+
+  private List<Object[]> gapFillAndAggregate(List<Object[]> sortedRows,
+      DataSchema dataSchemaForAggregatedResult,
+      DataSchema dataSchema) {
+    List<Object[]> result = new ArrayList<>();
+
+    PreAggregateGapfillFilterHandler postGapfillFilterHandler = null;
+    if (_queryContext.getSubQueryContext() != null && _queryContext.getFilter() != null) {
+      postGapfillFilterHandler = new PreAggregateGapfillFilterHandler(_queryContext.getFilter(), dataSchema);
+    }
+    PreAggregateGapfillFilterHandler postAggregateHavingFilterHandler = null;
+    if (_queryContext.getHavingFilter() != null) {
+      postAggregateHavingFilterHandler = new PreAggregateGapfillFilterHandler(
+          _queryContext.getHavingFilter(), dataSchemaForAggregatedResult);
+    }
+    Object[] previous = null;
+    Iterator<Object[]> sortedIterator = sortedRows.iterator();
+    for (long time = _startMs; time < _endMs; time += _timeBucketSize) {
+      List<Object[]> bucketedResult = new ArrayList<>();
+      previous = gapfill(time, bucketedResult, sortedIterator, previous, dataSchema, postGapfillFilterHandler);
+      if (_queryContext.getAggregationFunctions() == null) {
+        result.addAll(bucketedResult);
+      } else if (bucketedResult.size() > 0) {
+        List<Object[]> aggregatedRows = aggregateGapfilledData(bucketedResult, dataSchema);
+        for (Object[] aggregatedRow : aggregatedRows) {
+          if (postAggregateHavingFilterHandler == null || postAggregateHavingFilterHandler.isMatch(aggregatedRow)) {
+            result.add(aggregatedRow);
+          }
+          if (result.size() >= _limitForAggregatedResult) {
+            return result;
+          }
+        }
+      }
+    }
+    return result;
+  }
+
+  private Object[] gapfill(long bucketTime,
+      List<Object[]> bucketedResult,
+      Iterator<Object[]> sortedIterator,
+      Object[] previous,
+      DataSchema dataSchema,
+      PreAggregateGapfillFilterHandler postGapfillFilterHandler) {
+    ColumnDataType[] resultColumnDataTypes = dataSchema.getColumnDataTypes();
+    int numResultColumns = resultColumnDataTypes.length;
+    Set<Key> keys = new HashSet<>(_groupByKeys);
+    if (previous == null && sortedIterator.hasNext()) {
+      previous = sortedIterator.next();
+    }
+
+    while (previous != null) {
+      Object[] resultRow = previous;
+      for (int i = 0; i < resultColumnDataTypes.length; i++) {
+        resultRow[i] = resultColumnDataTypes[i].format(resultRow[i]);
+      }
+
+      long timeCol = _dateTimeFormatter.fromFormatToMillis(String.valueOf(resultRow[0]));
+      if (timeCol > bucketTime) {
+        break;
+      }
+      if (timeCol == bucketTime) {
+        if (postGapfillFilterHandler == null || postGapfillFilterHandler.isMatch(previous)) {
+          if (bucketedResult.size() >= _limitForGapfilledResult) {
+            _limitForGapfilledResult = 0;
+            break;
+          } else {
+            bucketedResult.add(resultRow);
+          }
+        }
+        Key key = constructGroupKeys(resultRow);
+        keys.remove(key);
+        _previousByGroupKey.put(key, resultRow);
+      }
+      if (sortedIterator.hasNext()) {
+        previous = sortedIterator.next();
+      } else {
+        previous = null;
+      }
+    }
+
+    for (Key key : keys) {
+      Object[] gapfillRow = new Object[numResultColumns];
+      int keyIndex = 0;
+      if (resultColumnDataTypes[0] == ColumnDataType.LONG) {
+        gapfillRow[0] = Long.valueOf(_dateTimeFormatter.fromMillisToFormat(bucketTime));
+      } else {
+        gapfillRow[0] = _dateTimeFormatter.fromMillisToFormat(bucketTime);
+      }
+      for (int i = 1; i < _isGroupBySelections.length; i++) {
+        if (_isGroupBySelections[i]) {
+          gapfillRow[i] = key.getValues()[keyIndex++];
+        } else {
+          gapfillRow[i] = getFillValue(i, dataSchema.getColumnName(i), key, resultColumnDataTypes[i]);
+        }
+      }
+
+      if (postGapfillFilterHandler == null || postGapfillFilterHandler.isMatch(gapfillRow)) {
+        if (bucketedResult.size() >= _limitForGapfilledResult) {
+          break;
+        } else {
+          bucketedResult.add(gapfillRow);
+        }
+      }
+    }
+    if (_limitForGapfilledResult > _groupByKeys.size()) {
+      _limitForGapfilledResult -= _groupByKeys.size();
+    } else {
+      _limitForGapfilledResult = 0;
+    }
+    return previous;
+  }
+
+  /**
+   * Make sure that the outer query has a GROUP BY clause and that it includes the time bucket.
+   */
+  private void validateGroupByForOuterQuery() {
+    List<ExpressionContext> groupbyExpressions = _queryContext.getGroupByExpressions();
+    Preconditions.checkArgument(groupbyExpressions != null, "No GroupBy Clause.");
+    List<ExpressionContext> innerSelections = _queryContext.getSubQueryContext().getSelectExpressions();
+    String timeBucketCol = null;
+    List<String> strAlias = _queryContext.getSubQueryContext().getAliasList();
+    for (int i = 0; i < innerSelections.size(); i++) {
+      ExpressionContext innerSelection = innerSelections.get(i);
+      if (GapfillUtils.isGapfill(innerSelection)) {
+        if (strAlias.get(i) != null) {
+          timeBucketCol = strAlias.get(i);
+        } else {
+          timeBucketCol = innerSelection.getFunction().getArguments().get(0).toString();
+        }
+        break;
+      }
+    }
+
+    Preconditions.checkArgument(timeBucketCol != null, "No Group By timebucket.");
+
+    boolean findTimeBucket = false;
+    for (ExpressionContext groupbyExp : groupbyExpressions) {
+      if (timeBucketCol.equals(groupbyExp.toString())) {
+        findTimeBucket = true;
+        break;
+      }
+    }
+
+    Preconditions.checkArgument(findTimeBucket, "No Group By timebucket.");
+  }
+
+  private List<Object[]> aggregateGapfilledData(List<Object[]> bucketedRows, DataSchema dataSchema) {

Review comment:
       This looks like a hacky way to calculate the group-by results by constructing a column-major block from the result rows. Can we use `IndexedTable` here, which takes result rows directly?
   Another problem with this approach: if the subquery is an aggregation query, the result rows will hold intermediate results, which cannot be fed into `AggregationFunction.aggregateGroupBySV()`. `AggregationFunction.merge()` is the one to use for intermediate results.
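
   For illustration, a minimal sketch (not part of the PR) of the merge-based direction suggested above. The row layout `[group-by values..., intermediate results...]`, the class and method names, and the use of a plain map rather than an actual `IndexedTable` are assumptions made only for this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.pinot.core.data.table.Key;
import org.apache.pinot.core.query.aggregation.function.AggregationFunction;

public class IntermediateResultMergeSketch {

  /**
   * Merges intermediate aggregation results per group key. Assumes (for this sketch only) that each
   * row is laid out as [group-by values..., intermediate results...].
   */
  @SuppressWarnings({"rawtypes", "unchecked"})
  static List<Object[]> mergeByGroupKey(List<Object[]> rows, int numGroupByColumns,
      AggregationFunction[] aggregationFunctions) {
    Map<Key, Object[]> merged = new LinkedHashMap<>();
    for (Object[] row : rows) {
      // Build the group key from the leading group-by columns.
      Object[] keyValues = new Object[numGroupByColumns];
      System.arraycopy(row, 0, keyValues, 0, numGroupByColumns);
      Object[] existing = merged.putIfAbsent(new Key(keyValues), row);
      if (existing != null) {
        for (int i = 0; i < aggregationFunctions.length; i++) {
          int col = numGroupByColumns + i;
          // merge() combines two intermediate results; extractFinalResult() is applied afterwards.
          existing[col] = aggregationFunctions[i].merge(existing[col], row[col]);
        }
      }
    }
    return new ArrayList<>(merged.values());
  }
}
```

   An `IndexedTable` performs essentially this bookkeeping internally (keyed upserts plus per-function merge), so switching to it would avoid hand-rolling the column-major block.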




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org