You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2020/05/05 00:38:47 UTC

[incubator-pinot] branch master updated: Timespec to datetimespec conversion utility (#5324)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2bda544  Timespec to datetimespec conversion utility (#5324)
2bda544 is described below

commit 2bda544f967e405a6aac20d2bb2abce430da9603
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Mon May 4 17:38:35 2020 -0700

    Timespec to datetimespec conversion utility (#5324)
    
    This is an item of project: #2756
    We plan to internally start treating timeFieldSpec as dateTimeFieldSpec. This PR adds a utility function which converts a timeFieldSpec to an equivalent dateTimeFieldSpec.
    Note that dateTimeFieldSpec doesn't have the concept of incoming/outgoing, and hence we construct and add a transform function to convey the conversion. Introduced a lot of epoch time transform functions for this.
    
    NOTE: the toEpochXXXBucket method is added to help with conversion from timeFieldSpec to dateTimeFieldSpec. Practically, we would only be using toEpochXXX and toEpochXXXRounded.
---
 .../core/data/function/DateTimeFunctions.java      | 142 +++++++++++++++-
 .../pinot/core/data/function/FunctionRegistry.java |  30 +++-
 .../function/DateTimeFunctionEvaluatorTest.java    | 125 +++++++++++++-
 .../apache/pinot/core/util/SchemaUtilsTest.java    |   2 +-
 .../java/org/apache/pinot/spi/data/Schema.java     | 127 ++++++++++++++
 .../pinot/spi/data/DateTimeFieldSpecUtilsTest.java | 185 +++++++++++++++++++++
 6 files changed, 598 insertions(+), 13 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/function/DateTimeFunctions.java b/pinot-core/src/main/java/org/apache/pinot/core/data/function/DateTimeFunctions.java
index 533a497..b312f86 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/function/DateTimeFunctions.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/function/DateTimeFunctions.java
@@ -24,10 +24,28 @@ import java.util.concurrent.TimeUnit;
 /**
  * Inbuilt date time related transform functions
  * TODO: Exhaustively add all time conversion functions
+ *  eg:
+ *   1) round(time, roundingValue) - round(minutes, 10), round(millis, 15:MINUTES)
+ *   2) simple date time transformations
+ *   3) convert(from_format, to_format, bucketing)  
  */
 public class DateTimeFunctions {
 
   /**
+   * Convert epoch millis to epoch seconds
+   */
+  static Long toEpochSeconds(Long millis) {
+    return TimeUnit.MILLISECONDS.toSeconds(millis);
+  }
+
+  /**
+   * Convert epoch millis to epoch minutes
+   */
+  static Long toEpochMinutes(Long millis) {
+    return TimeUnit.MILLISECONDS.toMinutes(millis);
+  }
+
+  /**
    * Convert epoch millis to epoch hours
    */
   static Long toEpochHours(Long millis) {
@@ -35,9 +53,127 @@ public class DateTimeFunctions {
   }
 
   /**
-   * Convert epoch millis to epoch minutes, bucketed by given bucket granularity
+   * Convert epoch millis to epoch days
+   */
+  static Long toEpochDays(Long millis) {
+    return TimeUnit.MILLISECONDS.toDays(millis);
+  }
+
+  /**
+   * Convert epoch millis to epoch seconds, round to nearest rounding bucket
+   */
+  static Long toEpochSecondsRounded(Long millis, String roundingValue) {
+    int roundToNearest = Integer.parseInt(roundingValue);
+    return (TimeUnit.MILLISECONDS.toSeconds(millis) / roundToNearest) * roundToNearest;
+  }
+
+  /**
+   * Convert epoch millis to epoch minutes, round to nearest rounding bucket
+   */
+  static Long toEpochMinutesRounded(Long millis, String roundingValue) {
+    int roundToNearest = Integer.parseInt(roundingValue);
+    return (TimeUnit.MILLISECONDS.toMinutes(millis) / roundToNearest) * roundToNearest;
+  }
+
+  /**
+   * Convert epoch millis to epoch hours, round to nearest rounding bucket
+   */
+  static Long toEpochHoursRounded(Long millis, String roundingValue) {
+    int roundToNearest = Integer.parseInt(roundingValue);
+    return (TimeUnit.MILLISECONDS.toHours(millis) / roundToNearest) * roundToNearest;
+  }
+
+  /**
+   * Convert epoch millis to epoch days, round to nearest rounding bucket
+   */
+  static Long toEpochDaysRounded(Long millis, String roundingValue) {
+    int roundToNearest = Integer.parseInt(roundingValue);
+    return (TimeUnit.MILLISECONDS.toDays(millis) / roundToNearest) * roundToNearest;
+  }
+
+  // TODO: toEpochXXXBucket methods are only needed to convert from TimeFieldSpec to DateTimeFieldSpec.
+  //  Practically, we need the toEpochXXXRounded methods.
+  /**
+   * Convert epoch millis to epoch seconds, divided by given bucket, to get nSecondsSinceEpoch
+   */
+  static Long toEpochSecondsBucket(Long millis, String bucket) {
+    return TimeUnit.MILLISECONDS.toSeconds(millis) / Integer.valueOf(bucket);
+  }
+
+  /**
+   * Convert epoch millis to epoch minutes, divided by given bucket, to get nMinutesSinceEpoch
+   */
+  static Long toEpochMinutesBucket(Long millis, String bucket) {
+    return TimeUnit.MILLISECONDS.toMinutes(millis) / Integer.valueOf(bucket);
+  }
+
+  /**
+   * Convert epoch millis to epoch hours, divided by given bucket, to get nHoursSinceEpoch
+   */
+  static Long toEpochHoursBucket(Long millis, String bucket) {
+    return TimeUnit.MILLISECONDS.toHours(millis) / Integer.valueOf(bucket);
+  }
+
+  /**
+   * Convert epoch millis to epoch days, divided by given bucket, to get nDaysSinceEpoch
+   */
+  static Long toEpochDaysBucket(Long millis, String bucket) {
+    return TimeUnit.MILLISECONDS.toDays(millis) / Integer.valueOf(bucket);
+  }
+
+  /**
+   * Converts epoch seconds to epoch millis
+   */
+  static Long fromEpochSeconds(Long seconds) {
+    return TimeUnit.SECONDS.toMillis(seconds);
+  }
+
+  /**
+   * Converts epoch minutes to epoch millis
+   */
+  static Long fromEpochMinutes(Number minutes) {
+    return TimeUnit.MINUTES.toMillis(minutes.longValue());
+  }
+
+  /**
+   * Converts epoch hours to epoch millis
+   */
+  static Long fromEpochHours(Number hours) {
+    return TimeUnit.HOURS.toMillis(hours.longValue());
+  }
+
+  /**
+   * Converts epoch days to epoch millis
+   */
+  static Long fromEpochDays(Number daysSinceEpoch) {
+    return TimeUnit.DAYS.toMillis(daysSinceEpoch.longValue());
+  }
+
+  /**
+   * Converts nSecondsSinceEpoch (seconds that have been divided by a bucket), to epoch millis
+   */
+  static Long fromEpochSecondsBucket(Long seconds, String bucket) {
+    return TimeUnit.SECONDS.toMillis(seconds * Integer.valueOf(bucket));
+  }
+
+  /**
+   * Converts nMinutesSinceEpoch (minutes that have been divided by a bucket), to epoch millis
+   */
+  static Long fromEpochMinutesBucket(Number minutes, String bucket) {
+    return TimeUnit.MINUTES.toMillis(minutes.longValue() * Integer.valueOf(bucket));
+  }
+
+  /**
+   * Converts nHoursSinceEpoch (hours that have been divided by a bucket), to epoch millis
+   */
+  static Long fromEpochHoursBucket(Number hours, String bucket) {
+    return TimeUnit.HOURS.toMillis(hours.longValue() * Integer.valueOf(bucket));
+  }
+
+  /**
+   * Converts nDaysSinceEpoch (days that have been divided by a bucket), to epoch millis
    */
-  static Long toEpochMinutes(Long millis, String bucket) {
-    return TimeUnit.MILLISECONDS.toMinutes(millis) / Integer.parseInt(bucket);
+  static Long fromEpochDaysBucket(Number daysSinceEpoch, String bucket) {
+    return TimeUnit.DAYS.toMillis(daysSinceEpoch.longValue() * Integer.valueOf(bucket));
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/function/FunctionRegistry.java b/pinot-core/src/main/java/org/apache/pinot/core/data/function/FunctionRegistry.java
index 8a8dfac..c1abb5d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/function/FunctionRegistry.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/function/FunctionRegistry.java
@@ -40,8 +40,36 @@ public class FunctionRegistry {
 
   static {
     try {
+      registerStaticFunction(DateTimeFunctions.class.getDeclaredMethod("toEpochSeconds", Long.class));
+      registerStaticFunction(DateTimeFunctions.class.getDeclaredMethod("toEpochMinutes", Long.class));
       registerStaticFunction(DateTimeFunctions.class.getDeclaredMethod("toEpochHours", Long.class));
-      registerStaticFunction(DateTimeFunctions.class.getDeclaredMethod("toEpochMinutes", Long.class, String.class));
+      registerStaticFunction(DateTimeFunctions.class.getDeclaredMethod("toEpochDays", Long.class));
+      registerStaticFunction(
+          DateTimeFunctions.class.getDeclaredMethod("toEpochSecondsRounded", Long.class, String.class));
+      registerStaticFunction(
+          DateTimeFunctions.class.getDeclaredMethod("toEpochMinutesRounded", Long.class, String.class));
+      registerStaticFunction(
+          DateTimeFunctions.class.getDeclaredMethod("toEpochHoursRounded", Long.class, String.class));
+      registerStaticFunction(DateTimeFunctions.class.getDeclaredMethod("toEpochDaysRounded", Long.class, String.class));
+      registerStaticFunction(
+          DateTimeFunctions.class.getDeclaredMethod("toEpochSecondsBucket", Long.class, String.class));
+      registerStaticFunction(
+          DateTimeFunctions.class.getDeclaredMethod("toEpochMinutesBucket", Long.class, String.class));
+      registerStaticFunction(DateTimeFunctions.class.getDeclaredMethod("toEpochHoursBucket", Long.class, String.class));
+      registerStaticFunction(DateTimeFunctions.class.getDeclaredMethod("toEpochDaysBucket", Long.class, String.class));
+
+      registerStaticFunction(DateTimeFunctions.class.getDeclaredMethod("fromEpochSeconds", Long.class));
+      registerStaticFunction(DateTimeFunctions.class.getDeclaredMethod("fromEpochMinutes", Number.class));
+      registerStaticFunction(DateTimeFunctions.class.getDeclaredMethod("fromEpochHours", Number.class));
+      registerStaticFunction(DateTimeFunctions.class.getDeclaredMethod("fromEpochDays", Number.class));
+      registerStaticFunction(
+          DateTimeFunctions.class.getDeclaredMethod("fromEpochSecondsBucket", Long.class, String.class));
+      registerStaticFunction(
+          DateTimeFunctions.class.getDeclaredMethod("fromEpochMinutesBucket", Number.class, String.class));
+      registerStaticFunction(
+          DateTimeFunctions.class.getDeclaredMethod("fromEpochHoursBucket", Number.class, String.class));
+      registerStaticFunction(
+          DateTimeFunctions.class.getDeclaredMethod("fromEpochDaysBucket", Number.class, String.class));
     } catch (NoSuchMethodException e) {
       LOGGER.error("Caught exception when registering function", e);
     }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/function/DateTimeFunctionEvaluatorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/function/DateTimeFunctionEvaluatorTest.java
index d6b1a04..067808f 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/function/DateTimeFunctionEvaluatorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/function/DateTimeFunctionEvaluatorTest.java
@@ -44,15 +44,124 @@ public class DateTimeFunctionEvaluatorTest {
   public Object[][] dateTimeFunctionsDataProvider() {
     List<Object[]> inputs = new ArrayList<>();
 
+    // toEpochSeconds
+    GenericRow row1_0 = new GenericRow();
+    row1_0.putValue("timestamp", 1578685189000L);
+    inputs.add(new Object[]{"toEpochSeconds(timestamp)", Lists.newArrayList("timestamp"), row1_0, 1578685189L});
+
+    // toEpochSeconds w/ rounding
+    GenericRow row1_1 = new GenericRow();
+    row1_1.putValue("timestamp", 1578685189000L);
+    inputs.add(new Object[]{"toEpochSecondsRounded(timestamp, 10)", Lists.newArrayList("timestamp"), row1_1, 1578685180L});
+
+    // toEpochSeconds w/ bucketing
+    GenericRow row1_2 = new GenericRow();
+    row1_2.putValue("timestamp", 1578685189000L);
+    inputs.add(new Object[]{"toEpochSecondsBucket(timestamp, 10)", Lists.newArrayList("timestamp"), row1_2, 157868518L});
+
+    // toEpochMinutes
+    GenericRow row2_0 = new GenericRow();
+    row2_0.putValue("timestamp", 1578685189000L);
+    inputs.add(new Object[]{"toEpochMinutes(timestamp)", Lists.newArrayList("timestamp"), row2_0, 26311419L});
+
+    // toEpochMinutes w/ rounding
+    GenericRow row2_1 = new GenericRow();
+    row2_1.putValue("timestamp", 1578685189000L);
+    inputs.add(new Object[]{"toEpochMinutesRounded(timestamp, 15)", Lists.newArrayList("timestamp"), row2_1, 26311410L});
+
+    // toEpochMinutes w/ bucketing
+    GenericRow row2_2 = new GenericRow();
+    row2_2.putValue("timestamp", 1578685189000L);
+    inputs.add(new Object[]{"toEpochMinutesBucket(timestamp, 15)", Lists.newArrayList("timestamp"), row2_2, 1754094L});
+
     // toEpochHours
-    GenericRow row1 = new GenericRow();
-    row1.putValue("timestamp", 1585724400000L);
-    inputs.add(new Object[]{"toEpochHours(timestamp)", Lists.newArrayList("timestamp"), row1, 440479L});
-
-    // toEpochMinutes w/ bucketing fixed
-    GenericRow row2 = new GenericRow();
-    row2.putValue("millis", 1585724400000L);
-    inputs.add(new Object[]{"toEpochMinutes(millis, 5)", Lists.newArrayList("millis"), row2, 5285748L});
+    GenericRow row3_0 = new GenericRow();
+    row3_0.putValue("timestamp", 1578685189000L);
+    inputs.add(new Object[]{"toEpochHours(timestamp)", Lists.newArrayList("timestamp"), row3_0, 438523L});
+
+    // toEpochHours w/ rounding
+    GenericRow row3_1 = new GenericRow();
+    row3_1.putValue("timestamp", 1578685189000L);
+    inputs.add(new Object[]{"toEpochHoursRounded(timestamp, 2)", Lists.newArrayList("timestamp"), row3_1, 438522L});
+
+    // toEpochHours w/ bucketing
+    GenericRow row3_2 = new GenericRow();
+    row3_2.putValue("timestamp", 1578685189000L);
+    inputs.add(new Object[]{"toEpochHoursBucket(timestamp, 2)", Lists.newArrayList("timestamp"), row3_2, 219261L});
+
+    // toEpochDays
+    GenericRow row4_0 = new GenericRow();
+    row4_0.putValue("timestamp", 1578685189000L);
+    inputs.add(new Object[]{"toEpochDays(timestamp)", Lists.newArrayList("timestamp"), row4_0, 18271L});
+
+    // toEpochDays w/ rounding
+    GenericRow row4_1 = new GenericRow();
+    row4_1.putValue("timestamp", 1578685189000L);
+    inputs.add(new Object[]{"toEpochDaysRounded(timestamp, 7)", Lists.newArrayList("timestamp"), row4_1, 18270L});
+
+    // toEpochDays w/ bucketing
+    GenericRow row4_2 = new GenericRow();
+    row4_2.putValue("timestamp", 1578685189000L);
+    inputs.add(new Object[]{"toEpochDaysBucket(timestamp, 7)", Lists.newArrayList("timestamp"), row4_2, 2610L});
+
+    // fromEpochDays
+    GenericRow row5_0 = new GenericRow();
+    row5_0.putValue("daysSinceEpoch", 14000);
+    inputs
+        .add(new Object[]{"fromEpochDays(daysSinceEpoch)", Lists.newArrayList("daysSinceEpoch"), row5_0, 1209600000000L});
+
+    // fromEpochDays w/ bucketing
+    GenericRow row5_1 = new GenericRow();
+    row5_1.putValue("sevenDaysSinceEpoch", 2000);
+    inputs.add(new Object[]{"fromEpochDaysBucket(sevenDaysSinceEpoch, 7)", Lists.newArrayList(
+        "sevenDaysSinceEpoch"), row5_1, 1209600000000L});
+
+    // fromEpochHours
+    GenericRow row6_0 = new GenericRow();
+    row6_0.putValue("hoursSinceEpoch", 336000);
+    inputs
+        .add(new Object[]{"fromEpochHours(hoursSinceEpoch)", Lists.newArrayList("hoursSinceEpoch"), row6_0, 1209600000000L});
+
+    // fromEpochHours w/ bucketing
+    GenericRow row6_1 = new GenericRow();
+    row6_1.putValue("twoHoursSinceEpoch", 168000);
+    inputs.add(new Object[]{"fromEpochHoursBucket(twoHoursSinceEpoch, 2)", Lists.newArrayList(
+        "twoHoursSinceEpoch"), row6_1, 1209600000000L});
+
+    // fromEpochMinutes
+    GenericRow row7_0 = new GenericRow();
+    row7_0.putValue("minutesSinceEpoch", 20160000);
+    inputs
+        .add(new Object[]{"fromEpochMinutes(minutesSinceEpoch)", Lists.newArrayList("minutesSinceEpoch"), row7_0, 1209600000000L});
+
+    // fromEpochMinutes w/ bucketing
+    GenericRow row7_1 = new GenericRow();
+    row7_1.putValue("fifteenMinutesSinceEpoch", 1344000);
+    inputs.add(new Object[]{"fromEpochMinutesBucket(fifteenMinutesSinceEpoch, 15)", Lists.newArrayList(
+        "fifteenMinutesSinceEpoch"), row7_1, 1209600000000L});
+
+    // fromEpochSeconds
+    GenericRow row8_0 = new GenericRow();
+    row8_0.putValue("secondsSinceEpoch", 1209600000L);
+    inputs
+        .add(new Object[]{"fromEpochSeconds(secondsSinceEpoch)", Lists.newArrayList("secondsSinceEpoch"), row8_0, 1209600000000L});
+
+    // fromEpochSeconds w/ bucketing
+    GenericRow row8_1 = new GenericRow();
+    row8_1.putValue("tenSecondsSinceEpoch", 120960000L);
+    inputs.add(new Object[]{"fromEpochSecondsBucket(tenSecondsSinceEpoch, 10)", Lists.newArrayList(
+        "tenSecondsSinceEpoch"), row8_1, 1209600000000L});
+
+    // nested
+    GenericRow row9_0 = new GenericRow();
+    row9_0.putValue("hoursSinceEpoch", 336000);
+    inputs.add(new Object[]{"toEpochDays(fromEpochHours(hoursSinceEpoch))", Lists.newArrayList(
+        "hoursSinceEpoch"), row9_0, 14000L});
+
+    GenericRow row9_1 = new GenericRow();
+    row9_1.putValue("fifteenSecondsSinceEpoch", 80640000L);
+    inputs.add(new Object[]{"toEpochMinutesBucket(fromEpochSecondsBucket(fifteenSecondsSinceEpoch, 15), 10)", Lists.newArrayList(
+        "fifteenSecondsSinceEpoch"), row9_1, 2016000L});
 
     return inputs.toArray(new Object[0][]);
   }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
index cb7e6b9..f4b4e75 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
@@ -108,7 +108,7 @@ public class SchemaUtilsTest {
     // inbuilt functions with literal
     schema = new Schema();
     dimensionFieldSpec = new DimensionFieldSpec("tenMinutesSinceEpoch", FieldSpec.DataType.LONG, true);
-    dimensionFieldSpec.setTransformFunction("toEpochMinutes(timestamp, 10)");
+    dimensionFieldSpec.setTransformFunction("toEpochMinutesBucket(timestamp, 10)");
     schema.addField(dimensionFieldSpec);
     extract = new ArrayList<>(SchemaUtils.extractSourceFields(schema));
     Assert.assertEquals(extract.size(), 2);
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
index bc16ea7..970838c 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.io.File;
 import java.io.IOException;
@@ -631,4 +632,130 @@ public final class Schema {
     result = EqualityUtils.hashCodeOf(result, _dateTimeFieldSpecs);
     return result;
   }
+
+  /**
+   * Helper method that converts a {@link TimeFieldSpec} to {@link DateTimeFieldSpec}
+   * 1) If timeFieldSpec contains only incoming granularity spec, directly convert it to a dateTimeFieldSpec
+   * 2) If timeFieldSpec contains incoming aas well as outgoing granularity spec, use the outgoing spec to construct the dateTimeFieldSpec,
+   *    and configure a transform function for the conversion from incoming
+   */
+  @VisibleForTesting
+  static DateTimeFieldSpec convertToDateTimeFieldSpec(TimeFieldSpec timeFieldSpec) {
+    DateTimeFieldSpec dateTimeFieldSpec = new DateTimeFieldSpec();
+    TimeGranularitySpec incomingGranularitySpec = timeFieldSpec.getIncomingGranularitySpec();
+    TimeGranularitySpec outgoingGranularitySpec = timeFieldSpec.getOutgoingGranularitySpec();
+
+    dateTimeFieldSpec.setName(outgoingGranularitySpec.getName());
+    dateTimeFieldSpec.setDataType(outgoingGranularitySpec.getDataType());
+
+    int outgoingTimeSize = outgoingGranularitySpec.getTimeUnitSize();
+    TimeUnit outgoingTimeUnit = outgoingGranularitySpec.getTimeType();
+    String outgoingTimeFormat = outgoingGranularitySpec.getTimeFormat();
+    String[] split = outgoingTimeFormat.split(DateTimeFormatSpec.COLON_SEPARATOR);
+    DateTimeFormatSpec formatSpec;
+    if (split[0].equals(DateTimeFieldSpec.TimeFormat.EPOCH.toString())) {
+      formatSpec = new DateTimeFormatSpec(outgoingTimeSize, outgoingTimeUnit.toString(), split[0]);
+    } else {
+      formatSpec = new DateTimeFormatSpec(outgoingTimeSize, outgoingTimeUnit.toString(), split[0], split[1]);
+    }
+    dateTimeFieldSpec.setFormat(formatSpec.getFormat());
+    DateTimeGranularitySpec granularitySpec = new DateTimeGranularitySpec(outgoingTimeSize, outgoingTimeUnit);
+    dateTimeFieldSpec.setGranularity(granularitySpec.getGranularity());
+
+    if (timeFieldSpec.getTransformFunction() != null) {
+      dateTimeFieldSpec.setTransformFunction(timeFieldSpec.getTransformFunction());
+    } else if (!incomingGranularitySpec.equals(outgoingGranularitySpec)) {
+      String incomingName = incomingGranularitySpec.getName();
+      int incomingTimeSize = incomingGranularitySpec.getTimeUnitSize();
+      TimeUnit incomingTimeUnit = incomingGranularitySpec.getTimeType();
+      String incomingTimeFormat = incomingGranularitySpec.getTimeFormat();
+      Preconditions.checkState(incomingTimeFormat.equals(DateTimeFieldSpec.TimeFormat.EPOCH.toString()) && outgoingTimeFormat
+              .equals(DateTimeFieldSpec.TimeFormat.EPOCH.toString()),
+          "Conversion from incoming to outgoing is not supported for SIMPLE_DATE_FORMAT");
+      String transformFunction =
+          getTransformFunction(incomingName, incomingTimeSize, incomingTimeUnit, outgoingTimeSize, outgoingTimeUnit);
+      dateTimeFieldSpec.setTransformFunction(transformFunction);
+    }
+
+    dateTimeFieldSpec.setMaxLength(timeFieldSpec.getMaxLength());
+    dateTimeFieldSpec.setDefaultNullValue(timeFieldSpec.getDefaultNullValue());
+
+    return dateTimeFieldSpec;
+  }
+
+  private static String getTransformFunction(String incomingName, int incomingTimeSize, TimeUnit incomingTimeUnit,
+      int outgoingTimeSize, TimeUnit outgoingTimeUnit) {
+
+    String innerFunction = incomingName;
+    switch (incomingTimeUnit) {
+
+      case MILLISECONDS:
+        // do nothing
+        break;
+      case SECONDS:
+        if (incomingTimeSize > 1) {
+          innerFunction = String.format("fromEpochSecondsBucket(%s, %d)", incomingName, incomingTimeSize);
+        } else {
+          innerFunction = String.format("fromEpochSeconds(%s)", incomingName);
+        }
+        break;
+      case MINUTES:
+        if (incomingTimeSize > 1) {
+          innerFunction = String.format("fromEpochMinutesBucket(%s, %d)", incomingName, incomingTimeSize);
+        } else {
+          innerFunction = String.format("fromEpochMinutes(%s)", incomingName);
+        }
+        break;
+      case HOURS:
+        if (incomingTimeSize > 1) {
+          innerFunction = String.format("fromEpochHoursBucket(%s, %d)", incomingName, incomingTimeSize);
+        } else {
+          innerFunction = String.format("fromEpochHours(%s)", incomingName);
+        }
+        break;
+      case DAYS:
+        if (incomingTimeSize > 1) {
+          innerFunction = String.format("fromEpochDaysBucket(%s, %d)", incomingName, incomingTimeSize);
+        } else {
+          innerFunction = String.format("fromEpochDays(%s)", incomingName);
+        }
+        break;
+    }
+
+    String outerFunction = null;
+    switch (outgoingTimeUnit) {
+
+      case MILLISECONDS:
+        break;
+      case SECONDS:
+        if (outgoingTimeSize > 1) {
+          outerFunction = String.format("toEpochSecondsBucket(%s, %d)", innerFunction, outgoingTimeSize);
+        } else {
+          outerFunction = String.format("toEpochSeconds(%s)", innerFunction);
+        }
+        break;
+      case MINUTES:
+        if (outgoingTimeSize > 1) {
+          outerFunction = String.format("toEpochMinutesBucket(%s, %d)", innerFunction, outgoingTimeSize);
+        } else {
+          outerFunction = String.format("toEpochMinutes(%s)", innerFunction);
+        }
+        break;
+      case HOURS:
+        if (outgoingTimeSize > 1) {
+          outerFunction = String.format("toEpochHoursBucket(%s, %d)", innerFunction, outgoingTimeSize);
+        } else {
+          outerFunction = String.format("toEpochHours(%s)", innerFunction);
+        }
+        break;
+      case DAYS:
+        if (outgoingTimeSize > 1) {
+          outerFunction = String.format("toEpochDaysBucket(%s, %d)", innerFunction, outgoingTimeSize);
+        } else {
+          outerFunction = String.format("toEpochDays(%s)", innerFunction);
+        }
+        break;
+    }
+    return outerFunction;
+  }
 }
diff --git a/pinot-spi/src/test/java/org/apache/pinot/spi/data/DateTimeFieldSpecUtilsTest.java b/pinot-spi/src/test/java/org/apache/pinot/spi/data/DateTimeFieldSpecUtilsTest.java
new file mode 100644
index 0000000..8bd3cbe
--- /dev/null
+++ b/pinot-spi/src/test/java/org/apache/pinot/spi/data/DateTimeFieldSpecUtilsTest.java
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.data;
+
+import java.util.concurrent.TimeUnit;
+import org.apache.pinot.spi.data.FieldSpec.DataType;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+/**
+ * Tests the conversion of a {@link TimeFieldSpec} to an equivalent {@link DateTimeFieldSpec}
+ */
+public class DateTimeFieldSpecUtilsTest {
+
+  @Test
+  public void testConversionFromTimeToDateTimeSpec() {
+    TimeFieldSpec timeFieldSpec;
+    DateTimeFieldSpec expectedDateTimeFieldSpec;
+    DateTimeFieldSpec actualDateTimeFieldSpec;
+
+    /* 1] only incoming */
+
+    // incoming epoch millis
+    timeFieldSpec = new TimeFieldSpec(new TimeGranularitySpec(DataType.LONG, TimeUnit.MILLISECONDS, "incoming"));
+    expectedDateTimeFieldSpec =
+        new DateTimeFieldSpec("incoming", DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS");
+    actualDateTimeFieldSpec = Schema.convertToDateTimeFieldSpec(timeFieldSpec);
+    Assert.assertEquals(actualDateTimeFieldSpec, expectedDateTimeFieldSpec);
+
+    // incoming epoch hours
+    timeFieldSpec = new TimeFieldSpec(new TimeGranularitySpec(DataType.INT, TimeUnit.HOURS, "incoming"));
+    expectedDateTimeFieldSpec = new DateTimeFieldSpec("incoming", DataType.INT, "1:HOURS:EPOCH", "1:HOURS");
+    actualDateTimeFieldSpec = Schema.convertToDateTimeFieldSpec(timeFieldSpec);
+    Assert.assertEquals(actualDateTimeFieldSpec, expectedDateTimeFieldSpec);
+
+    // Simple date format
+    timeFieldSpec = new TimeFieldSpec(
+        new TimeGranularitySpec(DataType.INT, TimeUnit.DAYS, "SIMPLE_DATE_FORMAT:yyyyMMdd", "incoming"));
+    expectedDateTimeFieldSpec =
+        new DateTimeFieldSpec("incoming", DataType.INT, "1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd", "1:DAYS");
+    actualDateTimeFieldSpec = Schema.convertToDateTimeFieldSpec(timeFieldSpec);
+    Assert.assertEquals(actualDateTimeFieldSpec, expectedDateTimeFieldSpec);
+
+    // simple date format STRING
+    timeFieldSpec = new TimeFieldSpec(
+        new TimeGranularitySpec(DataType.STRING, TimeUnit.DAYS, "SIMPLE_DATE_FORMAT:yyyy-MM-dd hh-mm-ss", "incoming"));
+    expectedDateTimeFieldSpec =
+        new DateTimeFieldSpec("incoming", DataType.STRING, "1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd hh-mm-ss", "1:DAYS");
+    actualDateTimeFieldSpec = Schema.convertToDateTimeFieldSpec(timeFieldSpec);
+    Assert.assertEquals(actualDateTimeFieldSpec, expectedDateTimeFieldSpec);
+
+    // time unit size
+    timeFieldSpec = new TimeFieldSpec(new TimeGranularitySpec(DataType.LONG, 5, TimeUnit.MINUTES, "incoming"));
+    expectedDateTimeFieldSpec = new DateTimeFieldSpec("incoming", DataType.LONG, "5:MINUTES:EPOCH", "5:MINUTES");
+    actualDateTimeFieldSpec = Schema.convertToDateTimeFieldSpec(timeFieldSpec);
+    Assert.assertEquals(actualDateTimeFieldSpec, expectedDateTimeFieldSpec);
+
+    // transform function
+    timeFieldSpec = new TimeFieldSpec(new TimeGranularitySpec(DataType.INT, TimeUnit.HOURS, "incoming"));
+    timeFieldSpec.setTransformFunction("toEpochHours(timestamp)");
+    expectedDateTimeFieldSpec = new DateTimeFieldSpec("incoming", DataType.INT, "1:HOURS:EPOCH", "1:HOURS");
+    expectedDateTimeFieldSpec.setTransformFunction("toEpochHours(timestamp)");
+    actualDateTimeFieldSpec = Schema.convertToDateTimeFieldSpec(timeFieldSpec);
+    Assert.assertEquals(actualDateTimeFieldSpec, expectedDateTimeFieldSpec);
+
+    /* 2] incoming + outgoing */
+
+    // same incoming and outgoing
+    timeFieldSpec = new TimeFieldSpec(new TimeGranularitySpec(DataType.LONG, TimeUnit.HOURS, "time"),
+        new TimeGranularitySpec(DataType.LONG, TimeUnit.HOURS, "time"));
+    expectedDateTimeFieldSpec = new DateTimeFieldSpec("time", DataType.LONG, "1:HOURS:EPOCH", "1:HOURS");
+    actualDateTimeFieldSpec = Schema.convertToDateTimeFieldSpec(timeFieldSpec);
+    Assert.assertEquals(actualDateTimeFieldSpec, expectedDateTimeFieldSpec);
+
+    // same incoming and outgoing - simple date format
+    timeFieldSpec =
+        new TimeFieldSpec(new TimeGranularitySpec(DataType.LONG, TimeUnit.DAYS, "SIMPLE_DATE_FORMAT:yyyyMMdd", "time"),
+            new TimeGranularitySpec(DataType.LONG, TimeUnit.DAYS, "SIMPLE_DATE_FORMAT:yyyyMMdd", "time"));
+    expectedDateTimeFieldSpec =
+        new DateTimeFieldSpec("time", DataType.LONG, "1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd", "1:DAYS");
+    actualDateTimeFieldSpec = Schema.convertToDateTimeFieldSpec(timeFieldSpec);
+    Assert.assertEquals(actualDateTimeFieldSpec, expectedDateTimeFieldSpec);
+
+    // millis to hours
+    timeFieldSpec = new TimeFieldSpec(new TimeGranularitySpec(DataType.LONG, TimeUnit.MILLISECONDS, "incoming"),
+        new TimeGranularitySpec(DataType.LONG, TimeUnit.HOURS, "outgoing"));
+    expectedDateTimeFieldSpec = new DateTimeFieldSpec("outgoing", DataType.LONG, "1:HOURS:EPOCH", "1:HOURS");
+    expectedDateTimeFieldSpec.setTransformFunction("toEpochHours(incoming)");
+    actualDateTimeFieldSpec = Schema.convertToDateTimeFieldSpec(timeFieldSpec);
+    Assert.assertEquals(actualDateTimeFieldSpec, expectedDateTimeFieldSpec);
+
+    // millis to bucketed minutes
+    timeFieldSpec = new TimeFieldSpec(new TimeGranularitySpec(DataType.LONG, TimeUnit.MILLISECONDS, "incoming"),
+        new TimeGranularitySpec(DataType.LONG, 10, TimeUnit.MINUTES, "outgoing"));
+    expectedDateTimeFieldSpec = new DateTimeFieldSpec("outgoing", DataType.LONG, "10:MINUTES:EPOCH", "10:MINUTES");
+    expectedDateTimeFieldSpec.setTransformFunction("toEpochMinutesBucket(incoming, 10)");
+    actualDateTimeFieldSpec = Schema.convertToDateTimeFieldSpec(timeFieldSpec);
+    Assert.assertEquals(actualDateTimeFieldSpec, expectedDateTimeFieldSpec);
+
+    // days to millis
+    timeFieldSpec = new TimeFieldSpec(new TimeGranularitySpec(DataType.INT, TimeUnit.DAYS, "incoming"),
+        new TimeGranularitySpec(DataType.LONG, TimeUnit.MILLISECONDS, "outgoing"));
+    expectedDateTimeFieldSpec = new DateTimeFieldSpec("outgoing", DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS");
+    expectedDateTimeFieldSpec.setTransformFunction("fromEpochDays(incoming)");
+    actualDateTimeFieldSpec = Schema.convertToDateTimeFieldSpec(timeFieldSpec);
+    Assert.assertEquals(actualDateTimeFieldSpec, expectedDateTimeFieldSpec);
+
+    // bucketed minutes to millis
+    timeFieldSpec = new TimeFieldSpec(new TimeGranularitySpec(DataType.LONG, 5, TimeUnit.MINUTES, "incoming"),
+        new TimeGranularitySpec(DataType.LONG, TimeUnit.MILLISECONDS, "outgoing"));
+    expectedDateTimeFieldSpec = new DateTimeFieldSpec("outgoing", DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS");
+    expectedDateTimeFieldSpec.setTransformFunction("fromEpochMinutesBucket(incoming, 5)");
+    actualDateTimeFieldSpec = Schema.convertToDateTimeFieldSpec(timeFieldSpec);
+    Assert.assertEquals(actualDateTimeFieldSpec, expectedDateTimeFieldSpec);
+
+    // hours to days
+    timeFieldSpec = new TimeFieldSpec(new TimeGranularitySpec(DataType.INT, TimeUnit.HOURS, "incoming"),
+        new TimeGranularitySpec(DataType.INT, TimeUnit.DAYS, "outgoing"));
+    expectedDateTimeFieldSpec = new DateTimeFieldSpec("outgoing", DataType.INT, "1:DAYS:EPOCH", "1:DAYS");
+    expectedDateTimeFieldSpec.setTransformFunction("toEpochDays(fromEpochHours(incoming))");
+    actualDateTimeFieldSpec = Schema.convertToDateTimeFieldSpec(timeFieldSpec);
+    Assert.assertEquals(actualDateTimeFieldSpec, expectedDateTimeFieldSpec);
+
+    // minutes to hours
+    timeFieldSpec = new TimeFieldSpec(new TimeGranularitySpec(DataType.LONG, TimeUnit.MINUTES, "incoming"),
+        new TimeGranularitySpec(DataType.LONG, TimeUnit.HOURS, "outgoing"));
+    expectedDateTimeFieldSpec = new DateTimeFieldSpec("outgoing", DataType.LONG, "1:HOURS:EPOCH", "1:HOURS");
+    expectedDateTimeFieldSpec.setTransformFunction("toEpochHours(fromEpochMinutes(incoming))");
+    actualDateTimeFieldSpec = Schema.convertToDateTimeFieldSpec(timeFieldSpec);
+    Assert.assertEquals(actualDateTimeFieldSpec, expectedDateTimeFieldSpec);
+
+    // bucketed minutes to days
+    timeFieldSpec = new TimeFieldSpec(new TimeGranularitySpec(DataType.LONG, 10, TimeUnit.MINUTES, "incoming"),
+        new TimeGranularitySpec(DataType.LONG, TimeUnit.DAYS, "outgoing"));
+    expectedDateTimeFieldSpec = new DateTimeFieldSpec("outgoing", DataType.LONG, "1:DAYS:EPOCH", "1:DAYS");
+    expectedDateTimeFieldSpec.setTransformFunction("toEpochDays(fromEpochMinutesBucket(incoming, 10))");
+    actualDateTimeFieldSpec = Schema.convertToDateTimeFieldSpec(timeFieldSpec);
+    Assert.assertEquals(actualDateTimeFieldSpec, expectedDateTimeFieldSpec);
+
+    // seconds to bucketed minutes
+    timeFieldSpec = new TimeFieldSpec(new TimeGranularitySpec(DataType.LONG, TimeUnit.SECONDS, "incoming"),
+        new TimeGranularitySpec(DataType.LONG, 5, TimeUnit.MINUTES, "outgoing"));
+    expectedDateTimeFieldSpec = new DateTimeFieldSpec("outgoing", DataType.LONG, "5:MINUTES:EPOCH", "5:MINUTES");
+    expectedDateTimeFieldSpec.setTransformFunction("toEpochMinutesBucket(fromEpochSeconds(incoming), 5)");
+    actualDateTimeFieldSpec = Schema.convertToDateTimeFieldSpec(timeFieldSpec);
+    Assert.assertEquals(actualDateTimeFieldSpec, expectedDateTimeFieldSpec);
+
+    // simple date format to millis
+    timeFieldSpec = new TimeFieldSpec(new TimeGranularitySpec(DataType.LONG, TimeUnit.DAYS, "SIMPLE_DATE_FORMAT:yyyyMMdd", "incoming"),
+        new TimeGranularitySpec(DataType.LONG, TimeUnit.MILLISECONDS, "outgoing"));
+    try {
+      Schema.convertToDateTimeFieldSpec(timeFieldSpec);
+      Assert.fail();
+    } catch (Exception e) {
+      // expected
+    }
+
+    // hours to simple date format
+    timeFieldSpec = new TimeFieldSpec(new TimeGranularitySpec(DataType.LONG, TimeUnit.HOURS, "incoming"),
+        new TimeGranularitySpec(DataType.INT, TimeUnit.HOURS, "SIMPLE_DATE_FORMAT:yyyyMMddhh", "outgoing"));
+    try {
+      Schema.convertToDateTimeFieldSpec(timeFieldSpec);
+      Assert.fail();
+    } catch (Exception e) {
+      // expected
+    }
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org