You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2018/11/15 00:29:53 UTC

[incubator-pinot] branch time_conversion created (now e465e30)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a change to branch time_conversion
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git.


      at e465e30  Fix the bug where time conversion is skipped when incoming and outgoing time column name are the same

This branch includes the following new commits:

     new e465e30  Fix the bug where time conversion is skipped when incoming and outgoing time column name are the same

The 1 revision listed above as "new" is entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org


[incubator-pinot] 01/01: Fix the bug where time conversion is skipped when incoming and outgoing time column name are the same

Posted by ja...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch time_conversion
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit e465e30ae54446e7a19f035f92c7d6f2a902cb8b
Author: Xiaotian (Jackie) Jiang <xa...@linkedin.com>
AuthorDate: Wed Nov 14 16:27:35 2018 -0800

    Fix the bug where time conversion is skipped when incoming and outgoing time column name are the same
    
    1. Refactor TimeConverter to be based on a single time granularity spec
    2. Refactor TimeTransformer so it can automatically detect whether the conversion is needed (based on the value in the record)
    Added tests for the new TimeConverter and TimeTransformer
---
 .../common/utils/time/DefaultTimeConverter.java    |  83 ----------
 .../pinot/common/utils/time/TimeConverter.java     |  54 +++++--
 .../common/utils/time/TimeConverterProvider.java   |  27 ----
 .../utils/time/DefaultTimeConverterTest.java       | 174 ---------------------
 .../pinot/common/utils/time/TimeConverterTest.java |  82 ++++++++++
 .../data/recordtransformer/TimeTransformer.java    |  58 +++++--
 .../recordtransformer/RecordTransformerTest.java   |  25 ---
 .../recordtransformer/TimeTransformerTest.java     | 145 +++++++++++++++++
 8 files changed, 311 insertions(+), 337 deletions(-)

diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/utils/time/DefaultTimeConverter.java b/pinot-common/src/main/java/com/linkedin/pinot/common/utils/time/DefaultTimeConverter.java
deleted file mode 100644
index c50aebc..0000000
--- a/pinot-common/src/main/java/com/linkedin/pinot/common/utils/time/DefaultTimeConverter.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.pinot.common.utils.time;
-
-import com.linkedin.pinot.common.data.TimeGranularitySpec;
-import com.linkedin.pinot.common.data.TimeGranularitySpec.TimeFormat;
-
-
-public class DefaultTimeConverter implements TimeConverter {
-
-  TimeGranularitySpec incoming;
-  TimeGranularitySpec outgoing;
-  private boolean conversionSupported;
-  private boolean needConversion;
-
-  public void init(TimeGranularitySpec incoming, TimeGranularitySpec outgoing) {
-    this.incoming = incoming;
-    this.outgoing = outgoing;
-    conversionSupported = false;
-    needConversion = true;
-    if (incoming.equals(outgoing)) {
-      needConversion = false;
-    }
-    if (TimeFormat.EPOCH.toString().equals(incoming.getTimeFormat()) && TimeFormat.EPOCH.toString()
-        .equals(outgoing.getTimeFormat())) {
-      conversionSupported = true;
-    }
-    if (needConversion && !conversionSupported) {
-      //TODO: Handle conversion between sdf <-> epoch
-      throw new RuntimeException("Conversion from Simple Date Format to epoch/simpleDateFormat is not supported");
-    }
-  }
-
-  @Override
-  public Object convert(Object incomingTimeValue) {
-    if (incomingTimeValue == null) {
-      return null;
-    }
-    long duration;
-    if (incomingTimeValue instanceof Number) {
-      duration = ((Number) incomingTimeValue).longValue();
-    } else {
-      duration = Long.parseLong(incomingTimeValue.toString());
-    }
-    if (conversionSupported) {
-      long outgoingTime = outgoing.getTimeType().convert(duration * incoming.getTimeUnitSize(), incoming.getTimeType());
-      return convertToOutgoingDataType(outgoingTime / outgoing.getTimeUnitSize());
-    } else {
-      //TODO: Handle conversion between sdf <-> epoch
-      throw new RuntimeException("Conversion from Simple Date Format to epoch/simpleDateFormat is not supported");
-    }
-  }
-
-  private Object convertToOutgoingDataType(long outgoingTimeValue) {
-    switch (outgoing.getDataType()) {
-      case INT:
-        return (int) outgoingTimeValue;
-      case LONG:
-        return outgoingTimeValue;
-      case FLOAT:
-        return (float) outgoingTimeValue;
-      case DOUBLE:
-        return (double) outgoingTimeValue;
-      case STRING:
-        return Long.toString(outgoingTimeValue);
-      default:
-        return outgoingTimeValue;
-    }
-  }
-}
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/utils/time/TimeConverter.java b/pinot-common/src/main/java/com/linkedin/pinot/common/utils/time/TimeConverter.java
index ddf090f..dd09be8 100644
--- a/pinot-common/src/main/java/com/linkedin/pinot/common/utils/time/TimeConverter.java
+++ b/pinot-common/src/main/java/com/linkedin/pinot/common/utils/time/TimeConverter.java
@@ -15,24 +15,50 @@
  */
 package com.linkedin.pinot.common.utils.time;
 
