You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/11/04 18:17:43 UTC

(pinot) branch master updated: Add DATETIMECONVERTWINDOWHOP function (#11773)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new baea4a20df Add DATETIMECONVERTWINDOWHOP function (#11773)
baea4a20df is described below

commit baea4a20dfa0fb56e545078c7fa11e01a478e681
Author: Oleksii <al...@gmail.com>
AuthorDate: Sat Nov 4 19:17:37 2023 +0100

    Add DATETIMECONVERTWINDOWHOP function (#11773)
---
 .../common/function/TransformFunctionType.java     |   4 +
 .../function/FunctionDefinitionRegistryTest.java   |   1 +
 .../DateTimeConversionHopTransformFunction.java    | 151 +++++++++++
 .../function/TransformFunctionFactory.java         |   2 +
 .../BaseDateTimeWindowHopTransformer.java          |  89 +++++++
 .../DateTimeWindowHopTransformerFactory.java       |  65 +++++
 .../EpochToEpochWindowHopTransformer.java          |  48 ++++
 .../EpochToSDFHopWindowTransformer.java            |  49 ++++
 .../SDFToEpochWindowHopTransformer.java            |  48 ++++
 .../datetimehop/SDFToSDFWindowHopTransformer.java  |  48 ++++
 .../function/BaseTransformFunctionTest.java        |  73 ++++--
 ...meConversionWindowHopTransformFunctionTest.java | 141 ++++++++++
 .../DateTimeConverterHopWindowTest.java            | 283 +++++++++++++++++++++
 13 files changed, 981 insertions(+), 21 deletions(-)

diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java b/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
index f40c09d20e..2b0b249e3e 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/function/TransformFunctionType.java
@@ -134,6 +134,10 @@ public enum TransformFunctionType {
       OperandTypes.family(ImmutableList.of(SqlTypeFamily.ANY, SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER,
           SqlTypeFamily.CHARACTER)), "date_time_convert"),
 
+  DATE_TIME_CONVERT_WINDOW_HOP("dateTimeConvertWindowHop", ReturnTypes.TO_ARRAY, OperandTypes.family(
+      ImmutableList.of(SqlTypeFamily.ANY, SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER,
+          SqlTypeFamily.CHARACTER)), "date_time_convert_window_hop"),
+
   DATE_TRUNC("dateTrunc",
       ReturnTypes.BIGINT_FORCE_NULLABLE,
       OperandTypes.family(
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/function/FunctionDefinitionRegistryTest.java b/pinot-common/src/test/java/org/apache/pinot/common/function/FunctionDefinitionRegistryTest.java
index ed2021c2e2..ba4ba42a3c 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/function/FunctionDefinitionRegistryTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/function/FunctionDefinitionRegistryTest.java
@@ -49,6 +49,7 @@ public class FunctionDefinitionRegistryTest {
       // Functions without scalar function counterpart as of now
       "arraylength", "arrayaverage", "arraymin", "arraymax", "arraysum", "clpdecode", "groovy", "inidset",
       "jsonextractscalar", "jsonextractindex", "jsonextractkey", "lookup", "mapvalue", "timeconvert", "valuein",
+      "datetimeconvertwindowhop",
       // functions not needed for register b/c they are in std sql table or they will not be composed directly.
       "in", "not_in", "and", "or", "range", "extract", "is_true", "is_not_true", "is_false", "is_not_false"
   );
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/DateTimeConversionHopTransformFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/DateTimeConversionHopTransformFunction.java
new file mode 100644
index 0000000000..9211ef62fc
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/DateTimeConversionHopTransformFunction.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.core.operator.ColumnContext;
+import org.apache.pinot.core.operator.blocks.ValueBlock;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.core.operator.transform.transformer.datetimehop.BaseDateTimeWindowHopTransformer;
+import org.apache.pinot.core.operator.transform.transformer.datetimehop.DateTimeWindowHopTransformerFactory;
+import org.apache.pinot.core.operator.transform.transformer.datetimehop.EpochToEpochWindowHopTransformer;
+import org.apache.pinot.core.operator.transform.transformer.datetimehop.EpochToSDFHopWindowTransformer;
+import org.apache.pinot.core.operator.transform.transformer.datetimehop.SDFToEpochWindowHopTransformer;
+import org.apache.pinot.core.operator.transform.transformer.datetimehop.SDFToSDFWindowHopTransformer;
+import org.roaringbitmap.RoaringBitmap;
+
+
+/**
+ * The <code>DateTimeConversionHopTransformFunction</code> class implements the date time conversion
+ * with hop transform function.
+ * <ul>
+ *   <li>
+ *     This transform function should be invoked with arguments:
+ *     <ul>
+ *       <li>Column name to convert. E.g. Date</li>
+ *       <li>Input format of the column. E.g. EPOCH|MILLISECONDS (See Pipe Format in DateTimeFormatSpec)</li>
+ *       <li>Output format. E.g. EPOCH|MILLISECONDS|10</li>
+ *       <li>Output granularity. E.g. MINUTES|15</li>
+ *       <li>Hop window size. E.g. HOURS</li>
+ *     </ul>
+ *   </li>
+ *   <li>
+ *     Outputs:
+ *     <ul>
+ *       <li>Time values converted to the desired format and bucketed to desired granularity with hop windows</li>
+ *       <li>Below is an example for one hour window with 15min hop for 12:10</li>
+ *        |-----------------|  11:15 - 12:15
+ *            |-----------------|  11:30 - 12:30
+ *                |-----------------|  11:45 - 12:45
+ *                    |-----------------|  12:00 - 13:00
+ *       <li>The beginning of each window is returned</li>
+ *       <li>The end of the window can be fetched by adding window size</li>
+ *     </ul>
+ *   </li>
+ * </ul>
+ */
+public class DateTimeConversionHopTransformFunction extends BaseTransformFunction {
+  public static final String FUNCTION_NAME = "dateTimeConvertWindowHop";
+
+  private TransformFunction _mainTransformFunction;
+  private TransformResultMetadata _resultMetadata;
+  private BaseDateTimeWindowHopTransformer<?, ?> _dateTimeTransformer;
+
+  @Override
+  public void init(List<TransformFunction> arguments, Map<String, ColumnContext> columnContextMap) {
+    super.init(arguments, columnContextMap);
+    // Check that there are exactly 5 arguments
+    if (arguments.size() != 5) {
+      throw new IllegalArgumentException("Exactly 5 arguments are required for DATE_TIME_CONVERT_HOP function");
+    }
+    TransformFunction firstArgument = arguments.get(0);
+    if (firstArgument instanceof LiteralTransformFunction || !firstArgument.getResultMetadata().isSingleValue()) {
+      throw new IllegalArgumentException(
+          "The first argument of DATE_TIME_CONVERT_HOP transform function must be a single-valued column or "
+              + "a transform function");
+    }
+    _mainTransformFunction = firstArgument;
+
+    _dateTimeTransformer = DateTimeWindowHopTransformerFactory.getDateTimeTransformer(
+        ((LiteralTransformFunction) arguments.get(1)).getStringLiteral(),
+        ((LiteralTransformFunction) arguments.get(2)).getStringLiteral(),
+        ((LiteralTransformFunction) arguments.get(3)).getStringLiteral(),
+        ((LiteralTransformFunction) arguments.get(4)).getStringLiteral());
+    if (_dateTimeTransformer instanceof EpochToEpochWindowHopTransformer
+        || _dateTimeTransformer instanceof SDFToEpochWindowHopTransformer) {
+      _resultMetadata = LONG_MV_NO_DICTIONARY_METADATA;
+    } else {
+      _resultMetadata = STRING_MV_NO_DICTIONARY_METADATA;
+    }
+  }
+
+  @Override
+  public String getName() {
+    return FUNCTION_NAME;
+  }
+
+  @Override
+  public TransformResultMetadata getResultMetadata() {
+    return _resultMetadata;
+  }
+
+  @Override
+  public long[][] transformToLongValuesMV(ValueBlock valueBlock) {
+    if (_resultMetadata != LONG_MV_NO_DICTIONARY_METADATA) {
+      return super.transformToLongValuesMV(valueBlock);
+    }
+
+    int length = valueBlock.getNumDocs();
+    initLongValuesMV(length);
+    if (_dateTimeTransformer instanceof EpochToEpochWindowHopTransformer) {
+      EpochToEpochWindowHopTransformer dateTimeTransformer = (EpochToEpochWindowHopTransformer) _dateTimeTransformer;
+      dateTimeTransformer.transform(_mainTransformFunction.transformToLongValuesSV(valueBlock), _longValuesMV, length);
+    } else if (_dateTimeTransformer instanceof SDFToEpochWindowHopTransformer) {
+      SDFToEpochWindowHopTransformer dateTimeTransformer = (SDFToEpochWindowHopTransformer) _dateTimeTransformer;
+      dateTimeTransformer.transform(_mainTransformFunction.transformToStringValuesSV(valueBlock), _longValuesMV,
+          length);
+    }
+    return _longValuesMV;
+  }
+
+  public String[][] transformToStringValuesMV(ValueBlock valueBlock) {
+    if (_resultMetadata != STRING_MV_NO_DICTIONARY_METADATA) {
+      return super.transformToStringValuesMV(valueBlock);
+    }
+
+    int length = valueBlock.getNumDocs();
+    initStringValuesMV(length);
+    if (_dateTimeTransformer instanceof EpochToSDFHopWindowTransformer) {
+      EpochToSDFHopWindowTransformer dateTimeTransformer = (EpochToSDFHopWindowTransformer) _dateTimeTransformer;
+      dateTimeTransformer.transform(_mainTransformFunction.transformToLongValuesSV(valueBlock), _stringValuesMV,
+          length);
+    } else if (_dateTimeTransformer instanceof SDFToSDFWindowHopTransformer) {
+      SDFToSDFWindowHopTransformer dateTimeTransformer = (SDFToSDFWindowHopTransformer) _dateTimeTransformer;
+      dateTimeTransformer.transform(_mainTransformFunction.transformToStringValuesSV(valueBlock), _stringValuesMV,
+          length);
+    }
+    return _stringValuesMV;
+  }
+
+  @Override
+  public RoaringBitmap getNullBitmap(ValueBlock valueBlock) {
+    return _mainTransformFunction.getNullBitmap(valueBlock);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
index fa74acc9af..49541841ca 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/function/TransformFunctionFactory.java
@@ -123,6 +123,8 @@ public class TransformFunctionFactory {
     typeToImplementation.put(TransformFunctionType.JSON_EXTRACT_KEY, JsonExtractKeyTransformFunction.class);
     typeToImplementation.put(TransformFunctionType.TIME_CONVERT, TimeConversionTransformFunction.class);
     typeToImplementation.put(TransformFunctionType.DATE_TIME_CONVERT, DateTimeConversionTransformFunction.class);
+    typeToImplementation.put(TransformFunctionType.DATE_TIME_CONVERT_WINDOW_HOP,
+        DateTimeConversionHopTransformFunction.class);
     typeToImplementation.put(TransformFunctionType.DATE_TRUNC, DateTruncTransformFunction.class);
     typeToImplementation.put(TransformFunctionType.JSON_EXTRACT_INDEX, JsonExtractIndexTransformFunction.class);
     typeToImplementation.put(TransformFunctionType.YEAR, DateTimeTransformFunction.Year.class);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/transformer/datetimehop/BaseDateTimeWindowHopTransformer.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/transformer/datetimehop/BaseDateTimeWindowHopTransformer.java
new file mode 100644
index 0000000000..c05b6f1001
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/transformer/datetimehop/BaseDateTimeWindowHopTransformer.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.operator.transform.transformer.datetimehop;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.core.operator.transform.transformer.DataTransformer;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeFormatUnitSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
+import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormatter;
+
+
+public abstract class BaseDateTimeWindowHopTransformer<I, O> implements DataTransformer<I, O> {
+  protected final long _hopWindowSizeMillis;
+  private final int _inputTimeSize;
+  private final TimeUnit _inputTimeUnit;
+  private final DateTimeFormatter _inputDateTimeFormatter;
+  private final int _outputTimeSize;
+  private final DateTimeFormatUnitSpec.DateTimeTransformUnit _outputTimeUnit;
+  private final DateTimeFormatter _outputDateTimeFormatter;
+  private final long _outputGranularityMillis;
+
+  public BaseDateTimeWindowHopTransformer(DateTimeFormatSpec inputFormat, DateTimeFormatSpec outputFormat,
+      DateTimeGranularitySpec outputGranularity, DateTimeGranularitySpec hopWindowSize) {
+    _inputTimeSize = inputFormat.getColumnSize();
+    _inputTimeUnit = inputFormat.getColumnUnit();
+    _inputDateTimeFormatter = inputFormat.getDateTimeFormatter();
+    _outputTimeSize = outputFormat.getColumnSize();
+    _outputTimeUnit = outputFormat.getColumnDateTimeTransformUnit();
+    _outputDateTimeFormatter = outputFormat.getDateTimeFormatter();
+    _outputGranularityMillis = outputGranularity.granularityToMillis();
+    _hopWindowSizeMillis = hopWindowSize.granularityToMillis();
+  }
+
+  protected long transformEpochToMillis(long epochTime) {
+    return _inputTimeUnit.toMillis(epochTime * _inputTimeSize);
+  }
+
+  protected long transformSDFToMillis(String sdfTime) {
+    return _inputDateTimeFormatter.parseMillis(sdfTime);
+  }
+
+  protected long transformMillisToEpoch(long millisSinceEpoch) {
+    return _outputTimeUnit.fromMillis(millisSinceEpoch) / _outputTimeSize;
+  }
+
+  protected String transformMillisToSDF(long millisSinceEpoch) {
+    return _outputDateTimeFormatter.print(new DateTime(millisSinceEpoch));
+  }
+
+  protected long transformToOutputGranularity(long millisSinceEpoch) {
+    return (millisSinceEpoch / _outputGranularityMillis) * _outputGranularityMillis;
+  }
+
+  protected List<Long> hopWindows(long millisSinceEpoch) {
+    List<Long> hops = new ArrayList<>();
+    long totalHopMillis = _hopWindowSizeMillis;
+    long granularityMillis = _outputGranularityMillis;
+
+    long adjustedMillis = (millisSinceEpoch / granularityMillis) * granularityMillis;
+
+    // Start from the adjusted timestamp and decrement by the hop until we've covered the entire window duration
+    for (long currentMillis = adjustedMillis; currentMillis > millisSinceEpoch - totalHopMillis;
+        currentMillis -= granularityMillis) {
+      hops.add(currentMillis);
+    }
+    return hops;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/transformer/datetimehop/DateTimeWindowHopTransformerFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/transformer/datetimehop/DateTimeWindowHopTransformerFactory.java
new file mode 100644
index 0000000000..3dba64fd9f
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/transformer/datetimehop/DateTimeWindowHopTransformerFactory.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.operator.transform.transformer.datetimehop;
+
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFieldSpec.TimeFormat;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
+
+
+public class DateTimeWindowHopTransformerFactory {
+
+  private static final TimeFormat EPOCH = DateTimeFieldSpec.TimeFormat.EPOCH;
+  private static final TimeFormat TIMESTAMP = DateTimeFieldSpec.TimeFormat.TIMESTAMP;
+
+  private DateTimeWindowHopTransformerFactory() {
+  }
+
+  public static BaseDateTimeWindowHopTransformer getDateTimeTransformer(String inputFormatStr, String outputFormatStr,
+      String outputGranularityStr, String hopSizeStr) {
+    DateTimeFormatSpec inputFormatSpec = new DateTimeFormatSpec(inputFormatStr);
+    DateTimeFormatSpec outputFormatSpec = new DateTimeFormatSpec(outputFormatStr);
+    DateTimeGranularitySpec outputGranularity = new DateTimeGranularitySpec(outputGranularityStr);
+    DateTimeGranularitySpec hopSizeFormat = new DateTimeGranularitySpec(hopSizeStr);
+
+    TimeFormat inputFormat = inputFormatSpec.getTimeFormat();
+    TimeFormat outputFormat = outputFormatSpec.getTimeFormat();
+
+    if (isEpochOrTimestamp(inputFormat) && isEpochOrTimestamp(outputFormat)) {
+      return new EpochToEpochWindowHopTransformer(inputFormatSpec, outputFormatSpec, outputGranularity, hopSizeFormat);
+    } else if (isEpochOrTimestamp(inputFormat) && isStringFormat(outputFormat)) {
+      return new EpochToSDFHopWindowTransformer(inputFormatSpec, outputFormatSpec, outputGranularity, hopSizeFormat);
+    } else if (isStringFormat(inputFormat) && isEpochOrTimestamp(outputFormat)) {
+      return new SDFToEpochWindowHopTransformer(inputFormatSpec, outputFormatSpec, outputGranularity, hopSizeFormat);
+    } else if (isStringFormat(inputFormat) && isStringFormat(outputFormat)) {
+      return new SDFToSDFWindowHopTransformer(inputFormatSpec, outputFormatSpec, outputGranularity, hopSizeFormat);
+    }
+    throw new IllegalArgumentException("Wrong inputFormat: " + inputFormat + " outputFormat: " + outputFormat);
+  }
+
+  private static boolean isEpochOrTimestamp(TimeFormat format) {
+    return format == EPOCH || format == TIMESTAMP;
+  }
+
+  private static boolean isStringFormat(TimeFormat format) {
+    return format == TimeFormat.SIMPLE_DATE_FORMAT;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/transformer/datetimehop/EpochToEpochWindowHopTransformer.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/transformer/datetimehop/EpochToEpochWindowHopTransformer.java
new file mode 100644
index 0000000000..943c02f877
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/transformer/datetimehop/EpochToEpochWindowHopTransformer.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.operator.transform.transformer.datetimehop;
+
+import java.util.List;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
+
+
+public class EpochToEpochWindowHopTransformer extends BaseDateTimeWindowHopTransformer<long[], long[][]> {
+  public EpochToEpochWindowHopTransformer(DateTimeFormatSpec inputFormat, DateTimeFormatSpec outputFormat,
+      DateTimeGranularitySpec outputGranularity, DateTimeGranularitySpec hopSize) {
+    super(inputFormat, outputFormat, outputGranularity, hopSize);
+  }
+
+  @Override
+  public void transform(long[] input, long[][] output, int length) {
+    for (int i = 0; i < length; i++) {
+      long epochTime = input[i];
+      long millisSinceEpoch = transformEpochToMillis(epochTime);
+      List<Long> hopWindows = hopWindows(millisSinceEpoch);
+
+      long[] transformedArray = new long[hopWindows.size()];
+      for (int j = 0; j < hopWindows.size(); j++) {
+        long millis = hopWindows.get(j);
+        transformedArray[j] = transformMillisToEpoch(millis);
+      }
+      output[i] = transformedArray;
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/transformer/datetimehop/EpochToSDFHopWindowTransformer.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/transformer/datetimehop/EpochToSDFHopWindowTransformer.java
new file mode 100644
index 0000000000..331cf368f4
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/transformer/datetimehop/EpochToSDFHopWindowTransformer.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.operator.transform.transformer.datetimehop;
+
+import java.util.List;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
+
+
+public class EpochToSDFHopWindowTransformer extends BaseDateTimeWindowHopTransformer<long[], String[][]> {
+
+  public EpochToSDFHopWindowTransformer(DateTimeFormatSpec inputFormat, DateTimeFormatSpec outputFormat,
+      DateTimeGranularitySpec outputGranularity, DateTimeGranularitySpec hopWindowSize) {
+    super(inputFormat, outputFormat, outputGranularity, hopWindowSize);
+  }
+
+  @Override
+  public void transform(long[] input, String[][] output, int length) {
+    for (int i = 0; i < length; i++) {
+      long epochTime = input[i];
+      long millisSinceEpoch = transformEpochToMillis(epochTime);
+      List<Long> hopWindows = hopWindows(millisSinceEpoch);
+
+      String[] transformedArray = new String[hopWindows.size()];
+      for (int j = 0; j < hopWindows.size(); j++) {
+        long millis = hopWindows.get(j);
+        transformedArray[j] = transformMillisToSDF(millis);
+      }
+      output[i] = transformedArray;
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/transformer/datetimehop/SDFToEpochWindowHopTransformer.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/transformer/datetimehop/SDFToEpochWindowHopTransformer.java
new file mode 100644
index 0000000000..e43ef6cf79
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/transformer/datetimehop/SDFToEpochWindowHopTransformer.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.operator.transform.transformer.datetimehop;
+
+import java.util.List;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
+
+
+public class SDFToEpochWindowHopTransformer extends BaseDateTimeWindowHopTransformer<String[], long[][]> {
+
+  public SDFToEpochWindowHopTransformer(DateTimeFormatSpec inputFormat, DateTimeFormatSpec outputFormat,
+      DateTimeGranularitySpec outputGranularity, DateTimeGranularitySpec hopWindowSize) {
+    super(inputFormat, outputFormat, outputGranularity, hopWindowSize);
+  }
+
+  @Override
+  public void transform(String[] input, long[][] output, int length) {
+    for (int i = 0; i < length; i++) {
+      String sdfTime = input[i];
+      long millisSinceEpoch = transformSDFToMillis(sdfTime);
+      List<Long> hopWindows = hopWindows(millisSinceEpoch);
+
+      long[] epochTimes = new long[hopWindows.size()];
+      for (int j = 0; j < hopWindows.size(); j++) {
+        epochTimes[j] = transformMillisToEpoch(hopWindows.get(j));
+      }
+      output[i] = epochTimes;
+    }
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/transformer/datetimehop/SDFToSDFWindowHopTransformer.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/transformer/datetimehop/SDFToSDFWindowHopTransformer.java
new file mode 100644
index 0000000000..5016a47033
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/transform/transformer/datetimehop/SDFToSDFWindowHopTransformer.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.operator.transform.transformer.datetimehop;
+
+import java.util.List;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
+
+
+public class SDFToSDFWindowHopTransformer extends BaseDateTimeWindowHopTransformer<String[], String[][]> {
+
+  public SDFToSDFWindowHopTransformer(DateTimeFormatSpec inputFormat, DateTimeFormatSpec outputFormat,
+      DateTimeGranularitySpec outputGranularity, DateTimeGranularitySpec hopWindowSize) {
+    super(inputFormat, outputFormat, outputGranularity, hopWindowSize);
+  }
+
+  @Override
+  public void transform(String[] input, String[][] output, int length) {
+    for (int i = 0; i < length; i++) {
+      String sdfTime = input[i];
+      long millisSinceEpoch = transformSDFToMillis(sdfTime);
+      List<Long> hopWindows = hopWindows(millisSinceEpoch);
+
+      String[] transformedStrings = new String[hopWindows.size()];
+      for (int j = 0; j < hopWindows.size(); j++) {
+        transformedStrings[j] = transformMillisToSDF(hopWindows.get(j));
+      }
+      output[i] = transformedStrings;
+    }
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java
index 586a1bf63f..2c685a82bc 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/BaseTransformFunctionTest.java
@@ -66,10 +66,6 @@ import static org.testng.Assert.assertEquals;
 
 
 public abstract class BaseTransformFunctionTest {
-  private static final String SEGMENT_NAME = "testSegment";
-  private static final String INDEX_DIR_PATH = FileUtils.getTempDirectoryPath() + File.separator + SEGMENT_NAME;
-  private static final Random RANDOM = new Random();
-
   protected static final int NUM_ROWS = 1000;
   protected static final int MAX_NUM_MULTI_VALUES = 5;
   protected static final int MAX_MULTI_VALUE = 10;
@@ -84,20 +80,14 @@ public abstract class BaseTransformFunctionTest {
   protected static final String STRING_SV_COLUMN = "stringSV";
   protected static final String JSON_STRING_SV_COLUMN = "jsonSV";
   protected static final String STRING_SV_NULL_COLUMN = "stringSVNull";
-
   protected static final String BYTES_SV_COLUMN = "bytesSV";
-
   protected static final String VECTOR_1_COLUMN = "vector1";
   protected static final String VECTOR_2_COLUMN = "vector2";
   protected static final String ZERO_VECTOR_COLUMN = "zeroVector";
-
   protected static final String STRING_ALPHANUM_SV_COLUMN = "stringAlphaNumSV";
-
   protected static final String STRING_ALPHANUM_NULL_SV_COLUMN = "stringAlphaNumSVNull";
-
   protected static final String INT_MV_COLUMN = "intMV";
   protected static final String INT_MV_NULL_COLUMN = "intMVNull";
-
   protected static final String LONG_MV_COLUMN = "longMV";
   protected static final String FLOAT_MV_COLUMN = "floatMV";
   protected static final String DOUBLE_MV_COLUMN = "doubleMV";
@@ -114,9 +104,11 @@ public abstract class BaseTransformFunctionTest {
   protected static final String LONG_MV_COLUMN_2 = "longMV2";
   protected static final String FLOAT_MV_COLUMN_2 = "floatMV2";
   protected static final String DOUBLE_MV_COLUMN_2 = "doubleMV2";
-
   protected static final String JSON_COLUMN = "json";
   protected static final String DEFAULT_JSON_COLUMN = "defaultJson";
+  private static final String SEGMENT_NAME = "testSegment";
+  private static final String INDEX_DIR_PATH = FileUtils.getTempDirectoryPath() + File.separator + SEGMENT_NAME;
+  private static final Random RANDOM = new Random();
   protected final int[] _intSVValues = new int[NUM_ROWS];
   protected final long[] _longSVValues = new long[NUM_ROWS];
   protected final float[] _floatSVValues = new float[NUM_ROWS];
@@ -366,8 +358,7 @@ public abstract class BaseTransformFunctionTest {
     long[] longValues = transformFunction.transformToLongValuesSV(_projectionBlock);
     float[] floatValues = transformFunction.transformToFloatValuesSV(_projectionBlock);
     double[] doubleValues = transformFunction.transformToDoubleValuesSV(_projectionBlock);
-    BigDecimal[] bigDecimalValues =
-        transformFunction.transformToBigDecimalValuesSV(_projectionBlock);
+    BigDecimal[] bigDecimalValues = transformFunction.transformToBigDecimalValuesSV(_projectionBlock);
     String[] stringValues = transformFunction.transformToStringValuesSV(_projectionBlock);
     for (int i = 0; i < NUM_ROWS; i++) {
       if (expectedNull.contains(i)) {
@@ -408,8 +399,7 @@ public abstract class BaseTransformFunctionTest {
     long[] longValues = transformFunction.transformToLongValuesSV(_projectionBlock);
     float[] floatValues = transformFunction.transformToFloatValuesSV(_projectionBlock);
     double[] doubleValues = transformFunction.transformToDoubleValuesSV(_projectionBlock);
-    BigDecimal[] bigDecimalValues =
-        transformFunction.transformToBigDecimalValuesSV(_projectionBlock);
+    BigDecimal[] bigDecimalValues = transformFunction.transformToBigDecimalValuesSV(_projectionBlock);
     String[] stringValues = transformFunction.transformToStringValuesSV(_projectionBlock);
     for (int i = 0; i < NUM_ROWS; i++) {
       if (expectedNull.contains(i)) {
@@ -546,8 +536,7 @@ public abstract class BaseTransformFunctionTest {
     long[] longValues = transformFunction.transformToLongValuesSV(_projectionBlock);
     float[] floatValues = transformFunction.transformToFloatValuesSV(_projectionBlock);
     double[] doubleValues = transformFunction.transformToDoubleValuesSV(_projectionBlock);
-    BigDecimal[] bigDecimalValues =
-        transformFunction.transformToBigDecimalValuesSV(_projectionBlock);
+    BigDecimal[] bigDecimalValues = transformFunction.transformToBigDecimalValuesSV(_projectionBlock);
 
     for (int i = 0; i < NUM_ROWS; i++) {
       if (expectedNulls.contains(i)) {
@@ -612,12 +601,20 @@ public abstract class BaseTransformFunctionTest {
   }
 
   protected void testTransformFunctionMV(TransformFunction transformFunction, int[][] expectedValues) {
+    testTransformFunctionMVWithNull(transformFunction, expectedValues, null);
+  }
+
+  protected void testTransformFunctionMVWithNull(TransformFunction transformFunction, int[][] expectedValues,
+      RoaringBitmap expectedNull) {
     int[][] intValuesMV = transformFunction.transformToIntValuesMV(_projectionBlock);
     long[][] longValuesMV = transformFunction.transformToLongValuesMV(_projectionBlock);
     float[][] floatValuesMV = transformFunction.transformToFloatValuesMV(_projectionBlock);
     double[][] doubleValuesMV = transformFunction.transformToDoubleValuesMV(_projectionBlock);
     String[][] stringValuesMV = transformFunction.transformToStringValuesMV(_projectionBlock);
     for (int i = 0; i < NUM_ROWS; i++) {
+      if (expectedNull != null && expectedNull.contains(i)) {
+        continue;
+      }
       int[] expectedValueMV = expectedValues[i];
       int numValues = expectedValueMV.length;
       assertEquals(intValuesMV[i].length, numValues);
@@ -633,16 +630,25 @@ public abstract class BaseTransformFunctionTest {
         assertEquals(stringValuesMV[i][j], Integer.toString(expectedValues[i][j]));
       }
     }
-    testNullBitmap(transformFunction, null);
+
+    testNullBitmap(transformFunction, expectedNull);
   }
 
   protected void testTransformFunctionMV(TransformFunction transformFunction, long[][] expectedValues) {
+    testTransformFunctionMVWithNull(transformFunction, expectedValues, null);
+  }
+
+  protected void testTransformFunctionMVWithNull(TransformFunction transformFunction, long[][] expectedValues,
+      RoaringBitmap expectedNull) {
     int[][] intValuesMV = transformFunction.transformToIntValuesMV(_projectionBlock);
     long[][] longValuesMV = transformFunction.transformToLongValuesMV(_projectionBlock);
     float[][] floatValuesMV = transformFunction.transformToFloatValuesMV(_projectionBlock);
     double[][] doubleValuesMV = transformFunction.transformToDoubleValuesMV(_projectionBlock);
     String[][] stringValuesMV = transformFunction.transformToStringValuesMV(_projectionBlock);
     for (int i = 0; i < NUM_ROWS; i++) {
+      if (expectedNull != null && expectedNull.contains(i)) {
+        continue;
+      }
       long[] expectedValueMV = expectedValues[i];
       int numValues = expectedValueMV.length;
       assertEquals(intValuesMV[i].length, numValues);
@@ -658,16 +664,24 @@ public abstract class BaseTransformFunctionTest {
         assertEquals(stringValuesMV[i][j], Long.toString(expectedValues[i][j]));
       }
     }
-    testNullBitmap(transformFunction, null);
+    testNullBitmap(transformFunction, expectedNull);
   }
 
   protected void testTransformFunctionMV(TransformFunction transformFunction, float[][] expectedValues) {
+    testTransformFunctionMVWithNull(transformFunction, expectedValues, null);
+  }
+
+  protected void testTransformFunctionMVWithNull(TransformFunction transformFunction, float[][] expectedValues,
+      RoaringBitmap expectedNull) {
     int[][] intValuesMV = transformFunction.transformToIntValuesMV(_projectionBlock);
     long[][] longValuesMV = transformFunction.transformToLongValuesMV(_projectionBlock);
     float[][] floatValuesMV = transformFunction.transformToFloatValuesMV(_projectionBlock);
     double[][] doubleValuesMV = transformFunction.transformToDoubleValuesMV(_projectionBlock);
     String[][] stringValuesMV = transformFunction.transformToStringValuesMV(_projectionBlock);
     for (int i = 0; i < NUM_ROWS; i++) {
+      if (expectedNull != null && expectedNull.contains(i)) {
+        continue;
+      }
       float[] expectedValueMV = expectedValues[i];
       int numValues = expectedValueMV.length;
       assertEquals(intValuesMV[i].length, numValues);
@@ -687,12 +701,21 @@ public abstract class BaseTransformFunctionTest {
   }
 
   protected void testTransformFunctionMV(TransformFunction transformFunction, double[][] expectedValues) {
+    testTransformFunctionlMVWithNull(transformFunction, expectedValues, null);
+  }
+
+  protected void testTransformFunctionlMVWithNull(TransformFunction transformFunction, double[][] expectedValues,
+      RoaringBitmap expectedNull) {
     int[][] intValuesMV = transformFunction.transformToIntValuesMV(_projectionBlock);
     long[][] longValuesMV = transformFunction.transformToLongValuesMV(_projectionBlock);
     float[][] floatValuesMV = transformFunction.transformToFloatValuesMV(_projectionBlock);
     double[][] doubleValuesMV = transformFunction.transformToDoubleValuesMV(_projectionBlock);
     String[][] stringValuesMV = transformFunction.transformToStringValuesMV(_projectionBlock);
     for (int i = 0; i < NUM_ROWS; i++) {
+      if (expectedNull != null && expectedNull.contains(i)) {
+        continue;
+      }
+
       double[] expectedValueMV = expectedValues[i];
       int numValues = expectedValueMV.length;
       assertEquals(intValuesMV[i].length, numValues);
@@ -708,15 +731,23 @@ public abstract class BaseTransformFunctionTest {
         assertEquals(stringValuesMV[i][j], Double.toString(expectedValues[i][j]));
       }
     }
-    testNullBitmap(transformFunction, null);
+    testNullBitmap(transformFunction, expectedNull);
   }
 
   protected void testTransformFunctionMV(TransformFunction transformFunction, String[][] expectedValues) {
+    testTransformFunctionMVWithNull(transformFunction, expectedValues, null);
+  }
+
+  protected void testTransformFunctionMVWithNull(TransformFunction transformFunction, String[][] expectedValues,
+      RoaringBitmap expectedNull) {
     String[][] stringValuesMV = transformFunction.transformToStringValuesMV(_projectionBlock);
     for (int i = 0; i < NUM_ROWS; i++) {
+      if (expectedNull != null && expectedNull.contains(i)) {
+        continue;
+      }
       assertEquals(stringValuesMV[i], expectedValues[i]);
     }
-    testNullBitmap(transformFunction, null);
+    testNullBitmap(transformFunction, expectedNull);
   }
 
   @AfterClass
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/DateTimeConversionWindowHopTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/DateTimeConversionWindowHopTransformFunctionTest.java
new file mode 100644
index 0000000000..61c6a1e71d
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/DateTimeConversionWindowHopTransformFunctionTest.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.operator.transform.function;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.request.context.RequestContextUtils;
+import org.apache.pinot.core.operator.transform.TransformResultMetadata;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.exception.BadQueryRequestException;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.roaringbitmap.RoaringBitmap;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+public class DateTimeConversionWindowHopTransformFunctionTest extends BaseTransformFunctionTest {
+  @Test
+  public void testDateTimeConversionWindowHopEpochTransformFunction() {
+    // NOTE: functionality of the window hop transformers is covered in DateTimeConverterHopWindowTest
+    ExpressionContext expression = RequestContextUtils.getExpression(String.format(
+        "dateTimeConvertWindowHop(%s,'1:MILLISECONDS:EPOCH'," + "'1:MINUTES:EPOCH','1:MINUTES', '2:MINUTES')",
+        TIME_COLUMN));
+    TransformFunction transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+
+    assertTrue(transformFunction instanceof DateTimeConversionHopTransformFunction);
+    assertEquals(transformFunction.getName(), DateTimeConversionHopTransformFunction.FUNCTION_NAME);
+
+    TransformResultMetadata resultMetadata = transformFunction.getResultMetadata();
+    assertFalse(resultMetadata.isSingleValue());
+    assertEquals(resultMetadata.getDataType(), FieldSpec.DataType.LONG);
+    long[][] expectedValues = new long[NUM_ROWS][];
+    for (int i = 0; i < NUM_ROWS; i++) {
+      expectedValues[i] = new long[2];
+      expectedValues[i][0] = TimeUnit.MILLISECONDS.toMinutes(_timeValues[i]);
+      expectedValues[i][1] = TimeUnit.MILLISECONDS.toMinutes(_timeValues[i]) - 1;
+    }
+    testTransformFunctionMV(transformFunction, expectedValues);
+  }
+
+  @Test
+  public void testDateTimeWindowHopSDFTransformFunction() {
+    // NOTE: functionality of the window hop transformers is covered in DateTimeConverterHopWindowTest
+    ExpressionContext expression = RequestContextUtils.getExpression(String.format(
+        "dateTimeConvertWindowHop(%s,'1:MILLISECONDS:EPOCH',"
+            + "'1:MINUTES:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm tz(GMT)','1:MINUTES', '2:MINUTES')", TIME_COLUMN));
+    TransformFunction transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+
+    assertTrue(transformFunction instanceof DateTimeConversionHopTransformFunction);
+    assertEquals(transformFunction.getName(), DateTimeConversionHopTransformFunction.FUNCTION_NAME);
+
+    TransformResultMetadata resultMetadata = transformFunction.getResultMetadata();
+    assertFalse(resultMetadata.isSingleValue());
+    assertEquals(resultMetadata.getDataType(), FieldSpec.DataType.STRING);
+
+    DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm").withZone(DateTimeZone.UTC);
+
+    String[][] expectedValues = new String[NUM_ROWS][];
+    for (int i = 0; i < NUM_ROWS; i++) {
+      expectedValues[i] = new String[2];
+      expectedValues[i][0] = formatter.print(_timeValues[i]);
+      expectedValues[i][1] = formatter.print(_timeValues[i] - 60 * 1000);
+    }
+    testTransformFunctionMV(transformFunction, expectedValues);
+  }
+
+  @Test(dataProvider = "testIllegalArguments", expectedExceptions = {BadQueryRequestException.class})
+  public void testIllegalArguments(String expressionStr) {
+    ExpressionContext expression = RequestContextUtils.getExpression(expressionStr);
+    TransformFunctionFactory.get(expression, _dataSourceMap);
+  }
+
+  @DataProvider(name = "testIllegalArguments")
+  public Object[][] testIllegalArguments() {
+    return new Object[][]{
+        new Object[]{
+            String.format("dateTimeConvertWindowHop(%s,'1:MILLISECONDS:EPOCH','1:MINUTES:EPOCH', '1:MINUTE')",
+                TIME_COLUMN)
+        }, new Object[]{
+        "dateTimeConvertWindowHop(5,'1:MILLISECONDS:EPOCH','1:MINUTES:EPOCH','1:MINUTES', '2:MINUTES')"
+    }, new Object[]{
+        String.format(
+            "dateTimeConvertWindowHop(%s,'1:MILLISECONDS:EPOCH'," + "'1:MINUTES:EPOCH','1:MINUTES', '2:MINUTES')",
+            LONG_MV_COLUMN)
+    }, new Object[]{
+        String.format(
+            "dateTimeConvertWindowHop(%s,'1:MILLISECONDS:EPOCH'," + "'1:MINUTES:EPOCH','MINUTES:1', '2:MINUTES')",
+            TIME_COLUMN)
+    }, new Object[]{
+        String.format("dateTimeConvertWindowHop(%s, %s,'1:MINUTES:EPOCH'," + "'1:MINUTES', '2:MINUTES')", TIME_COLUMN,
+            INT_SV_COLUMN)
+    }
+    };
+  }
+
+  @Test
+  public void testDateTimeConversionTransformFunctionNullColumn() {
+    ExpressionContext expression = RequestContextUtils.getExpression(String.format(
+        "dateTimeConvertWindowHop(%s,'1:MILLISECONDS:EPOCH','1:MINUTES:EPOCH'," + "'1:MINUTES', '2:MINUTES')",
+        TIMESTAMP_COLUMN_NULL));
+    TransformFunction transformFunction = TransformFunctionFactory.get(expression, _dataSourceMap);
+    assertTrue(transformFunction instanceof DateTimeConversionHopTransformFunction);
+    assertEquals(transformFunction.getName(), DateTimeConversionHopTransformFunction.FUNCTION_NAME);
+    TransformResultMetadata resultMetadata = transformFunction.getResultMetadata();
+    assertEquals(resultMetadata.getDataType(), FieldSpec.DataType.LONG);
+    RoaringBitmap expectedNulls = new RoaringBitmap();
+    long[][] expectedValues = new long[NUM_ROWS][];
+    for (int i = 0; i < NUM_ROWS; i++) {
+      if (isNullRow(i)) {
+        expectedNulls.add(i);
+      } else {
+        expectedValues[i] = new long[2];
+        expectedValues[i][0] = TimeUnit.MILLISECONDS.toMinutes(_timeValues[i]);
+        expectedValues[i][1] = TimeUnit.MILLISECONDS.toMinutes(_timeValues[i]) - 1;
+      }
+    }
+    testTransformFunctionMVWithNull(transformFunction, expectedValues, expectedNulls);
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/transformer/datetimehopwindow/DateTimeConverterHopWindowTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/transformer/datetimehopwindow/DateTimeConverterHopWindowTest.java
new file mode 100644
index 0000000000..235f68e42b
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/transformer/datetimehopwindow/DateTimeConverterHopWindowTest.java
@@ -0,0 +1,283 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pinot.core.operator.transform.transformer.datetimehopwindow;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.pinot.core.operator.transform.transformer.datetimehop.BaseDateTimeWindowHopTransformer;
+import org.apache.pinot.core.operator.transform.transformer.datetimehop.DateTimeWindowHopTransformerFactory;
+import org.testng.Assert;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+
+public class DateTimeConverterHopWindowTest {
+  @SuppressWarnings("unchecked")
+  @Test(dataProvider = "testDateTimeHopWindowConversion")
+  public void testDateTimeHopWindowConversion(String inputFormat, String outputFormat, String outputGranularity,
+      String windowGranularity, Object input, Object expected) {
+    BaseDateTimeWindowHopTransformer converter =
+        DateTimeWindowHopTransformerFactory.getDateTimeTransformer(inputFormat, outputFormat, outputGranularity,
+            windowGranularity);
+    int length;
+    Object output;
+    if (expected instanceof long[][]) {
+      length = ((long[][]) expected).length;
+      output = new long[length][];
+      for (int i = 0; i < length; i++) {
+        ((long[][]) output)[i] = new long[((long[][]) expected)[i].length];
+      }
+    } else {
+      length = ((String[][]) expected).length;
+      output = new String[length][];
+    }
+    converter.transform(input, output, length);
+    Assert.assertEquals(output, expected);
+  }
+
+  @DataProvider(name = "testDateTimeHopWindowConversion")
+  public Object[][] testDateTimeHopWindowConversion() {
+    List<Object[]> entries = new ArrayList<>();
+
+    /*************** Epoch to Epoch ***************/
+    {
+      // Test bucketing to 15 minutes with 1h hop window, for one value
+      long[] input = {
+          1696587946000L /* Fri Oct 06 2023 10:25:46 GMT+0000 */
+      };
+      long[][] expected = {
+          new long[]{
+              1696587300000L /* Fri Oct 06 2023 10:15:00 GMT+0000 */, 1696586400000L /* Fri Oct 06 2023 10:00:00
+              GMT+0000 */, 1696585500000L /* Fri Oct 06 2023 09:45:00 GMT+0000 */, 1696584600000L /* Fri Oct 06 2023
+              09:30:00 GMT+0000 */,
+          }
+      };
+      entries.add(new Object[]{
+          "EPOCH|MILLISECONDS", "EPOCH|MILLISECONDS", "MINUTES|15", "HOURS", input, expected
+      });
+    }
+    {
+      // Test bucketing to 15 minutes with 1h hop window, multiple values
+      long[] input = {
+          1696587946000L /* Fri Oct 06 2023 10:25:46 GMT+0000 */, 1693998340000L /* Wed Sep 06 2023 11:05:40 GMT+0000
+           */,
+      };
+      long[][] expected = {
+          new long[]{
+              1696587300000L /* Fri Oct 06 2023 10:15:00 GMT+0000 */, 1696586400000L /* Fri Oct 06 2023 10:00:00
+              GMT+0000 */, 1696585500000L /* Fri Oct 06 2023 09:45:00 GMT+0000 */, 1696584600000L /* Fri Oct 06 2023
+              09:30:00 GMT+0000 */,
+          }, new long[]{
+          1693998000000L /* Wed Sep 06 2023 11:00:00 GMT+0000 */, 1693997100000L /* Wed Sep 06 2023 10:45:00 GMT+0000
+           */, 1693996200000L /* Wed Sep 06 2023 10:30:00 GMT+0000 */, 1693995300000L /* Wed Sep 06 2023 10:15:00
+           GMT+0000 */,
+      }
+      };
+      entries.add(new Object[]{
+          "EPOCH|MILLISECONDS", "EPOCH|MILLISECONDS", "MINUTES|15", "HOURS", input, expected
+      });
+    }
+    {
+      // Test bucketing with conversion to hours with 1h hop window, 15m bucketing
+      long[] input = {
+          1696587946000L /* Fri Oct 06 2023 10:25:46 GMT+0000 */
+      };
+      long[][] expected = {
+          new long[]{
+              471274L /* Fri Oct 06 2023 10:00:00 GMT+0000 */, 471274L /* Fri Oct 06 2023 10:00:00 GMT+0000 */,
+              471273L /* Fri Oct 06 2023 09:00:00 GMT+0000 */, 471273L /* Fri Oct 06 2023 09:00:00 GMT+0000 */,
+          }
+      };
+      entries.add(new Object[]{"EPOCH|MILLISECONDS", "EPOCH|HOURS", "MINUTES|15", "HOURS", input, expected});
+    }
+    {
+      // Test bucketing with conversion to 15 min with 1h hop window, 15m bucketing
+      long[] input = {
+          1696587946000L /* Fri Oct 06 2023 10:25:46 GMT+0000 */
+      };
+      long[][] expected = {
+          new long[]{
+              1885097L /* Fri Oct 06 2023 10:15:00 GMT+0000 */, 1885096L /* Fri Oct 06 2023 10:00:00 GMT+0000 */,
+              1885095L /* Fri Oct 06 2023 09:45:00 GMT+0000 */, 1885094L /* Fri Oct 06 2023 09:30:00 GMT+0000 */,
+          }
+      };
+      entries.add(new Object[]{"EPOCH|MILLISECONDS", "EPOCH|MINUTES|15", "MINUTES|15", "HOURS", input, expected});
+    }
+    {
+      {
+        // Test bucketing to 1hour with 15m hop window. Since there is no intersection - empty array is expected
+        long[] input = {
+            1696587946000L /* Fri Oct 06 2023 10:25:46 GMT+0000 */
+        };
+        long[][] expected = {
+            new long[]{}
+        };
+        entries.add(new Object[]{
+            "EPOCH|MILLISECONDS", "EPOCH|MILLISECONDS", "MINUTES|60", "MINUTES|15", input, expected
+        });
+      }
+    }
+    {
+      {
+        // Test bucketing with non-aligned window
+        long[] input = {
+            1696587946000L /* Fri Oct 06 2023 10:25:46 GMT+0000 */
+        };
+        long[][] expected = {
+            new long[]{
+                1696587300000L /* Fri Oct 06 2023 10:15:00 GMT+0000 */, 1696586400000L /* Fri Oct 06 2023 10:00:00
+                GMT+0000 */, 1696585500000L /* Fri Oct 06 2023 09:45:00 GMT+0000 */,
+            }
+        };
+        entries.add(new Object[]{
+            "EPOCH|MILLISECONDS", "EPOCH|MILLISECONDS", "MINUTES|15", "MINUTES|55", input, expected
+        });
+      }
+    }
+    /*************** Epoch to SDF ***************/
+    {
+      {
+        // Test conversion from millis since epoch to simple date format (GMT timezone)
+        long[] input = {
+            1696587946000L /* Fri Oct 06 2023 10:25:46 GMT+0000 */
+        };
+        String[][] expected = {
+            {"2023-10-06-10:15", "2023-10-06-10:00", "2023-10-06-09:45", "2023-10-06-09:30"}
+        };
+        entries.add(new Object[]{
+            "EPOCH|MILLISECONDS", "SIMPLE_DATE_FORMAT|yyyy-MM-dd-HH:mm|GMT", "MINUTES|15", "HOURS|1",
+            input, expected
+        });
+      }
+    }
+    {
+      {
+        // Test multiple values conversion from millis since epoch to simple date format (GMT timezone)
+        long[] input = {
+            1696587946000L, /* Fri Oct 06 2023 10:25:46 GMT+0000 */
+            1696582946000L /* Fri Oct 06 2023 09:02:26 GMT+0000 */
+        };
+        String[][] expected = {
+            {"2023-10-06-10:15", "2023-10-06-10:00", "2023-10-06-09:45", "2023-10-06-09:30"}, {
+            "2023-10-06-09:00", "2023-10-06-08:45", "2023-10-06-08:30", "2023-10-06-08:15"
+        }
+        };
+        entries.add(new Object[]{
+            "EPOCH|MILLISECONDS", "SIMPLE_DATE_FORMAT|yyyy-MM-dd-HH:mm|GMT", "MINUTES|15", "HOURS|1", input, expected
+        });
+      }
+    }
+    {
+      {
+        // Test single value conversion from hours to simple date format (GMT timezone)
+        long[] input = {
+            471274L, /* Fri Oct 06 2023 10:00:00 GMT+0000 */
+        };
+        String[][] expected = {
+            {"2023-10-06-10:00", "2023-10-06-09:45", "2023-10-06-09:30", "2023-10-06-09:15"}
+        };
+        entries.add(new Object[]{
+            "EPOCH|HOURS", "SIMPLE_DATE_FORMAT|yyyy-MM-dd-HH:mm|GMT", "MINUTES|15", "HOURS|1", input, expected
+        });
+      }
+    }
+    {
+      {
+        // Test single value conversion from 2 hours to simple date format (GMT timezone)
+        long[] input = {
+            235637L, /* Fri Oct 06 2023 10:00:00 GMT+0000 */
+        };
+        String[][] expected = {
+            {"2023-10-06-10:00", "2023-10-06-09:45", "2023-10-06-09:30", "2023-10-06-09:15"}
+        };
+        entries.add(new Object[]{
+            "EPOCH|HOURS|2", "SIMPLE_DATE_FORMAT|yyyy-MM-dd-HH:mm|GMT", "MINUTES|15", "HOURS|1", input, expected
+        });
+      }
+    }
+
+    /*************** SDF to EPOCH ***************/
+    {
+      // Test conversion from simple date format (GMT timezone) to millis since epoch with 1h window and 15m hop
+      String[] input = {
+          "2023-10-06 10:25:46"
+      };
+      long[][] expected = {
+          {
+              1696587300000L /* Fri Oct 06 2023 10:15:00 GMT+0000 */, 1696586400000L /* Fri Oct 06 2023 10:00:00
+              GMT+0000 */, 1696585500000L /* Fri Oct 06 2023 09:45:00 GMT+0000 */, 1696584600000L /* Fri Oct 06 2023
+              09:30:00 GMT+0000 */
+          }
+      };
+      entries.add(new Object[]{
+          "SIMPLE_DATE_FORMAT|yyyy-MM-dd HH:mm:ss|GMT", "EPOCH|MILLISECONDS|1", "MINUTES|15", "MINUTES|60",
+          input, expected
+      });
+    }
+    {
+      // Test conversion from simple date format (GMT timezone) to millis since epoch with 1h window
+      // and 15m hop with 1:HOURS input granularity
+      String[] input = {
+          "2023-10-06 10:25:46"
+      };
+      long[][] expected = {
+          {
+              1696587300000L /* Fri Oct 06 2023 10:15:00 GMT+0000 */, 1696586400000L /* Fri Oct 06 2023 10:00:00
+              GMT+0000 */, 1696585500000L /* Fri Oct 06 2023 09:45:00 GMT+0000 */, 1696584600000L /* Fri Oct 06 2023
+              09:30:00 GMT+0000 */
+          }
+      };
+      entries.add(new Object[]{
+          "SIMPLE_DATE_FORMAT|yyyy-MM-dd HH:mm:ss|GMT", "EPOCH|MILLISECONDS|1", "MINUTES|15", "MINUTES|60",
+          input, expected
+      });
+    }
+
+    /*************** SDF to SDF ***************/
+    {
+      // Test conversion from one simple date format to another with 1h window and 15m hop
+      String[] input = {
+          "2023-10-06 10:12:46"
+      };
+      String[][] expected = {
+          {"2023-10-06 10:00", "2023-10-06 09:45", "2023-10-06 09:30", "2023-10-06 09:15"}
+      };
+      entries.add(new Object[]{
+          "SIMPLE_DATE_FORMAT|yyyy-MM-dd HH:mm:ss|SECONDS|1", "SIMPLE_DATE_FORMAT|yyyy-MM-dd HH:mm|GMT|MINUTES|1",
+          "MINUTES|15", "MINUTES|60", input, expected
+      });
+    }
+    {
+      // Test conversion from one simple date format to another with 1h window and 15m hop. Different timezone
+      String[] input = {
+          "2023-10-06 10:12:46"
+      };
+      String[][] expected = {
+          {"2023-10-06 03:00", "2023-10-06 02:45", "2023-10-06 02:30", "2023-10-06 02:15"}
+      };
+      entries.add(new Object[]{
+          "SIMPLE_DATE_FORMAT|yyyy-MM-dd HH:mm:ss|SECONDS|1", "SIMPLE_DATE_FORMAT|yyyy-MM-dd "
+          + "HH:mm|America/Los_Angeles", "MINUTES|15", "MINUTES|60", input, expected
+      });
+    }
+
+    return entries.toArray(new Object[entries.size()][]);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org