You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2021/05/25 17:35:14 UTC

[incubator-pinot] branch master updated: Optimize TIME_CONVERT/DATE_TIME_CONVERT predicates (#6957)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c7dfcd  Optimize TIME_CONVERT/DATE_TIME_CONVERT predicates (#6957)
3c7dfcd is described below

commit 3c7dfcd8d65bbfeceb4af77d23298515251eaae6
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue May 25 10:34:52 2021 -0700

    Optimize TIME_CONVERT/DATE_TIME_CONVERT predicates (#6957)
    
    Add `TimePredicateFilterOptimizer` to optimize TIME_CONVERT/DATE_TIME_CONVERT function with range/equality predicate to directly apply the predicate to the inner expression.
    E.g. `dateTimeConvert(col, '1:MILLISECONDS:EPOCH', '1:MILLISECONDS:EPOCH', '30:MINUTES') >= 1620830760000` can be optimized to `col >= 1620831600000`
    
    After optimizing the time convert function to directly work on the time column, the pruner and all the indexes can be applied.
---
 .../pinot/core/query/optimizer/QueryOptimizer.java |   9 +-
 .../filter/MergeRangeFilterOptimizer.java          | 101 +----
 .../pinot/core/query/optimizer/filter/Range.java   | 119 ++++++
 .../filter/TimePredicateFilterOptimizer.java       | 429 +++++++++++++++++++++
 .../filter/NumericalFilterOptimizerTest.java       |   2 -
 .../filter/TimePredicateFilterOptimizerTest.java   | 200 ++++++++++
 .../tests/OfflineClusterIntegrationTest.java       |  27 +-
 .../pinot/spi/data/DateTimeGranularitySpec.java    |   2 +-
 8 files changed, 774 insertions(+), 115 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/QueryOptimizer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/QueryOptimizer.java
index c91baa8..4cba605 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/QueryOptimizer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/QueryOptimizer.java
@@ -32,12 +32,19 @@ import org.apache.pinot.core.query.optimizer.filter.FlattenAndOrFilterOptimizer;
 import org.apache.pinot.core.query.optimizer.filter.MergeEqInFilterOptimizer;
 import org.apache.pinot.core.query.optimizer.filter.MergeRangeFilterOptimizer;
 import org.apache.pinot.core.query.optimizer.filter.NumericalFilterOptimizer;
+import org.apache.pinot.core.query.optimizer.filter.TimePredicateFilterOptimizer;
 import org.apache.pinot.spi.data.Schema;
 
 
 public class QueryOptimizer {
+  // DO NOT change the order of these optimizers.
+  // - MergeEqInFilterOptimizer and MergeRangeFilterOptimizer relies on FlattenAndOrFilterOptimizer to flatten the
+  //   AND/OR predicate so that the children are on the same level to be merged
+  // - TimePredicateFilterOptimizer and MergeRangeFilterOptimizer relies on NumericalFilterOptimizer to convert the
+  //   values to the proper format so that they can be properly parsed
   private static final List<FilterOptimizer> FILTER_OPTIMIZERS = Arrays
-      .asList(new FlattenAndOrFilterOptimizer(), new NumericalFilterOptimizer(), new MergeEqInFilterOptimizer(), new MergeRangeFilterOptimizer());
+      .asList(new FlattenAndOrFilterOptimizer(), new MergeEqInFilterOptimizer(), new NumericalFilterOptimizer(),
+          new TimePredicateFilterOptimizer(), new MergeRangeFilterOptimizer());
 
   /**
    * Optimizes the given PQL query.
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/MergeRangeFilterOptimizer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/MergeRangeFilterOptimizer.java
index e5148a2..46f73e9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/MergeRangeFilterOptimizer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/MergeRangeFilterOptimizer.java
@@ -25,12 +25,10 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.pinot.common.request.Expression;
 import org.apache.pinot.common.request.ExpressionType;
 import org.apache.pinot.common.request.FilterOperator;
 import org.apache.pinot.common.request.Function;
-import org.apache.pinot.common.request.context.predicate.RangePredicate;
 import org.apache.pinot.common.utils.request.FilterQueryTree;
 import org.apache.pinot.common.utils.request.RequestUtils;
 import org.apache.pinot.pql.parsers.pql2.ast.FilterKind;
@@ -46,7 +44,6 @@ import org.apache.pinot.spi.data.Schema;
  * NOTE: This optimizer follows the {@link FlattenAndOrFilterOptimizer}, so all the AND/OR filters are already
  *       flattened.
  */
-@SuppressWarnings({"rawtypes", "unchecked"})
 public class MergeRangeFilterOptimizer implements FilterOptimizer {
 
   @Override
@@ -79,7 +76,7 @@ public class MergeRangeFilterOptimizer implements FilterOptimizer {
             continue;
           }
           // Create a range and merge with current range if exists
-          Range range = getRange(child.getValue().get(0), fieldSpec.getDataType());
+          Range range = Range.getRange(child.getValue().get(0), fieldSpec.getDataType());
           Range currentRange = rangeMap.get(column);
           if (currentRange == null) {
             rangeMap.put(column, range);
@@ -115,26 +112,6 @@ public class MergeRangeFilterOptimizer implements FilterOptimizer {
   }
 
   /**
-   * Helper method to create a Range from the given string representation of the range and data type. See
-   * {@link RangePredicate} for details.
-   */
-  private static Range getRange(String rangeString, DataType dataType) {
-    String[] split = StringUtils.split(rangeString, RangePredicate.DELIMITER);
-    String lower = split[0];
-    boolean lowerInclusive = lower.charAt(0) == RangePredicate.LOWER_INCLUSIVE;
-    String stringLowerBound = lower.substring(1);
-    Comparable lowerBound =
-        stringLowerBound.equals(RangePredicate.UNBOUNDED) ? null : dataType.convertInternal(stringLowerBound);
-    String upper = split[1];
-    int upperLength = upper.length();
-    boolean upperInclusive = upper.charAt(upperLength - 1) == RangePredicate.UPPER_INCLUSIVE;
-    String stringUpperBound = upper.substring(0, upperLength - 1);
-    Comparable upperBound =
-        stringUpperBound.equals(RangePredicate.UNBOUNDED) ? null : dataType.convertInternal(stringUpperBound);
-    return new Range(lowerBound, lowerInclusive, upperBound, upperInclusive);
-  }
-
-  /**
    * Helper method to construct a RANGE predicate FilterQueryTree from the given column and range.
    */
   private static FilterQueryTree getRangeFilterQueryTree(String column, Range range) {
@@ -233,6 +210,8 @@ public class MergeRangeFilterOptimizer implements FilterOptimizer {
       case BETWEEN:
         return new Range(getComparable(operands.get(1), dataType), true, getComparable(operands.get(2), dataType),
             true);
+      case RANGE:
+        return Range.getRange(operands.get(1).getLiteral().getStringValue(), dataType);
       default:
         throw new IllegalStateException("Unsupported filter kind: " + filterKind);
     }
@@ -241,6 +220,7 @@ public class MergeRangeFilterOptimizer implements FilterOptimizer {
   /**
    * Helper method to create a Comparable from the given literal expression and data type.
    */
+  @SuppressWarnings("rawtypes")
   private static Comparable getComparable(Expression literalExpression, DataType dataType) {
     return dataType.convertInternal(literalExpression.getLiteral().getFieldValue().toString());
   }
@@ -254,77 +234,4 @@ public class MergeRangeFilterOptimizer implements FilterOptimizer {
         RequestUtils.getLiteralExpression(range.getRangeString())));
     return rangeFilter;
   }
-
-  /**
-   * Helper class to represent a value range.
-   */
-  private static class Range {
-    Comparable _lowerBound;
-    boolean _lowerInclusive;
-    Comparable _upperBound;
-    boolean _upperInclusive;
-
-    Range(@Nullable Comparable lowerBound, boolean lowerInclusive, @Nullable Comparable upperBound,
-        boolean upperInclusive) {
-      _lowerBound = lowerBound;
-      _lowerInclusive = lowerInclusive;
-      _upperBound = upperBound;
-      _upperInclusive = upperInclusive;
-    }
-
-    /**
-     * Intersects the current range with another range.
-     */
-    void intersect(Range range) {
-      if (range._lowerBound != null) {
-        if (_lowerBound == null) {
-          _lowerInclusive = range._lowerInclusive;
-          _lowerBound = range._lowerBound;
-        } else {
-          int result = _lowerBound.compareTo(range._lowerBound);
-          if (result < 0) {
-            _lowerBound = range._lowerBound;
-            _lowerInclusive = range._lowerInclusive;
-          } else if (result == 0) {
-            _lowerInclusive &= range._lowerInclusive;
-          }
-        }
-      }
-      if (range._upperBound != null) {
-        if (_upperBound == null) {
-          _upperInclusive = range._upperInclusive;
-          _upperBound = range._upperBound;
-        } else {
-          int result = _upperBound.compareTo(range._upperBound);
-          if (result > 0) {
-            _upperBound = range._upperBound;
-            _upperInclusive = range._upperInclusive;
-          } else if (result == 0) {
-            _upperInclusive &= range._upperInclusive;
-          }
-        }
-      }
-    }
-
-    /**
-     * Returns the string representation of the range. See {@link RangePredicate} for details.
-     */
-    String getRangeString() {
-      StringBuilder stringBuilder = new StringBuilder();
-      if (_lowerBound == null) {
-        stringBuilder.append(RangePredicate.LOWER_EXCLUSIVE).append(RangePredicate.UNBOUNDED);
-      } else {
-        stringBuilder.append(_lowerInclusive ? RangePredicate.LOWER_INCLUSIVE : RangePredicate.LOWER_EXCLUSIVE);
-        stringBuilder.append(_lowerBound.toString());
-      }
-      stringBuilder.append(RangePredicate.DELIMITER);
-      if (_upperBound == null) {
-        stringBuilder.append(RangePredicate.UNBOUNDED).append(RangePredicate.UPPER_EXCLUSIVE);
-      } else {
-        stringBuilder.append(_upperBound.toString());
-        stringBuilder.append(_upperInclusive ? RangePredicate.UPPER_INCLUSIVE : RangePredicate.UPPER_EXCLUSIVE);
-      }
-      return stringBuilder.toString();
-    }
-  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/Range.java b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/Range.java
new file mode 100644
index 0000000..34de9b2
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/Range.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.optimizer.filter;
+
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.request.context.predicate.RangePredicate;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+
+
+/**
+ * Helper class to represent a value range.
+ */
+@SuppressWarnings({"rawtypes", "unchecked"})
+public class Range {
+  private Comparable _lowerBound;
+  private boolean _lowerInclusive;
+  private Comparable _upperBound;
+  private boolean _upperInclusive;
+
+  public Range(@Nullable Comparable lowerBound, boolean lowerInclusive, @Nullable Comparable upperBound,
+      boolean upperInclusive) {
+    _lowerBound = lowerBound;
+    _lowerInclusive = lowerInclusive;
+    _upperBound = upperBound;
+    _upperInclusive = upperInclusive;
+  }
+
+  /**
+   * Intersects the current range with another range.
+   */
+  public void intersect(Range range) {
+    if (range._lowerBound != null) {
+      if (_lowerBound == null) {
+        _lowerInclusive = range._lowerInclusive;
+        _lowerBound = range._lowerBound;
+      } else {
+        int result = _lowerBound.compareTo(range._lowerBound);
+        if (result < 0) {
+          _lowerBound = range._lowerBound;
+          _lowerInclusive = range._lowerInclusive;
+        } else if (result == 0) {
+          _lowerInclusive &= range._lowerInclusive;
+        }
+      }
+    }
+    if (range._upperBound != null) {
+      if (_upperBound == null) {
+        _upperInclusive = range._upperInclusive;
+        _upperBound = range._upperBound;
+      } else {
+        int result = _upperBound.compareTo(range._upperBound);
+        if (result > 0) {
+          _upperBound = range._upperBound;
+          _upperInclusive = range._upperInclusive;
+        } else if (result == 0) {
+          _upperInclusive &= range._upperInclusive;
+        }
+      }
+    }
+  }
+
+  /**
+   * Returns the string representation of the range. See {@link RangePredicate} for details.
+   */
+  public String getRangeString() {
+    StringBuilder stringBuilder = new StringBuilder();
+    if (_lowerBound == null) {
+      stringBuilder.append(RangePredicate.LOWER_EXCLUSIVE).append(RangePredicate.UNBOUNDED);
+    } else {
+      stringBuilder.append(_lowerInclusive ? RangePredicate.LOWER_INCLUSIVE : RangePredicate.LOWER_EXCLUSIVE);
+      stringBuilder.append(_lowerBound);
+    }
+    stringBuilder.append(RangePredicate.DELIMITER);
+    if (_upperBound == null) {
+      stringBuilder.append(RangePredicate.UNBOUNDED).append(RangePredicate.UPPER_EXCLUSIVE);
+    } else {
+      stringBuilder.append(_upperBound);
+      stringBuilder.append(_upperInclusive ? RangePredicate.UPPER_INCLUSIVE : RangePredicate.UPPER_EXCLUSIVE);
+    }
+    return stringBuilder.toString();
+  }
+
+  /**
+   * Creates a Range from the given string representation of the range and data type. See {@link RangePredicate} for
+   * details.
+   */
+  public static Range getRange(String rangeString, DataType dataType) {
+    String[] split = StringUtils.split(rangeString, RangePredicate.DELIMITER);
+    String lower = split[0];
+    boolean lowerInclusive = lower.charAt(0) == RangePredicate.LOWER_INCLUSIVE;
+    String stringLowerBound = lower.substring(1);
+    Comparable lowerBound =
+        stringLowerBound.equals(RangePredicate.UNBOUNDED) ? null : dataType.convertInternal(stringLowerBound);
+    String upper = split[1];
+    int upperLength = upper.length();
+    boolean upperInclusive = upper.charAt(upperLength - 1) == RangePredicate.UPPER_INCLUSIVE;
+    String stringUpperBound = upper.substring(0, upperLength - 1);
+    Comparable upperBound =
+        stringUpperBound.equals(RangePredicate.UNBOUNDED) ? null : dataType.convertInternal(stringUpperBound);
+    return new Range(lowerBound, lowerInclusive, upperBound, upperInclusive);
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/TimePredicateFilterOptimizer.java b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/TimePredicateFilterOptimizer.java
new file mode 100644
index 0000000..3d3913a
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/optimizer/filter/TimePredicateFilterOptimizer.java
@@ -0,0 +1,429 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.optimizer.filter;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.ExpressionType;
+import org.apache.pinot.common.request.Function;
+import org.apache.pinot.common.utils.request.FilterQueryTree;
+import org.apache.pinot.common.utils.request.RequestUtils;
+import org.apache.pinot.core.operator.transform.function.DateTimeConversionTransformFunction;
+import org.apache.pinot.core.operator.transform.function.TimeConversionTransformFunction;
+import org.apache.pinot.pql.parsers.pql2.ast.FilterKind;
+import org.apache.pinot.spi.data.DateTimeFieldSpec.TimeFormat;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.TimeUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * The {@code TimePredicateFilterOptimizer} optimizes the time related predicates:
+ * <ul>
+ *   <li>
+ *     Optimizes TIME_CONVERT/DATE_TIME_CONVERT function with range/equality predicate to directly apply the predicate
+ *     to the inner expression.
+ *     <p>E.g. "dateTimeConvert(col, '1:SECONDS:EPOCH', '1:MINUTES:EPOCH', '30:MINUTES') > 27013846" will be optimized
+ *     to "col >= 1620831600".
+ *     <p>NOTE: Other predicates such as NOT_EQUALS, IN, NOT_IN are not supported for now because these predicates are
+ *     not common on time column, and they cannot be optimized to a single range predicate.
+ *   </li>
+ * </ul>
+ *
+ * NOTE: This optimizer is followed by the {@link MergeRangeFilterOptimizer}, which can merge the generated ranges.
+ */
+public class TimePredicateFilterOptimizer implements FilterOptimizer {
+  private static final Logger LOGGER = LoggerFactory.getLogger(TimePredicateFilterOptimizer.class);
+
+  @Override
+  public FilterQueryTree optimize(FilterQueryTree filterQueryTree, @Nullable Schema schema) {
+    // Do not rewrite PQL queries because PQL is deprecated
+    return filterQueryTree;
+  }
+
+  @Override
+  public Expression optimize(Expression filterExpression, @Nullable Schema schema) {
+    return filterExpression.getType() == ExpressionType.FUNCTION ? optimize(filterExpression) : filterExpression;
+  }
+
+  @VisibleForTesting
+  Expression optimize(Expression filterExpression) {
+    Function filterFunction = filterExpression.getFunctionCall();
+    FilterKind filterKind = FilterKind.valueOf(filterFunction.getOperator());
+    List<Expression> operands = filterFunction.getOperands();
+    if (filterKind == FilterKind.AND || filterKind == FilterKind.OR) {
+      // NOTE: We don't need to replace the children because all the changes are applied in-place
+      for (Expression operand : operands) {
+        optimize(operand);
+      }
+    } else if (filterKind.isRange() || filterKind == FilterKind.EQUALS) {
+      Expression expression = operands.get(0);
+      if (expression.getType() == ExpressionType.FUNCTION) {
+        Function expressionFunction = expression.getFunctionCall();
+        String functionName = StringUtils.remove(expressionFunction.getOperator(), '_');
+        if (functionName.equalsIgnoreCase(TimeConversionTransformFunction.FUNCTION_NAME)) {
+          optimizeTimeConvert(filterFunction, filterKind);
+        } else if (functionName.equalsIgnoreCase(DateTimeConversionTransformFunction.FUNCTION_NAME)) {
+          optimizeDateTimeConvert(filterFunction, filterKind);
+        }
+      }
+    }
+    return filterExpression;
+  }
+
+  /**
+   * Helper method to optimize TIME_CONVERT function with range/equality predicate to directly apply the predicate to
+   * the inner expression. Changes are applied in-place of the filter function.
+   */
+  private void optimizeTimeConvert(Function filterFunction, FilterKind filterKind) {
+    List<Expression> filterOperands = filterFunction.getOperands();
+    List<Expression> timeConvertOperands = filterOperands.get(0).getFunctionCall().getOperands();
+    Preconditions.checkArgument(timeConvertOperands.size() == 3,
+        "Exactly 3 arguments are required for TIME_CONVERT transform function");
+    Preconditions
+        .checkArgument(isStringLiteral(timeConvertOperands.get(1)) && isStringLiteral(timeConvertOperands.get(2)),
+            "The 2nd and 3rd argument for TIME_CONVERT transform function must be string literal");
+
+    try {
+      TimeUnit inputTimeUnit = TimeUnit.valueOf(timeConvertOperands.get(1).getLiteral().getStringValue().toUpperCase());
+      TimeUnit outputTimeUnit =
+          TimeUnit.valueOf(timeConvertOperands.get(2).getLiteral().getStringValue().toUpperCase());
+
+      // For the same input and output time unit, directly remove the TIME_CONVERT function
+      if (inputTimeUnit == outputTimeUnit) {
+        filterOperands.set(0, timeConvertOperands.get(0));
+        return;
+      }
+
+      // Step 1: Convert output range to millis range
+      Long lowerMillis = null;
+      Long upperMillis = null;
+      switch (filterKind) {
+        case GREATER_THAN: {
+          // millisToFormat(millis) > n
+          // -> millisToFormat(millis) >= n + 1
+          // -> millis >= formatToMillis(n + 1)
+          //
+          // E.g.
+          // millisToSeconds(millis) > 0
+          // -> millisToSeconds(millis) >= 1
+          // -> millis >= 1000
+          //
+          // Note that 'millisToSeconds(millis) > 0' is not equivalent to 'millis > 0'
+          long lowerValue = Long.parseLong(filterOperands.get(1).getLiteral().getFieldValue().toString());
+          lowerMillis = outputTimeUnit.toMillis(lowerValue + 1);
+          Preconditions.checkState(TimeUtils.timeValueInValidRange(lowerMillis), "Invalid lower bound in millis: %s",
+              lowerMillis);
+          break;
+        }
+        case GREATER_THAN_OR_EQUAL: {
+          // millisToFormat(millis) >= n
+          // -> millis >= formatToMillis(n)
+          long lowerValue = Long.parseLong(filterOperands.get(1).getLiteral().getFieldValue().toString());
+          lowerMillis = outputTimeUnit.toMillis(lowerValue);
+          Preconditions.checkState(TimeUtils.timeValueInValidRange(lowerMillis), "Invalid lower bound in millis: %s",
+              lowerMillis);
+          break;
+        }
+        case LESS_THAN: {
+          // millisToFormat(millis) < n
+          // -> millis < formatToMillis(n)
+          long upperValue = Long.parseLong(filterOperands.get(1).getLiteral().getFieldValue().toString());
+          upperMillis = outputTimeUnit.toMillis(upperValue);
+          Preconditions.checkState(TimeUtils.timeValueInValidRange(upperMillis), "Invalid upper bound in millis: %s",
+              upperMillis);
+          break;
+        }
+        case LESS_THAN_OR_EQUAL: {
+          // millisToFormat(millis) <= n
+          // -> millisToFormat(millis) < n + 1
+          // -> millis < formatToMillis(n + 1)
+          //
+          // E.g.
+          // millisToSeconds(millis) <= 0
+          // -> millisToSeconds(millis) < 1
+          // -> millis < 1000
+          //
+          // Note that 'millisToSeconds(millis) <= 0' is not equivalent to 'millis <= 0'
+          long upperValue = Long.parseLong(filterOperands.get(1).getLiteral().getFieldValue().toString());
+          upperMillis = outputTimeUnit.toMillis(upperValue + 1);
+          Preconditions.checkState(TimeUtils.timeValueInValidRange(upperMillis), "Invalid upper bound in millis: %s",
+              upperMillis);
+          break;
+        }
+        case BETWEEN: {
+          // Combine GREATER_THAN_OR_EQUAL and LESS_THAN_OR_EQUAL
+          long lowerValue = Long.parseLong(filterOperands.get(1).getLiteral().getFieldValue().toString());
+          lowerMillis = outputTimeUnit.toMillis(lowerValue);
+          Preconditions.checkState(TimeUtils.timeValueInValidRange(lowerMillis), "Invalid lower bound in millis: %s",
+              lowerMillis);
+          long upperValue = Long.parseLong(filterOperands.get(2).getLiteral().getFieldValue().toString());
+          upperMillis = outputTimeUnit.toMillis(upperValue + 1);
+          Preconditions.checkState(TimeUtils.timeValueInValidRange(upperMillis), "Invalid upper bound in millis: %s",
+              upperMillis);
+          break;
+        }
+        case EQUALS: {
+          // Combine GREATER_THAN_OR_EQUAL and LESS_THAN_OR_EQUAL
+          long value = Long.parseLong(filterOperands.get(1).getLiteral().getFieldValue().toString());
+          lowerMillis = outputTimeUnit.toMillis(value);
+          Preconditions.checkState(TimeUtils.timeValueInValidRange(lowerMillis), "Invalid lower bound in millis: %s",
+              lowerMillis);
+          upperMillis = outputTimeUnit.toMillis(value + 1);
+          Preconditions.checkState(TimeUtils.timeValueInValidRange(upperMillis), "Invalid upper bound in millis: %s",
+              upperMillis);
+          break;
+        }
+        default:
+          throw new IllegalStateException();
+      }
+
+      // Step 2: Convert millis range to input range
+      Long lowerValue = null;
+      boolean lowerInclusive = false;
+      if (lowerMillis != null) {
+        // formatToMillis(col) >= millis
+        // - if (formatToMillis(millisToFormat(millis)) == millis)
+        //   -> col >= millisToFormat(millis)
+        // - else (formatToMillis(millisToFormat(millis)) < millis)
+        //   -> col > millisToFormat(millis)
+        //
+        // E.g.
+        // secondsToMillis(seconds) >= 123
+        // -> seconds > 0
+        // secondsToMillis(seconds) >= 0
+        // -> seconds >= 0
+        lowerValue = inputTimeUnit.convert(lowerMillis, TimeUnit.MILLISECONDS);
+        lowerInclusive = inputTimeUnit.toMillis(lowerValue) == lowerMillis;
+      }
+      Long upperValue = null;
+      boolean upperInclusive = false;
+      if (upperMillis != null) {
+        // formatToMillis(col) < millis
+        // - if (formatToMillis(millisToFormat(millis)) == millis)
+        //   -> col < millisToFormat(millis)
+        // - else (formatToMillis(millisToFormat(millis)) < millis)
+        //   -> col <= millisToFormat(millis)
+        //
+        // E.g.
+        // secondsToMillis(seconds) < 123
+        // -> seconds <= 0
+        // secondsToMillis(seconds) < 0
+        // -> seconds < 0
+        upperValue = inputTimeUnit.convert(upperMillis, TimeUnit.MILLISECONDS);
+        upperInclusive = inputTimeUnit.toMillis(upperValue) != upperMillis;
+      }
+
+      // Step 3: Rewrite the filter function
+      String rangeString = new Range(lowerValue, lowerInclusive, upperValue, upperInclusive).getRangeString();
+      filterFunction.setOperator(FilterKind.RANGE.name());
+      filterFunction
+          .setOperands(Arrays.asList(timeConvertOperands.get(0), RequestUtils.getLiteralExpression(rangeString)));
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while optimizing TIME_CONVERT predicate: {}, skipping the optimization",
+          filterFunction, e);
+    }
+  }
+
+  /**
+   * Helper method to optimize DATE_TIME_CONVERT function with range/equality predicate to directly apply the predicate
+   * to the inner expression. Changes are applied in-place of the filter function.
+   */
+  private void optimizeDateTimeConvert(Function filterFunction, FilterKind filterKind) {
+    List<Expression> filterOperands = filterFunction.getOperands();
+    List<Expression> dateTimeConvertOperands = filterOperands.get(0).getFunctionCall().getOperands();
+    Preconditions.checkArgument(dateTimeConvertOperands.size() == 4,
+        "Exactly 4 arguments are required for DATE_TIME_CONVERT transform function");
+    Preconditions.checkArgument(
+        isStringLiteral(dateTimeConvertOperands.get(1)) && isStringLiteral(dateTimeConvertOperands.get(2))
+            && isStringLiteral(dateTimeConvertOperands.get(3)),
+        "The 2nd to 4th arguments for DATE_TIME_CONVERT transform function must be string literal");
+
+    try {
+      DateTimeFormatSpec inputFormat =
+          new DateTimeFormatSpec(dateTimeConvertOperands.get(1).getLiteral().getStringValue());
+      DateTimeFormatSpec outputFormat =
+          new DateTimeFormatSpec(dateTimeConvertOperands.get(2).getLiteral().getStringValue());
+      // SDF output format is not supported because:
+      // 1. No easy way to get the next time value (instead of simply +1 for EPOCH format)
+      // 2. Hard to calculate the bucket boundary (need to consider time zone)
+      // TODO: Support SDF output format
+      if (outputFormat.getTimeFormat() == TimeFormat.SIMPLE_DATE_FORMAT) {
+        return;
+      }
+      long granularityMillis = new DateTimeGranularitySpec(dateTimeConvertOperands.get(3).getLiteral().getStringValue())
+          .granularityToMillis();
+
+      // Step 1: Convert output range to millis range
+      Long lowerMillis = null;
+      Long upperMillis = null;
+      switch (filterKind) {
+        case GREATER_THAN: {
+          // millisToFormat(floor(millis, granularity)) > n
+          // -> millisToFormat(floor(millis, granularity)) >= n + 1
+          // -> floor(millis, granularity) >= formatToMillis(n + 1)
+          // -> millis >= ceil(formatToMillis(n + 1), granularity)
+          //
+          // E.g.
+          // millisToSeconds(floor(millis, 1 minute)) > 0
+          // -> millisToSeconds(floor(millis, 1 minute)) >= 1
+          // -> floor(millis, 1 minute) >= 1000
+          // -> millis >= 60000
+          //
+          // Note that 'millisToSeconds(floor(millis, 1 minute)) > 0' is not equivalent to 'millis > 0'
+          long lowerValue = Long.parseLong(filterOperands.get(1).getLiteral().getFieldValue().toString());
+          lowerMillis = ceil(outputFormat.fromFormatToMillis(Long.toString(lowerValue + 1)), granularityMillis);
+          Preconditions.checkState(TimeUtils.timeValueInValidRange(lowerMillis), "Invalid lower bound in millis: %s",
+              lowerMillis);
+          break;
+        }
+        case GREATER_THAN_OR_EQUAL: {
+          // millisToFormat(floor(millis, granularity)) >= n
+          // -> floor(millis, granularity) >= formatToMillis(n)
+          // -> millis >= ceil(formatToMillis(n), granularity)
+          String lowerValue = filterOperands.get(1).getLiteral().getFieldValue().toString();
+          lowerMillis = ceil(outputFormat.fromFormatToMillis(lowerValue), granularityMillis);
+          Preconditions.checkState(TimeUtils.timeValueInValidRange(lowerMillis), "Invalid lower bound in millis: %s",
+              lowerMillis);
+          break;
+        }
+        case LESS_THAN: {
+          // millisToFormat(floor(millis, granularity)) < n
+          // -> floor(millis, granularity) < formatToMillis(n)
+          // -> millis < ceil(formatToMillis(n), granularity)
+          String upperValue = filterOperands.get(1).getLiteral().getFieldValue().toString();
+          upperMillis = ceil(outputFormat.fromFormatToMillis(upperValue), granularityMillis);
+          Preconditions.checkState(TimeUtils.timeValueInValidRange(upperMillis), "Invalid upper bound in millis: %s",
+              upperMillis);
+          break;
+        }
+        case LESS_THAN_OR_EQUAL: {
+          // millisToFormat(floor(millis, granularity)) <= n
+          // -> millisToFormat(floor(millis, granularity)) < n + 1
+          // -> floor(millis, granularity) < formatToMillis(n + 1)
+          // -> millis < ceil(formatToMillis(n + 1), granularity)
+          //
+          // E.g.
+          // millisToSeconds(floor(millis, 1 minute)) <= 0
+          // -> millisToSeconds(floor(millis, 1 minute)) < 1
+          // -> floor(millis, 1 minute) < 1000
+          // -> millis < 60000
+          //
+          // Note that 'millisToSeconds(floor(millis, 1 minute)) <= 0' is not equivalent to 'millis <= 0'
+          long upperValue = Long.parseLong(filterOperands.get(1).getLiteral().getFieldValue().toString());
+          upperMillis = ceil(outputFormat.fromFormatToMillis(Long.toString(upperValue + 1)), granularityMillis);
+          Preconditions.checkState(TimeUtils.timeValueInValidRange(upperMillis), "Invalid upper bound in millis: %s",
+              upperMillis);
+          break;
+        }
+        case BETWEEN: {
+          // Combine GREATER_THAN_OR_EQUAL and LESS_THAN_OR_EQUAL
+          String lowerValue = filterOperands.get(1).getLiteral().getFieldValue().toString();
+          lowerMillis = ceil(outputFormat.fromFormatToMillis(lowerValue), granularityMillis);
+          Preconditions.checkState(TimeUtils.timeValueInValidRange(lowerMillis), "Invalid lower bound in millis: %s",
+              lowerMillis);
+          long upperValue = Long.parseLong(filterOperands.get(2).getLiteral().getFieldValue().toString());
+          upperMillis = ceil(outputFormat.fromFormatToMillis(Long.toString(upperValue + 1)), granularityMillis);
+          Preconditions.checkState(TimeUtils.timeValueInValidRange(upperMillis), "Invalid upper bound in millis: %s",
+              upperMillis);
+          break;
+        }
+        case EQUALS: {
+          // Combine GREATER_THAN_OR_EQUAL and LESS_THAN_OR_EQUAL
+          String value = filterOperands.get(1).getLiteral().getFieldValue().toString();
+          lowerMillis = ceil(outputFormat.fromFormatToMillis(value), granularityMillis);
+          Preconditions.checkState(TimeUtils.timeValueInValidRange(lowerMillis), "Invalid lower bound in millis: %s",
+              lowerMillis);
+          upperMillis =
+              ceil(outputFormat.fromFormatToMillis(Long.toString(Long.parseLong(value) + 1)), granularityMillis);
+          Preconditions.checkState(TimeUtils.timeValueInValidRange(upperMillis), "Invalid upper bound in millis: %s",
+              upperMillis);
+          break;
+        }
+        default:
+          throw new IllegalStateException();
+      }
+
+      // Step 2: Convert millis range to input range
+      String lowerValue = null;
+      boolean lowerInclusive = false;
+      if (lowerMillis != null) {
+        // formatToMillis(col) >= millis
+        // - if (formatToMillis(millisToFormat(millis)) == millis)
+        //   -> col >= millisToFormat(millis)
+        // - else (formatToMillis(millisToFormat(millis)) < millis)
+        //   -> col > millisToFormat(millis)
+        //
+        // E.g.
+        // secondsToMillis(seconds) >= 123
+        // -> seconds > 0
+        // secondsToMillis(seconds) >= 0
+        // -> seconds >= 0
+        lowerValue = inputFormat.fromMillisToFormat(lowerMillis);
+        lowerInclusive = inputFormat.fromFormatToMillis(lowerValue) == lowerMillis;
+      }
+      String upperValue = null;
+      boolean upperInclusive = false;
+      if (upperMillis != null) {
+        // formatToMillis(col) < millis
+        // - if (formatToMillis(millisToFormat(millis)) == millis)
+        //   -> col < millisToFormat(millis)
+        // - else (formatToMillis(millisToFormat(millis)) < millis)
+        //   -> col <= millisToFormat(millis)
+        //
+        // E.g.
+        // secondsToMillis(seconds) < 123
+        // -> seconds <= 0
+        // secondsToMillis(seconds) < 0
+        // -> seconds < 0
+        upperValue = inputFormat.fromMillisToFormat(upperMillis);
+        upperInclusive = inputFormat.fromFormatToMillis(upperValue) != upperMillis;
+      }
+
+      // Step 3: Rewrite the filter function
+      String rangeString = new Range(lowerValue, lowerInclusive, upperValue, upperInclusive).getRangeString();
+      filterFunction.setOperator(FilterKind.RANGE.name());
+      filterFunction
+          .setOperands(Arrays.asList(dateTimeConvertOperands.get(0), RequestUtils.getLiteralExpression(rangeString)));
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while optimizing DATE_TIME_CONVERT predicate: {}, skipping the optimization",
+          filterFunction, e);
+    }
+  }
+
+  private boolean isStringLiteral(Expression expression) {
+    return expression.getType() == ExpressionType.LITERAL && expression.getLiteral().isSetStringValue();
+  }
+
+  /**
+   * Helper method to round up the given value based on the granularity.
+   */
+  private long ceil(long millisValue, long granularityMillis) {
+    return (millisValue + granularityMillis - 1) / granularityMillis * granularityMillis;
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/optimizer/filter/NumericalFilterOptimizerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/optimizer/filter/NumericalFilterOptimizerTest.java
index e75aa5f..55af556 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/query/optimizer/filter/NumericalFilterOptimizerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/optimizer/filter/NumericalFilterOptimizerTest.java
@@ -156,8 +156,6 @@ public class NumericalFilterOptimizerTest {
 
     Assert.assertEquals(pinotQuery.getFilterExpression().toString(),
         "Expression(type:LITERAL, literal:<Literal boolValue:true>)");
-
-    System.out.println(Float.MAX_VALUE);
   }
 
   @Test
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/query/optimizer/filter/TimePredicateFilterOptimizerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/query/optimizer/filter/TimePredicateFilterOptimizerTest.java
new file mode 100644
index 0000000..8ef777b
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/query/optimizer/filter/TimePredicateFilterOptimizerTest.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.optimizer.filter;
+
+import java.util.List;
+import org.apache.pinot.common.request.Expression;
+import org.apache.pinot.common.request.Function;
+import org.apache.pinot.pql.parsers.pql2.ast.FilterKind;
+import org.apache.pinot.sql.parsers.CalciteSqlParser;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class TimePredicateFilterOptimizerTest {
+  private static final TimePredicateFilterOptimizer OPTIMIZER = new TimePredicateFilterOptimizer();
+
+  @Test
+  public void testTimeConvert() {
+    // Same input/output format
+    testNoOpTimeConvert("timeConvert(col, 'MILLISECONDS', 'MILLISECONDS') > 1620830760000");
+    testNoOpTimeConvert("TIME_CONVERT(col, 'MILLISECONDS', 'MILLISECONDS') < 1620917160000");
+    testNoOpTimeConvert("timeconvert(col, 'MILLISECONDS', 'MILLISECONDS') BETWEEN 1620830760000 AND 1620917160000");
+    testNoOpTimeConvert("TIMECONVERT(col, 'MILLISECONDS', 'MILLISECONDS') = 1620830760000");
+
+    // Other output format
+    testTimeConvert("timeConvert(col, 'MILLISECONDS', 'SECONDS') > 1620830760",
+        new Range(1620830761000L, true, null, false));
+    testTimeConvert("timeConvert(col, 'MILLISECONDS', 'MINUTES') < 27015286",
+        new Range(null, false, 1620917160000L, false));
+    testTimeConvert("timeConvert(col, 'MILLISECONDS', 'HOURS') BETWEEN 450230 AND 450254",
+        new Range(1620828000000L, true, 1620918000000L, false));
+    testTimeConvert("timeConvert(col, 'MILLISECONDS', 'DAYS') = 18759",
+        new Range(1620777600000L, true, 1620864000000L, false));
+
+    // Other input format
+    testTimeConvert("timeConvert(col, 'MINUTES', 'SECONDS') > 1620830760", new Range(27013846L, false, null, false));
+    testTimeConvert("timeConvert(col, 'HOURS', 'MINUTES') < 27015286", new Range(null, false, 450254L, true));
+    testTimeConvert("timeConvert(col, 'DAYS', 'HOURS') BETWEEN 450230 AND 450254",
+        new Range(18759L, false, 18760L, true));
+    testTimeConvert("timeConvert(col, 'SECONDS', 'DAYS') = 18759", new Range(1620777600L, true, 1620864000L, false));
+
+    // Invalid time
+    testInvalidTimeConvert("timeConvert(col, 'MINUTES', 'SECONDS') > 1620830760.5");
+    testInvalidTimeConvert("timeConvert(col, 'HOURS', 'MINUTES') > 1620830760");
+  }
+
+  @Test
+  public void testEpochToEpochDateTimeConvert() {
+    // Value not on granularity boundary
+    testTimeConvert(
+        "dateTimeConvert(col, '1:MILLISECONDS:EPOCH', '1:MILLISECONDS:EPOCH', '30:MINUTES') > 1620830760000",
+        new Range(1620831600000L, true, null, false));
+    testTimeConvert(
+        "DATE_TIME_CONVERT(col, '1:MILLISECONDS:EPOCH', '1:MILLISECONDS:EPOCH', '30:MINUTES') < 1620917160000",
+        new Range(null, false, 1620918000000L, false));
+    testTimeConvert(
+        "datetimeconvert(col, '1:MILLISECONDS:EPOCH', '1:MILLISECONDS:EPOCH', '30:MINUTES') BETWEEN 1620830760000 AND 1620917160000",
+        new Range(1620831600000L, true, 1620918000000L, false));
+    testTimeConvert(
+        "DATETIMECONVERT(col, '1:MILLISECONDS:EPOCH', '1:MILLISECONDS:EPOCH', '30:MINUTES') = 1620830760000",
+        new Range(1620831600000L, true, 1620831600000L, false));
+
+    // Value on granularity boundary
+    testTimeConvert(
+        "dateTimeConvert(col, '1:MILLISECONDS:EPOCH', '1:MILLISECONDS:EPOCH', '30:MINUTES') > 1620831600000",
+        new Range(1620833400000L, true, null, false));
+    testTimeConvert(
+        "dateTimeConvert(col, '1:MILLISECONDS:EPOCH', '1:MILLISECONDS:EPOCH', '30:MINUTES') < 1620918000000",
+        new Range(null, false, 1620918000000L, false));
+    testTimeConvert(
+        "dateTimeConvert(col, '1:MILLISECONDS:EPOCH', '1:MILLISECONDS:EPOCH', '30:MINUTES') BETWEEN 1620831600000 AND 1620918000000",
+        new Range(1620831600000L, true, 1620919800000L, false));
+    testTimeConvert(
+        "dateTimeConvert(col, '1:MILLISECONDS:EPOCH', '1:MILLISECONDS:EPOCH', '30:MINUTES') = 1620831600000",
+        new Range(1620831600000L, true, 1620833400000L, false));
+
+    // Other output format
+    testTimeConvert("dateTimeConvert(col, '1:MILLISECONDS:EPOCH', '1:MINUTES:EPOCH', '30:MINUTES') > 27013846",
+        new Range(1620831600000L, true, null, false));
+    testTimeConvert("dateTimeConvert(col, '1:MILLISECONDS:EPOCH', '10:MINUTES:EPOCH', '30:MINUTES') < 2701528",
+        new Range(null, false, 1620918000000L, false));
+    testTimeConvert(
+        "dateTimeConvert(col, '1:MILLISECONDS:EPOCH', '1:SECONDS:EPOCH', '30:MINUTES') BETWEEN 1620830760 AND 1620917160",
+        new Range(1620831600000L, true, 1620918000000L, false));
+    testTimeConvert("dateTimeConvert(col, '1:MILLISECONDS:EPOCH', '30:MINUTES:EPOCH', '30:MINUTES') > 900462",
+        new Range(1620833400000L, true, null, false));
+    testTimeConvert("dateTimeConvert(col, '1:MILLISECONDS:EPOCH', '1:HOURS:EPOCH', '30:MINUTES') < 450255",
+        new Range(null, false, 1620918000000L, false));
+    testTimeConvert(
+        "dateTimeConvert(col, '1:MILLISECONDS:EPOCH', '1:DAYS:EPOCH', '30:MINUTES') BETWEEN 18759 AND 18760",
+        new Range(1620777600000L, true, 1620950400000L, false));
+    testTimeConvert("dateTimeConvert(col, '1:MILLISECONDS:EPOCH', '1:DAYS:EPOCH', '30:MINUTES') = 18759",
+        new Range(1620777600000L, true, 1620864000000L, false));
+
+    // Other input format
+    testTimeConvert("dateTimeConvert(col, '1:SECONDS:EPOCH', '1:MINUTES:EPOCH', '30:MINUTES') > 27013846",
+        new Range(1620831600L, true, null, false));
+    testTimeConvert("dateTimeConvert(col, '1:MINUTES:EPOCH', '10:MINUTES:EPOCH', '30:MINUTES') < 2701528",
+        new Range(null, false, 27015300L, false));
+    testTimeConvert(
+        "dateTimeConvert(col, '1:DAYS:EPOCH', '1:SECONDS:EPOCH', '30:MINUTES') BETWEEN 1620830760 AND 1620917160",
+        new Range(18759L, false, 18760L, true));
+    testTimeConvert("dateTimeConvert(col, '1:SECONDS:EPOCH', '30:MINUTES:EPOCH', '30:MINUTES') > 900462",
+        new Range(1620833400L, true, null, false));
+    testTimeConvert("dateTimeConvert(col, '1:MINUTES:EPOCH', '1:HOURS:EPOCH', '30:MINUTES') < 450255",
+        new Range(null, false, 27015300L, false));
+    testTimeConvert("dateTimeConvert(col, '1:DAYS:EPOCH', '1:DAYS:EPOCH', '30:MINUTES') BETWEEN 18759 AND 18760",
+        new Range(18759L, true, 18761L, false));
+    testTimeConvert("dateTimeConvert(col, '1:DAYS:EPOCH', '1:DAYS:EPOCH', '30:MINUTES') = 18759",
+        new Range(18759L, true, 18760L, false));
+
+    // Invalid time
+    testInvalidTimeConvert("dateTimeConvert(col, '1:SECONDS:EPOCH', '1:MINUTES:EPOCH', '30:MINUTES') > 27013846.5");
+    testInvalidTimeConvert("dateTimeConvert(col, '1:SECONDS:EPOCH', '30:MINUTES:EPOCH', '30:MINUTES') > 27013846");
+  }
+
+  @Test
+  public void testSDFToEpochDateTimeConvert() {
+    testTimeConvert(
+        "dateTimeConvert(col, '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:MILLISECONDS:EPOCH', '30:MINUTES') > 1620830760000",
+        new Range("2021-05-12 15:00:00.000", true, null, false));
+    testTimeConvert(
+        "dateTimeConvert(col, '1:SECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss', '1:MILLISECONDS:EPOCH', '30:MINUTES') < 1620917160000",
+        new Range(null, false, "2021-05-13 15:00:00", false));
+    testTimeConvert(
+        "dateTimeConvert(col, '1:MINUTES:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm', '1:MILLISECONDS:EPOCH', '30:MINUTES') BETWEEN 1620830760000 AND 1620917160000",
+        new Range("2021-05-12 15:00", true, "2021-05-13 15:00", false));
+    testTimeConvert(
+        "dateTimeConvert(col, '1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd', '1:MILLISECONDS:EPOCH', '30:MINUTES') = 1620830760000",
+        new Range("2021-05-12", false, "2021-05-12", true));
+
+    // Invalid time
+    testInvalidTimeConvert(
+        "dateTimeConvert(col, '1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss.SSS', '1:MILLISECONDS:EPOCH', '30:MINUTES') > 1620830760000.5");
+    testInvalidTimeConvert(
+        "dateTimeConvert(col, '1:SECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd HH:mm:ss', '1:MILLISECONDS:EPOCH', '30:MINUTES') < 1620917160");
+  }
+
+  /**
+   * Helper method to test no-op TIME_CONVERT filter (same input and output time unit).
+   */
+  private void testNoOpTimeConvert(String filterString) {
+    Expression originalExpression = CalciteSqlParser.compileToExpression(filterString);
+    Function originalFunction = originalExpression.getFunctionCall();
+    List<Expression> originalOperands = originalFunction.getOperands();
+    Expression optimizedFilterExpression = OPTIMIZER.optimize(CalciteSqlParser.compileToExpression(filterString));
+    Function optimizedFunction = optimizedFilterExpression.getFunctionCall();
+    List<Expression> optimizedOperands = optimizedFunction.getOperands();
+    assertEquals(optimizedFunction.getOperator(), originalFunction.getOperator());
+    assertEquals(optimizedOperands.size(), originalOperands.size());
+    // TIME_CONVERT transform should be removed
+    assertEquals(optimizedOperands.get(0), originalOperands.get(0).getFunctionCall().getOperands().get(0));
+    int numOperands = optimizedOperands.size();
+    for (int i = 1; i < numOperands; i++) {
+      assertEquals(optimizedOperands.get(i), originalOperands.get(i));
+    }
+  }
+
+  /**
+   * Helper method to test optimizing TIME_CONVERT/DATE_TIME_CONVERT on the given filter.
+   */
+  private void testTimeConvert(String filterString, Range expectedRange) {
+    Expression originalExpression = CalciteSqlParser.compileToExpression(filterString);
+    Expression optimizedFilterExpression = OPTIMIZER.optimize(CalciteSqlParser.compileToExpression(filterString));
+    Function function = optimizedFilterExpression.getFunctionCall();
+    assertEquals(function.getOperator(), FilterKind.RANGE.name());
+    List<Expression> operands = function.getOperands();
+    assertEquals(operands.size(), 2);
+    assertEquals(operands.get(0),
+        originalExpression.getFunctionCall().getOperands().get(0).getFunctionCall().getOperands().get(0));
+    String rangeString = operands.get(1).getLiteral().getStringValue();
+    assertEquals(rangeString, expectedRange.getRangeString());
+  }
+
+  /**
+   * Helper method to test optimizing TIME_CONVERT/DATE_TIME_CONVERT with invalid time in filter.
+   */
+  private void testInvalidTimeConvert(String filterString) {
+    Expression originalExpression = CalciteSqlParser.compileToExpression(filterString);
+    Expression optimizedFilterExpression = OPTIMIZER.optimize(CalciteSqlParser.compileToExpression(filterString));
+    assertEquals(optimizedFilterExpression, originalExpression);
+  }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index c674f8b..4747b77 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -1203,16 +1203,13 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
     }
     {
       //test single alias
-      String query =
-          "SELECT ArrTime, Carrier AS CarrierName, DaysSinceEpoch FROM mytable ORDER BY DaysSinceEpoch DESC";
+      String query = "SELECT ArrTime, Carrier AS CarrierName, DaysSinceEpoch FROM mytable ORDER BY DaysSinceEpoch DESC";
       testSqlQuery(query, Collections.singletonList(query));
 
-      query =
-          "SELECT count(*) AS cnt, max(ArrTime) as maxArrTime FROM mytable";
+      query = "SELECT count(*) AS cnt, max(ArrTime) as maxArrTime FROM mytable";
       testSqlQuery(query, Collections.singletonList(query));
 
-      query =
-          "SELECT count(*) AS cnt, Carrier AS CarrierName FROM mytable GROUP BY CarrierName ORDER BY cnt";
+      query = "SELECT count(*) AS cnt, Carrier AS CarrierName FROM mytable GROUP BY CarrierName ORDER BY cnt";
       testSqlQuery(query, Collections.singletonList(query));
     }
     {
@@ -1221,8 +1218,7 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
           "SELECT ArrTime, Carrier, Carrier AS CarrierName1, Carrier AS CarrierName2, DaysSinceEpoch FROM mytable ORDER BY DaysSinceEpoch DESC";
       testSqlQuery(query, Collections.singletonList(query));
 
-      query =
-          "SELECT count(*) AS cnt, max(ArrTime) as maxArrTime1, max(ArrTime) as maxArrTime2 FROM mytable";
+      query = "SELECT count(*) AS cnt, max(ArrTime) as maxArrTime1, max(ArrTime) as maxArrTime2 FROM mytable";
       testSqlQuery(query, Collections.singletonList(query));
 
       query =
@@ -1396,14 +1392,15 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   @Test
   public void testCaseInsensitivity() {
     int daysSinceEpoch = 16138;
-    long secondsSinceEpoch = 16138 * 24 * 60 * 60;
+    int hoursSinceEpoch = 16138 * 24;
+    int secondsSinceEpoch = 16138 * 24 * 60 * 60;
     List<String> baseQueries = Arrays.asList("SELECT * FROM mytable",
         "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable",
         "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by DaysSinceEpoch limit 10000",
         "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by timeConvert(DaysSinceEpoch,'DAYS','SECONDS') DESC limit 10000",
         "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch,
+        "SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','HOURS') = " + hoursSinceEpoch,
         "SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch,
-        "SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + daysSinceEpoch,
         "SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM mytable",
         "SELECT COUNT(*) FROM mytable GROUP BY dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS')");
     List<String> queries = new ArrayList<>();
@@ -1428,14 +1425,15 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   @Test
   public void testColumnNameContainsTableName() {
     int daysSinceEpoch = 16138;
-    long secondsSinceEpoch = 16138 * 24 * 60 * 60;
+    int hoursSinceEpoch = 16138 * 24;
+    int secondsSinceEpoch = 16138 * 24 * 60 * 60;
     List<String> baseQueries = Arrays.asList("SELECT * FROM mytable",
         "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable",
         "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by DaysSinceEpoch limit 10000",
         "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by timeConvert(DaysSinceEpoch,'DAYS','SECONDS') DESC limit 10000",
         "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch,
+        "SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','HOURS') = " + hoursSinceEpoch,
         "SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch,
-        "SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + daysSinceEpoch,
         "SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM mytable",
         "SELECT COUNT(*) FROM mytable GROUP BY dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS')");
     List<String> queries = new ArrayList<>();
@@ -1459,14 +1457,15 @@ public class OfflineClusterIntegrationTest extends BaseClusterIntegrationTestSet
   @Test
   public void testCaseInsensitivityWithColumnNameContainsTableName() {
     int daysSinceEpoch = 16138;
-    long secondsSinceEpoch = 16138 * 24 * 60 * 60;
+    int hoursSinceEpoch = 16138 * 24;
+    int secondsSinceEpoch = 16138 * 24 * 60 * 60;
     List<String> baseQueries = Arrays.asList("SELECT * FROM mytable",
         "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable",
         "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by DaysSinceEpoch limit 10000",
         "SELECT DaysSinceEpoch, timeConvert(DaysSinceEpoch,'DAYS','SECONDS') FROM mytable order by timeConvert(DaysSinceEpoch,'DAYS','SECONDS') DESC limit 10000",
         "SELECT count(*) FROM mytable WHERE DaysSinceEpoch = " + daysSinceEpoch,
+        "SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','HOURS') = " + hoursSinceEpoch,
         "SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + secondsSinceEpoch,
-        "SELECT count(*) FROM mytable WHERE timeConvert(DaysSinceEpoch,'DAYS','SECONDS') = " + daysSinceEpoch,
         "SELECT MAX(timeConvert(DaysSinceEpoch,'DAYS','SECONDS')) FROM mytable",
         "SELECT COUNT(*) FROM mytable GROUP BY dateTimeConvert(DaysSinceEpoch,'1:DAYS:EPOCH','1:HOURS:EPOCH','1:HOURS')");
     List<String> queries = new ArrayList<>();
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeGranularitySpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeGranularitySpec.java
index e195f8e..a4077d3 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeGranularitySpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeGranularitySpec.java
@@ -88,7 +88,7 @@ public class DateTimeGranularitySpec {
    * </ul>
    * </ul>
    */
-  public Long granularityToMillis() {
+  public long granularityToMillis() {
     return TimeUnit.MILLISECONDS.convert(_size, _timeUnit);
   }
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org