+import com.google.common.base.Preconditions;
 import com.linkedin.pinot.common.data.TimeGranularitySpec;
+import java.util.concurrent.TimeUnit;
+
 
 /**
- * TimeConverter to convert inputTimeValue whose spec is defined by incomingGranularitySpec to
- * outgoingGranularitySpec
+ * TimeConverter to convert value to/from milliseconds since epoch based on the given {@link TimeGranularitySpec}.
  */
-public interface TimeConverter {
-  /**
-   * @param incomingGranularitySpec
-   * @param outgoingGranularitySpec
-   */
-  void init(TimeGranularitySpec incomingGranularitySpec,
-      TimeGranularitySpec outgoingGranularitySpec);
+public class TimeConverter {
+  private final TimeGranularitySpec _timeGranularitySpec;
+
+  public TimeConverter(TimeGranularitySpec timeGranularitySpec) {
+    Preconditions.checkArgument(
+        timeGranularitySpec.getTimeFormat().equals(TimeGranularitySpec.TimeFormat.EPOCH.toString()),
+        "Cannot perform time conversion for time format other than EPOCH");
+    _timeGranularitySpec = timeGranularitySpec;
+  }
 
-  /**
-   * @param incoming time value based on incoming time spec
-   * @return time value based on outgoing time spec
-   */
-  Object convert(Object inputTimeValue);
+  public long toMillisSinceEpoch(Object value) {
+    long duration;
+    if (value instanceof Number) {
+      duration = ((Number) value).longValue();
+    } else {
+      duration = Long.parseLong(value.toString());
+    }
+    return _timeGranularitySpec.getTimeType().toMillis(duration * _timeGranularitySpec.getTimeUnitSize());
+  }
 
+  public Object fromMillisSinceEpoch(long value) {
+    long duration = _timeGranularitySpec.getTimeType().convert(value, TimeUnit.MILLISECONDS)
+        / _timeGranularitySpec.getTimeUnitSize();
+    switch (_timeGranularitySpec.getDataType()) {
+      case INT:
+        return (int) duration;
+      case LONG:
+        return duration;
+      case FLOAT:
+        return (float) duration;
+      case DOUBLE:
+        return (double) duration;
+      case STRING:
+        return Long.toString(duration);
+      default:
+        throw new IllegalStateException();
+    }
+  }
 }
diff --git a/pinot-common/src/main/java/com/linkedin/pinot/common/utils/time/TimeConverterProvider.java b/pinot-common/src/main/java/com/linkedin/pinot/common/utils/time/TimeConverterProvider.java
deleted file mode 100644
index 2c5cfb8..0000000
--- a/pinot-common/src/main/java/com/linkedin/pinot/common/utils/time/TimeConverterProvider.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.pinot.common.utils.time;
-
-import com.linkedin.pinot.common.data.TimeGranularitySpec;
-
-public class TimeConverterProvider {
-  public static TimeConverter getTimeConverter(TimeGranularitySpec incomingGranularitySpec,
-      TimeGranularitySpec outgoingGranularitySpec) {
-    DefaultTimeConverter timeConverter = new DefaultTimeConverter();
-    timeConverter.init(incomingGranularitySpec, outgoingGranularitySpec);
-    return timeConverter;
-  }
-}
diff --git a/pinot-common/src/test/java/com/linkedin/pinot/common/utils/time/DefaultTimeConverterTest.java b/pinot-common/src/test/java/com/linkedin/pinot/common/utils/time/DefaultTimeConverterTest.java
deleted file mode 100644
index 2492bfd..0000000
--- a/pinot-common/src/test/java/com/linkedin/pinot/common/utils/time/DefaultTimeConverterTest.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *         http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.pinot.common.utils.time;
-
-import static java.util.concurrent.TimeUnit.*;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import static com.linkedin.pinot.common.data.FieldSpec.DataType.*;
-import com.linkedin.pinot.common.data.TimeGranularitySpec;
-import com.linkedin.pinot.common.data.TimeGranularitySpec.TimeFormat;
-public class DefaultTimeConverterTest {
-
-  @Test
-  public void testWithSameTimeSpec() {
-    TimeGranularitySpec spec = new TimeGranularitySpec(LONG, 1, DAYS, "1day");
-    DefaultTimeConverter timeConverter = new DefaultTimeConverter();
-    timeConverter.init(spec, spec);
-    for (int i = 0; i < 1000; ++i) {
-      Object convertedValue = timeConverter.convert(i);
-      Assert.assertTrue(convertedValue instanceof Long, "Converted value data type should be Long");
-      Assert.assertEquals(((Long) convertedValue).intValue(), i);
-    }
-    for (long i = 0; i < 1000; ++i) {
-      Object convertedValue = timeConverter.convert(i);
-      Assert.assertTrue(convertedValue instanceof Long, "Converted value data type should be Long");
-      Assert.assertEquals(((Long) convertedValue).longValue(), i);
-    }
-  }
-
-  @Test
-  public void testWithDifferentTimeSpecButSameValue() {
-    TimeGranularitySpec incoming = new TimeGranularitySpec(LONG, 2, DAYS, "2days");
-    TimeGranularitySpec outgoing = new TimeGranularitySpec(LONG, 48, HOURS, "48hours");
-    DefaultTimeConverter timeConverter = new DefaultTimeConverter();
-    timeConverter.init(incoming, outgoing);
-    for (int i = 0; i < 1000; ++i) {
-      Object convertedValue = timeConverter.convert(i);
-      Assert.assertTrue(convertedValue instanceof Long, "Converted value data type should be Long");
-      Assert.assertEquals(((Long) convertedValue).intValue(), i);
-    }
-    for (long i = 0; i < 1000; ++i) {
-      Object convertedValue = timeConverter.convert(i);
-      Assert.assertTrue(convertedValue instanceof Long, "Converted value data type should be Long");
-      Assert.assertEquals(((Long) convertedValue).longValue(), i);
-    }
-  }
-
-  @Test
-  public void testWithDifferentTimeSpecs() {
-    TimeGranularitySpec incoming = new TimeGranularitySpec(LONG, 2, DAYS, "2days");
-    TimeGranularitySpec outgoing = new TimeGranularitySpec(LONG, 24, HOURS, "24hours");
-    DefaultTimeConverter timeConverter = new DefaultTimeConverter();
-    timeConverter.init(incoming, outgoing);
-    for (int i = 0; i < 1000; ++i) {
-      Object convertedValue = timeConverter.convert(i);
-      Assert.assertTrue(convertedValue instanceof Long, "Converted value data type should be Long");
-      Assert.assertEquals(((Long) convertedValue).intValue(), i * 2);
-    }
-    for (long i = 0; i < 1000; ++i) {
-      Object convertedValue = timeConverter.convert(i);
-      Assert.assertTrue(convertedValue instanceof Long, "Converted value data type should be Long");
-      Assert.assertEquals(((Long) convertedValue).longValue(), i * 2);
-    }
-  }
-
-  @Test
-  public void testWithDifferentIncomingValueTypes() {
-    TimeGranularitySpec incoming = new TimeGranularitySpec(LONG, 2, DAYS, "2days");
-    TimeGranularitySpec outgoing = new TimeGranularitySpec(LONG, 24, HOURS, "24hours");
-    DefaultTimeConverter timeConverter = new DefaultTimeConverter();
-    timeConverter.init(incoming, outgoing);
-    Object convertedValue = timeConverter.convert("1");
-    Assert.assertTrue(convertedValue instanceof Long, "Converted value data type should be Long");
-    Assert.assertEquals(((Long) convertedValue).intValue(), 2);
-    convertedValue = timeConverter.convert(1);
-    Assert.assertTrue(convertedValue instanceof Long, "Converted value data type should be Long");
-    Assert.assertEquals(((Long) convertedValue).intValue(), 2);
-    convertedValue = timeConverter.convert((long) 1);
-    Assert.assertTrue(convertedValue instanceof Long, "Converted value data type should be Long");
-    Assert.assertEquals(((Long) convertedValue).intValue(), 2);
-    convertedValue = timeConverter.convert((short) 1);
-    Assert.assertTrue(convertedValue instanceof Long, "Converted value data type should be Long");
-    Assert.assertEquals(((Long) convertedValue).intValue(), 2);
-  }
-
-  @Test
-  public void testWithOutgoingValueTypesString() {
-    TimeGranularitySpec incoming = new TimeGranularitySpec(LONG, 2, DAYS, "2days");
-    TimeGranularitySpec outgoing = new TimeGranularitySpec(STRING, 24, HOURS, "24hours");
-    DefaultTimeConverter timeConverter = new DefaultTimeConverter();
-    timeConverter.init(incoming, outgoing);
-    Object convertedValue = timeConverter.convert("1");
-    Assert.assertTrue(convertedValue instanceof String, "Converted value data type should be STRING");
-    Assert.assertEquals(Integer.parseInt(convertedValue.toString()), 2);
-    Assert.assertEquals(convertedValue, "2");
-    convertedValue = timeConverter.convert(1);
-    Assert.assertTrue(convertedValue instanceof String, "Converted value data type should be STRING");
-    Assert.assertEquals(Integer.parseInt(convertedValue.toString()), 2);
-    Assert.assertEquals(convertedValue, "2");
-    convertedValue = timeConverter.convert((long) 1);
-    Assert.assertTrue(convertedValue instanceof String, "Converted value data type should be STRING");
-    Assert.assertEquals(Integer.parseInt(convertedValue.toString()), 2);
-    Assert.assertEquals(convertedValue, "2");
-    convertedValue = timeConverter.convert((short) 1);
-    Assert.assertTrue(convertedValue instanceof String, "Converted value data type should be STRING");
-    Assert.assertEquals(Integer.parseInt(convertedValue.toString()), 2);
-    Assert.assertEquals(convertedValue, "2");
-  }
-
-  @Test
-  public void testSimpleDateFormat() {
-    TimeGranularitySpec incoming;
-    TimeGranularitySpec outgoing;
-    DefaultTimeConverter timeConverter;
-    String SDF_PREFIX = TimeFormat.SIMPLE_DATE_FORMAT.toString();
-
-    //this should not throw exception, since incoming == outgoing
-    try {
-      incoming = new TimeGranularitySpec(STRING, 1, HOURS, SDF_PREFIX + ":yyyyMMdd", "1hour");
-      outgoing = new TimeGranularitySpec(STRING, 1, HOURS, SDF_PREFIX + ":yyyyMMdd", "1hour");
-      timeConverter = new DefaultTimeConverter();
-      timeConverter.init(incoming, outgoing);
-    } catch (Exception e) {
-      Assert.fail("sdf to sdf must be supported as long as incoming sdf = outgoing sdf");
-    }
-    //we don't support epoch to sdf conversion
-    try {
-      incoming = new TimeGranularitySpec(STRING, 1, HOURS, SDF_PREFIX + ":yyyyMMdd", "1hour");
-      outgoing = new TimeGranularitySpec(LONG, 1, HOURS, "1hour");
-      timeConverter = new DefaultTimeConverter();
-      timeConverter.init(incoming, outgoing);
-      Assert.fail("We don't support converting epoch to sdf currently");
-    } catch (Exception e) {
-      //expected
-    }
-
-    //we don't support sdf to epoch conversion
-    try {
-      incoming = new TimeGranularitySpec(STRING, 1, HOURS, SDF_PREFIX + ":yyyyMMdd", "1hour");
-      outgoing = new TimeGranularitySpec(LONG, 1, HOURS, "1hour");
-      timeConverter = new DefaultTimeConverter();
-      timeConverter.init(incoming, outgoing);
-      Assert.fail("We don't support converting sdf to epoch currently");
-    } catch (Exception e) {
-      //expected
-    }
-    
-    //we don't support sdf to sdf conversion where incoming sdf != outoging sdf
-    try {
-      incoming = new TimeGranularitySpec(STRING, 1, HOURS, SDF_PREFIX + ":yyyyMMdd", "1hour");
-      outgoing = new TimeGranularitySpec(STRING, 1, HOURS, SDF_PREFIX + ":yyyyMMddHH", "1hour");
-      timeConverter = new DefaultTimeConverter();
-      timeConverter.init(incoming, outgoing);
-      Assert.fail("We don't support converting sdf to sdf where incoming sdf != outgoing sdf");
-    } catch (Exception e) {
-      //expected
-    }
-  }
-}
diff --git a/pinot-common/src/test/java/com/linkedin/pinot/common/utils/time/TimeConverterTest.java b/pinot-common/src/test/java/com/linkedin/pinot/common/utils/time/TimeConverterTest.java
new file mode 100644
index 0000000..396f85a
--- /dev/null
+++ b/pinot-common/src/test/java/com/linkedin/pinot/common/utils/time/TimeConverterTest.java
@@ -0,0 +1,82 @@
+package com.linkedin.pinot.common.utils.time;
+
+import com.linkedin.pinot.common.data.FieldSpec;
+import com.linkedin.pinot.common.data.TimeGranularitySpec;
+import java.util.concurrent.TimeUnit;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class TimeConverterTest {
+
+  @Test
+  public void testIntTimeColumn() {
+    TimeConverter converter =
+        new TimeConverter(new TimeGranularitySpec(FieldSpec.DataType.INT, 5, TimeUnit.HOURS, "time"));
+    // Should support conversion from all data types
+    assertEquals(converter.toMillisSinceEpoch(123), 123 * TimeUnit.HOURS.toMillis(5));
+    assertEquals(converter.toMillisSinceEpoch(123L), 123 * TimeUnit.HOURS.toMillis(5));
+    assertEquals(converter.toMillisSinceEpoch(123f), 123 * TimeUnit.HOURS.toMillis(5));
+    assertEquals(converter.toMillisSinceEpoch(123d), 123 * TimeUnit.HOURS.toMillis(5));
+    assertEquals(converter.toMillisSinceEpoch("123"), 123 * TimeUnit.HOURS.toMillis(5));
+
+    assertEquals(converter.fromMillisSinceEpoch(123 * TimeUnit.HOURS.toMillis(5)), 123);
+  }
+
+  @Test
+  public void testLongTimeColumn() {
+    TimeConverter converter =
+        new TimeConverter(new TimeGranularitySpec(FieldSpec.DataType.LONG, 10, TimeUnit.DAYS, "time"));
+    // Should support conversion from all data types
+    assertEquals(converter.toMillisSinceEpoch(123), 123 * TimeUnit.DAYS.toMillis(10));
+    assertEquals(converter.toMillisSinceEpoch(123L), 123 * TimeUnit.DAYS.toMillis(10));
+    assertEquals(converter.toMillisSinceEpoch(123f), 123 * TimeUnit.DAYS.toMillis(10));
+    assertEquals(converter.toMillisSinceEpoch(123d), 123 * TimeUnit.DAYS.toMillis(10));
+    assertEquals(converter.toMillisSinceEpoch("123"), 123 * TimeUnit.DAYS.toMillis(10));
+
+    assertEquals(converter.fromMillisSinceEpoch(123 * TimeUnit.DAYS.toMillis(10)), 123L);
+  }
+
+  @Test
+  public void testFloatTimeColumn() {
+    TimeConverter converter =
+        new TimeConverter(new TimeGranularitySpec(FieldSpec.DataType.FLOAT, 1, TimeUnit.SECONDS, "time"));
+    // Should support conversion from all data types
+    assertEquals(converter.toMillisSinceEpoch(123), 123 * TimeUnit.SECONDS.toMillis(1));
+    assertEquals(converter.toMillisSinceEpoch(123L), 123 * TimeUnit.SECONDS.toMillis(1));
+    assertEquals(converter.toMillisSinceEpoch(123f), 123 * TimeUnit.SECONDS.toMillis(1));
+    assertEquals(converter.toMillisSinceEpoch(123d), 123 * TimeUnit.SECONDS.toMillis(1));
+    assertEquals(converter.toMillisSinceEpoch("123"), 123 * TimeUnit.SECONDS.toMillis(1));
+
+    assertEquals(converter.fromMillisSinceEpoch(123 * TimeUnit.SECONDS.toMillis(1)), 123f);
+  }
+
+  @Test
+  public void testDoubleTimeColumn() {
+    TimeConverter converter =
+        new TimeConverter(new TimeGranularitySpec(FieldSpec.DataType.DOUBLE, 3, TimeUnit.MINUTES, "time"));
+    // Should support conversion from all data types
+    assertEquals(converter.toMillisSinceEpoch(123), 123 * TimeUnit.MINUTES.toMillis(3));
+    assertEquals(converter.toMillisSinceEpoch(123L), 123 * TimeUnit.MINUTES.toMillis(3));
+    assertEquals(converter.toMillisSinceEpoch(123f), 123 * TimeUnit.MINUTES.toMillis(3));
+    assertEquals(converter.toMillisSinceEpoch(123d), 123 * TimeUnit.MINUTES.toMillis(3));
+    assertEquals(converter.toMillisSinceEpoch("123"), 123 * TimeUnit.MINUTES.toMillis(3));
+
+    assertEquals(converter.fromMillisSinceEpoch(123 * TimeUnit.MINUTES.toMillis(3)), 123d);
+  }
+
+  @Test
+  public void testStringTimeColumn() {
+    TimeConverter converter =
+        new TimeConverter(new TimeGranularitySpec(FieldSpec.DataType.STRING, 100, TimeUnit.MILLISECONDS, "time"));
+    // Should support conversion from all data types
+    assertEquals(converter.toMillisSinceEpoch(123), 123 * TimeUnit.MILLISECONDS.toMillis(100));
+    assertEquals(converter.toMillisSinceEpoch(123L), 123 * TimeUnit.MILLISECONDS.toMillis(100));
+    assertEquals(converter.toMillisSinceEpoch(123f), 123 * TimeUnit.MILLISECONDS.toMillis(100));
+    assertEquals(converter.toMillisSinceEpoch(123d), 123 * TimeUnit.MILLISECONDS.toMillis(100));
+    assertEquals(converter.toMillisSinceEpoch("123"), 123 * TimeUnit.MILLISECONDS.toMillis(100));
+
+    assertEquals(converter.fromMillisSinceEpoch(123 * TimeUnit.MILLISECONDS.toMillis(100)), "123");
+  }
+}
diff --git a/pinot-core/src/main/java/com/linkedin/pinot/core/data/recordtransformer/TimeTransformer.java b/pinot-core/src/main/java/com/linkedin/pinot/core/data/recordtransformer/TimeTransformer.java
index 2ee8e00..c97a192 100644
--- a/pinot-core/src/main/java/com/linkedin/pinot/core/data/recordtransformer/TimeTransformer.java
+++ b/pinot-core/src/main/java/com/linkedin/pinot/core/data/recordtransformer/TimeTransformer.java
@@ -19,8 +19,9 @@ import com.linkedin.pinot.common.data.Schema;
 import com.linkedin.pinot.common.data.TimeFieldSpec;
 import com.linkedin.pinot.common.data.TimeGranularitySpec;
 import com.linkedin.pinot.common.utils.time.TimeConverter;
-import com.linkedin.pinot.common.utils.time.TimeConverterProvider;
 import com.linkedin.pinot.core.data.GenericRow;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 
 
 /**
@@ -29,9 +30,14 @@ import com.linkedin.pinot.core.data.GenericRow;
  * column for other record transformers (incoming time column can be ignored).
  */
 public class TimeTransformer implements RecordTransformer {
-  private final String _incomingTimeColumn;
-  private final String _outgoingTimeColumn;
-  private final TimeConverter _timeConverter;
+  private static final long MIN_VALID_TIME = new DateTime(2000, 1, 1, 0, 0, DateTimeZone.UTC).getMillis();
+  private static final long MAX_VALID_TIME = new DateTime(2050, 1, 1, 0, 0, DateTimeZone.UTC).getMillis();
+
+  private String _incomingTimeColumn;
+  private String _outgoingTimeColumn;
+  private TimeConverter _incomingTimeConverter;
+  private TimeConverter _outgoingTimeConverter;
+  private boolean _isFirstRecord = true;
 
   public TimeTransformer(Schema schema) {
     TimeFieldSpec timeFieldSpec = schema.getTimeFieldSpec();
@@ -41,25 +47,49 @@ public class TimeTransformer implements RecordTransformer {
       if (!incomingGranularitySpec.equals(outgoingGranularitySpec)) {
         _incomingTimeColumn = incomingGranularitySpec.getName();
         _outgoingTimeColumn = outgoingGranularitySpec.getName();
-        _timeConverter = TimeConverterProvider.getTimeConverter(incomingGranularitySpec, outgoingGranularitySpec);
-        return;
+        _incomingTimeConverter = new TimeConverter(incomingGranularitySpec);
+        _outgoingTimeConverter = new TimeConverter(outgoingGranularitySpec);
       }
     }
-    _incomingTimeColumn = null;
-    _outgoingTimeColumn = null;
-    _timeConverter = null;
   }
 
   @Override
   public GenericRow transform(GenericRow record) {
-    if (_timeConverter == null) {
+    if (_incomingTimeColumn == null) {
       return record;
     }
-    // Skip transformation if outgoing value already exist
-    // NOTE: outgoing value might already exist for OFFLINE data
-    if (record.getValue(_outgoingTimeColumn) == null) {
-      record.putField(_outgoingTimeColumn, _timeConverter.convert(record.getValue(_incomingTimeColumn)));
+    // Use the first record for sanity check and determine whether the conversion is needed
+    Object incomingTimeValue = record.getValue(_incomingTimeColumn);
+    if (_isFirstRecord) {
+      _isFirstRecord = false;
+      // If incoming time value does not exist or the value is invalid after conversion, check the outgoing time value.
+      // If the outgoing time value is valid, skip time conversion, otherwise, throw exception.
+      if (incomingTimeValue == null || !isValidTime(_incomingTimeConverter.toMillisSinceEpoch(incomingTimeValue))) {
+        Object outgoingTimeValue = record.getValue(_outgoingTimeColumn);
+        if (outgoingTimeValue == null || !isValidTime(_outgoingTimeConverter.toMillisSinceEpoch(outgoingTimeValue))) {
+          throw new IllegalStateException(
+              "No valid time value found in either incoming time column: " + _incomingTimeColumn
+                  + " or outgoing time column: " + _outgoingTimeColumn);
+        } else {
+          disableConversion();
+          return record;
+        }
+      }
     }
+
+    record.putField(_outgoingTimeColumn,
+        _outgoingTimeConverter.fromMillisSinceEpoch(_incomingTimeConverter.toMillisSinceEpoch(incomingTimeValue)));
     return record;
   }
+
+  private void disableConversion() {
+    _incomingTimeColumn = null;
+    _outgoingTimeColumn = null;
+    _incomingTimeConverter = null;
+    _outgoingTimeConverter = null;
+  }
+
+  private static boolean isValidTime(long millisSinceEpoch) {
+    return millisSinceEpoch > MIN_VALID_TIME && millisSinceEpoch < MAX_VALID_TIME;
+  }
 }
diff --git a/pinot-core/src/test/java/com/linkedin/pinot/core/data/recordtransformer/RecordTransformerTest.java b/pinot-core/src/test/java/com/linkedin/pinot/core/data/recordtransformer/RecordTransformerTest.java
index fb5f0fc..6d2940b 100644
--- a/pinot-core/src/test/java/com/linkedin/pinot/core/data/recordtransformer/RecordTransformerTest.java
+++ b/pinot-core/src/test/java/com/linkedin/pinot/core/data/recordtransformer/RecordTransformerTest.java
@@ -20,7 +20,6 @@ import com.linkedin.pinot.common.data.Schema;
 import com.linkedin.pinot.core.data.GenericRow;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.*;
@@ -41,9 +40,6 @@ public class RecordTransformerTest {
       // For sanitation
       .addSingleValueDimension("svStringWithNullCharacters", FieldSpec.DataType.STRING)
       .addSingleValueDimension("svStringWithLengthLimit", FieldSpec.DataType.STRING)
-      // For time conversion
-      .addTime("incoming", 6, TimeUnit.HOURS, FieldSpec.DataType.INT, "outgoing", 1, TimeUnit.MILLISECONDS,
-          FieldSpec.DataType.LONG)
       .build();
 
   static {
@@ -67,25 +63,11 @@ public class RecordTransformerTest {
     fields.put("mvDouble", new Object[]{123});
     fields.put("svStringWithNullCharacters", "1\0002\0003");
     fields.put("svStringWithLengthLimit", "123");
-    fields.put("incoming", "123");
     record.init(fields);
     return record;
   }
 
   @Test
-  public void testTimeTransformer() {
-    RecordTransformer transformer = new TimeTransformer(SCHEMA);
-    GenericRow record = getRecord();
-    for (int i = 0; i < NUM_ROUNDS; i++) {
-      record = transformer.transform(record);
-      assertNotNull(record);
-      // We keep the incoming time field in case other transformers rely on it
-      assertEquals(record.getValue("incoming"), "123");
-      assertEquals(record.getValue("outgoing"), 123 * 6 * 3600 * 1000L);
-    }
-  }
-
-  @Test
   public void testDataTypeTransformer() {
     RecordTransformer transformer = new DataTypeTransformer(SCHEMA);
     GenericRow record = getRecord();
@@ -103,11 +85,6 @@ public class RecordTransformerTest {
       assertEquals(record.getValue("mvDouble"), new Object[]{123d});
       assertEquals(record.getValue("svStringWithNullCharacters"), "1\0002\0003");
       assertEquals(record.getValue("svStringWithLengthLimit"), "123");
-      // Incoming time field won't be converted (it's ignored in this transformer)
-      assertEquals(record.getValue("incoming"), "123");
-      // Outgoing time field will be converted (without time transformer, this field will be null before transform, and
-      // be filled with default null value after transform)
-      assertEquals(record.getValue("outgoing"), Long.MIN_VALUE);
     }
   }
 
@@ -141,8 +118,6 @@ public class RecordTransformerTest {
       assertEquals(record.getValue("mvDouble"), new Object[]{123d});
       assertEquals(record.getValue("svStringWithNullCharacters"), "1");
       assertEquals(record.getValue("svStringWithLengthLimit"), "12");
-      assertEquals(record.getValue("incoming"), "123");
-      assertEquals(record.getValue("outgoing"), 123 * 6 * 3600 * 1000L);
     }
   }
 
diff --git a/pinot-core/src/test/java/com/linkedin/pinot/core/data/recordtransformer/TimeTransformerTest.java b/pinot-core/src/test/java/com/linkedin/pinot/core/data/recordtransformer/TimeTransformerTest.java
new file mode 100644
index 0000000..9e20a8b
--- /dev/null
+++ b/pinot-core/src/test/java/com/linkedin/pinot/core/data/recordtransformer/TimeTransformerTest.java
@@ -0,0 +1,145 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. (pinot-core@linkedin.com)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *         http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.core.data.recordtransformer;
+
+import com.linkedin.pinot.common.data.FieldSpec;
+import com.linkedin.pinot.common.data.Schema;
+import com.linkedin.pinot.common.data.TimeFieldSpec;
+import com.linkedin.pinot.common.data.TimeGranularitySpec;
+import com.linkedin.pinot.core.data.GenericRow;
+import java.util.concurrent.TimeUnit;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+/**
+ * Tests for {@link TimeTransformer}: time format validation, skipping the conversion, and performing the conversion.
+ */
+public class TimeTransformerTest {
+  private static final long VALID_TIME = new DateTime(2018, 1, 1, 0, 0, DateTimeZone.UTC).getMillis();
+
+  /**
+   * Builds a schema with one time field: INT/DAYS under {@code incomingName}, LONG/MILLISECONDS under {@code outgoingName}.
+   */
+  private static Schema daysToMillisSchema(String incomingName, String outgoingName) {
+    TimeGranularitySpec incomingSpec = new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, incomingName);
+    TimeGranularitySpec outgoingSpec =
+        new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS, outgoingName);
+    Schema schema = new Schema();
+    schema.addField(new TimeFieldSpec(incomingSpec, outgoingSpec));
+    return schema;
+  }
+
+  /**
+   * Asserts that transforming the given row with the given transformer throws an {@link IllegalStateException}.
+   */
+  private static void assertTransformThrows(TimeTransformer transformer, GenericRow row) {
+    try {
+      transformer.transform(row);
+      fail();
+    } catch (IllegalStateException e) {
+      // Expected
+    }
+  }
+
+  @Test
+  public void testTimeFormat() {
+    String simpleDateFormat = TimeGranularitySpec.TimeFormat.SIMPLE_DATE_FORMAT.toString();
+
+    // Identical incoming and outgoing spec: any time format should work, the value is expected back unchanged
+    Schema sameSpecSchema = new Schema();
+    sameSpecSchema.addField(
+        new TimeFieldSpec(new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, simpleDateFormat, "time")));
+    TimeTransformer transformer = new TimeTransformer(sameSpecSchema);
+    GenericRow row = new GenericRow();
+    row.putField("time", 20180101);
+    row = transformer.transform(row);
+    assertNotNull(row);
+    assertEquals(row.getValue("time"), 20180101);
+
+    // Different incoming and outgoing spec: simple date format is not allowed, constructing the transformer fails
+    Schema differentSpecSchema = new Schema();
+    differentSpecSchema.addField(new TimeFieldSpec(
+        new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, simpleDateFormat, "incoming"),
+        new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.SECONDS, "outgoing")));
+    try {
+      new TimeTransformer(differentSpecSchema);
+      fail();
+    } catch (IllegalArgumentException e) {
+      // Expected
+    }
+  }
+
+  @Test
+  public void testSkipConversion() {
+    Schema twoColumnSchema = daysToMillisSchema("incoming", "outgoing");
+
+    // Incoming time does not exist, outgoing time exists and is valid: conversion is skipped
+    GenericRow row = new GenericRow();
+    row.putField("outgoing", VALID_TIME);
+    row = new TimeTransformer(twoColumnSchema).transform(row);
+    assertNotNull(row);
+    assertEquals(row.getValue("outgoing"), VALID_TIME);
+
+    // Incoming time is set but invalid, outgoing time exists and is valid: conversion is skipped
+    row = new GenericRow();
+    row.putField("incoming", 123);
+    row.putField("outgoing", VALID_TIME);
+    row = new TimeTransformer(twoColumnSchema).transform(row);
+    assertNotNull(row);
+    assertEquals(row.getValue("outgoing"), VALID_TIME);
+
+    // Incoming and outgoing time share one column whose value is already converted: conversion is skipped
+    row = new GenericRow();
+    row.putField("time", VALID_TIME);
+    row = new TimeTransformer(daysToMillisSchema("time", "time")).transform(row);
+    assertNotNull(row);
+    assertEquals(row.getValue("time"), VALID_TIME);
+
+    // Both incoming and outgoing time are invalid: an exception is expected
+    row = new GenericRow();
+    row.putField("incoming", 123);
+    row.putField("outgoing", 123);
+    assertTransformThrows(new TimeTransformer(daysToMillisSchema("incoming", "outgoing")), row);
+
+    // Single shared time column holding an invalid value: an exception is expected
+    row = new GenericRow();
+    row.putField("time", 123);
+    assertTransformThrows(new TimeTransformer(daysToMillisSchema("time", "time")), row);
+  }
+
+  @Test
+  public void testTimeConversion() {
+    long validTimeInDays = TimeUnit.MILLISECONDS.toDays(VALID_TIME);
+
+    // Incoming time exists and is valid: the outgoing column is expected to hold the converted value
+    GenericRow row = new GenericRow();
+    row.putField("incoming", validTimeInDays);
+    row = new TimeTransformer(daysToMillisSchema("incoming", "outgoing")).transform(row);
+    assertNotNull(row);
+    assertEquals(row.getValue("outgoing"), VALID_TIME);
+
+    // Incoming and outgoing time share one column whose value is not yet converted: the conversion is done
+    row = new GenericRow();
+    row.putField("time", validTimeInDays);
+    row = new TimeTransformer(daysToMillisSchema("time", "time")).transform(row);
+    assertNotNull(row);
+    assertEquals(row.getValue("time"), VALID_TIME);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org