You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ne...@apache.org on 2020/05/19 23:46:20 UTC

[incubator-pinot] branch master updated: DATE_TIME should work as the primary time column for Pinot tables (#5399)

This is an automated email from the ASF dual-hosted git repository.

nehapawar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 85474ba  DATE_TIME should work as the primary time column for Pinot tables (#5399)
85474ba is described below

commit 85474bae9752fea584ca3c2c88e961361dda5e17
Author: Neha Pawar <ne...@gmail.com>
AuthorDate: Tue May 19 16:46:11 2020 -0700

    DATE_TIME should work as the primary time column for Pinot tables (#5399)
    
    This PR ensures that Pinot can use a DateTimeFieldSpec as a primary time column for the table.
    After this change, we no longer need to use TimeFieldSpec, and new schemas should be created using DateTimeFieldSpec (but you can very well keep using TimeFieldSpec, even for new schemas, if you really wish to). Older schemas having TimeFieldSpec will continue to work just as before.
    TimeFieldSpec, TimeGranularitySpec, TimeConverter have been marked as Deprecated. Some tests have been modified to test both cases, some have been changed to use datetime, and some retain time. A new integration test has been added which uses only DateTimeFieldSpec. Most integration tests continue to use a schema with TimeFieldSpec. The quickstart has been updated to use DateTimeFieldSpec.
    
    A change to note:
    The segment.time.column.name property in the segment metadata is now used to hold the primary time column. This will match the time column name in the tableConfig. The primary time column can be either a timeFieldSpec or a dateTimeFieldSpec. If the primary time column is a timeFieldSpec, it will be present only in this property. If the primary time column is a dateTimeFieldSpec, it will also be present in segment.datetime.column.names.
---
 .../routing/timeboundary/TimeBoundaryManager.java  |  14 +-
 .../broker/broker/HelixBrokerStarterTest.java      |   7 +-
 .../timeboundary/TimeBoundaryManagerTest.java      | 153 ++++++++++++---------
 .../apache/pinot/common/data/FieldSpecTest.java    |   2 +-
 .../controller/util/AutoAddInvertedIndex.java      |   8 +-
 ...PinotInstanceAssignmentRestletResourceTest.java |   6 +-
 .../data/function/FunctionEvaluatorFactory.java    |   3 +-
 .../realtime/HLRealtimeSegmentDataManager.java     |  10 +-
 .../apache/pinot/core/minion/SegmentPurger.java    |   2 +-
 .../creator/impl/SegmentColumnarIndexCreator.java  |  78 +++++------
 .../core/segment/creator/impl/V1Constants.java     |   4 +
 .../index/metadata/SegmentMetadataImpl.java        |   2 +-
 .../name/NormalizedDateSegmentNameGenerator.java   |  21 +--
 .../MultiplePinotSegmentRecordReaderTest.java      |  11 +-
 .../pinot/core/data/readers/PinotSegmentUtil.java  |  59 ++++----
 .../generator/SegmentGeneratorConfigTest.java      |  25 +++-
 .../MutableSegmentImplAggregateMetricsTest.java    |  58 +++++---
 .../function/DateTruncTransformFunctionTest.java   |  50 ++++---
 ...adataAndDictionaryAggregationPlanMakerTest.java |   2 +-
 .../SegmentGenerationWithTimeColumnTest.java       |  49 +++++++
 .../NormalizedDateSegmentNameGeneratorTest.java    |  29 ++--
 .../apache/pinot/core/util/SchemaUtilsTest.java    |  17 +++
 ...eTimeFieldSpecHybridClusterIntegrationTest.java |  47 +++++++
 .../LuceneRealtimeClusterIntegrationTest.java      |   3 +-
 ..._100k_subset_nonulls_datetimefieldspecs.schema} |  77 +++--------
 ...onulls_default_column_test_extra_columns.schema |   2 +-
 .../batch/common/SegmentGenerationTaskRunner.java  |  20 ++-
 .../hadoop/job/HadoopSegmentPreprocessingJob.java  |  16 ++-
 .../pinot/hadoop/job/InternalConfigConstants.java  |   1 +
 .../hadoop/job/mappers/SegmentCreationMapper.java  |  16 +--
 .../job/mappers/SegmentPreprocessingMapper.java    |  12 +-
 .../spark/jobs/SparkSegmentCreationFunction.java   |  17 +--
 .../pinot/plugin/inputformat/avro/AvroUtils.java   |  10 ++
 .../plugin/inputformat/avro/AvroUtilsTest.java     |  11 ++
 .../java/org/apache/pinot/spi/data/Schema.java     |   8 +-
 .../org/apache/pinot/spi/data/TimeFieldSpec.java   |   8 +-
 .../apache/pinot/spi/data/TimeGranularitySpec.java |  65 +--------
 .../org/apache/pinot/spi/utils/TimeConverter.java  |   1 +
 .../org/apache/pinot/tools/SegmentDumpTool.java    |   3 +-
 .../pinot/tools/data/generator/DataGenerator.java  |   2 +-
 .../query/comparison/SegmentInfoProvider.java      |   1 +
 .../segment/converter/SegmentMergeCommand.java     |  15 +-
 .../batch/airlineStats/airlineStats_schema.json    |  11 +-
 .../stream/airlineStats/airlineStats_schema.json   |  11 +-
 .../stream/meetupRsvp/meetupRsvp_schema.json       |  11 +-
 45 files changed, 563 insertions(+), 415 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
index f422eab..5e4b019 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
@@ -34,9 +34,9 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.utils.CommonConstants;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.data.Schema;
-import org.apache.pinot.spi.data.TimeFieldSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -71,11 +71,11 @@ public class TimeBoundaryManager {
     _timeColumn = tableConfig.getValidationConfig().getTimeColumnName();
     Preconditions
         .checkNotNull(_timeColumn, "Time column must be configured in table config for table: %s", _offlineTableName);
-    FieldSpec fieldSpec = schema.getFieldSpecFor(_timeColumn);
-    Preconditions
-        .checkNotNull(fieldSpec, "Field spec must be specified in schema for time column: %s of table: %s", _timeColumn,
-            _offlineTableName);
-    _timeUnit = ((TimeFieldSpec) fieldSpec).getOutgoingGranularitySpec().getTimeType();
+    DateTimeFieldSpec dateTimeSpec = schema.getSpecForTimeColumn(_timeColumn);
+    Preconditions.checkNotNull(dateTimeSpec, "Field spec must be specified in schema for time column: %s of table: %s",
+        _timeColumn, _offlineTableName);
+    DateTimeFormatSpec formatSpec = new DateTimeFormatSpec(dateTimeSpec.getFormat());
+    _timeUnit = formatSpec.getColumnUnit();
     Preconditions
         .checkNotNull(_timeUnit, "Time unit must be configured in the field spec for time column: %s of table: %s",
             _timeColumn, _offlineTableName);
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
index b855fb5..a2970ab 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/broker/HelixBrokerStarterTest.java
@@ -39,6 +39,9 @@ import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
 import org.apache.pinot.pql.parsers.Pql2Compiler;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeGranularitySpec;
@@ -81,7 +84,9 @@ public class HelixBrokerStarterTest extends ControllerTest {
     addFakeServerInstancesToAutoJoinHelixCluster(NUM_SERVERS, true);
 
     Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
-        .addTime(new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, TIME_COLUMN_NAME), null).build();
+        .addDateTime(TIME_COLUMN_NAME, FieldSpec.DataType.INT,
+            new DateTimeFormatSpec(1, TimeUnit.DAYS.toString(), DateTimeFieldSpec.TimeFormat.EPOCH.toString())
+                .getFormat(), new DateTimeGranularitySpec(1, TimeUnit.DAYS).getGranularity()).build();
     _helixResourceManager.addSchema(schema, true);
     TableConfig offlineTableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setTimeColumnName(TIME_COLUMN_NAME)
diff --git a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
index 9cca5b3..98f5d3e 100644
--- a/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
+++ b/pinot-broker/src/test/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManagerTest.java
@@ -75,85 +75,110 @@ public class TimeBoundaryManagerTest {
     ExternalView externalView = Mockito.mock(ExternalView.class);
 
     for (TimeUnit timeUnit : TimeUnit.values()) {
-      // Test DAILY push table
+      // Test DAILY push table, with timeFieldSpec
       String rawTableName = "testTable_" + timeUnit + "_DAILY";
-      TableConfig tableConfig = getTableConfig(rawTableName, timeUnit, "DAILY");
-      setSchema(rawTableName, timeUnit);
-
-      // Start with no segment
-      TimeBoundaryManager timeBoundaryManager = new TimeBoundaryManager(tableConfig, _propertyStore);
-      Set<String> onlineSegments = new HashSet<>();
-      timeBoundaryManager.init(externalView, onlineSegments);
-      assertNull(timeBoundaryManager.getTimeBoundaryInfo());
-
-      // Add the first segment should update the time boundary
-      String segment0 = "segment0";
-      onlineSegments.add(segment0);
-      setSegmentZKMetadata(rawTableName, segment0, 2, timeUnit);
-      timeBoundaryManager.init(externalView, onlineSegments);
-      verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(1, TimeUnit.DAYS));
-
-      // Add a new segment with larger end time should update the time boundary
-      String segment1 = "segment1";
-      onlineSegments.add(segment1);
-      setSegmentZKMetadata(rawTableName, segment1, 4, timeUnit);
-      timeBoundaryManager.onExternalViewChange(externalView, onlineSegments);
-      verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(3, TimeUnit.DAYS));
-
-      // Add a new segment with smaller end time should not change the time boundary
-      String segment2 = "segment2";
-      onlineSegments.add(segment2);
-      setSegmentZKMetadata(rawTableName, segment2, 3, timeUnit);
-      timeBoundaryManager.onExternalViewChange(externalView, onlineSegments);
-      verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(3, TimeUnit.DAYS));
-
-      // Remove the segment with largest end time should update the time boundary
-      onlineSegments.remove(segment1);
-      timeBoundaryManager.onExternalViewChange(externalView, onlineSegments);
-      verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(2, TimeUnit.DAYS));
-
-      // Change segment ZK metadata without refreshing should not update the time boundary
-      setSegmentZKMetadata(rawTableName, segment2, 5, timeUnit);
-      timeBoundaryManager.onExternalViewChange(externalView, onlineSegments);
-      verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(2, TimeUnit.DAYS));
-
-      // Refresh the changed segment should update the time boundary
-      timeBoundaryManager.refreshSegment(segment2);
-      verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(4, TimeUnit.DAYS));
+      TableConfig tableConfig = getTableConfig(rawTableName, "DAILY");
+      setSchemaTimeFieldSpec(rawTableName, timeUnit);
+      testDailyPushTable(rawTableName, tableConfig, timeUnit, externalView);
 
-      // Test HOURLY push table
+      // Test HOURLY push table, with timeFieldSpec
       rawTableName = "testTable_" + timeUnit + "_HOURLY";
-      tableConfig = getTableConfig(rawTableName, timeUnit, "HOURLY");
-      setSchema(rawTableName, timeUnit);
-      timeBoundaryManager = new TimeBoundaryManager(tableConfig, _propertyStore);
-      onlineSegments = new HashSet<>();
-      onlineSegments.add(segment0);
-      setSegmentZKMetadata(rawTableName, segment0, 2, timeUnit);
-      timeBoundaryManager.init(externalView, onlineSegments);
-      long expectedTimeValue;
-      if (timeUnit == TimeUnit.DAYS) {
-        // Time boundary should be endTime - 1 DAY when time unit is DAYS
-        expectedTimeValue = timeUnit.convert(1, TimeUnit.DAYS);
-      } else {
-        // Time boundary should be endTime - 1 HOUR when time unit is other than DAYS
-        expectedTimeValue = timeUnit.convert(47, TimeUnit.HOURS);
-      }
-      verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), expectedTimeValue);
+      tableConfig = getTableConfig(rawTableName, "HOURLY");
+      setSchemaTimeFieldSpec(rawTableName, timeUnit);
+      testHourlyPushTable(rawTableName, tableConfig, timeUnit, externalView);
+
+      // Test DAILY push table with dateTimeFieldSpec
+      rawTableName = "testTableDateTime_" + timeUnit + "_DAILY";
+      tableConfig = getTableConfig(rawTableName, "DAILY");
+      setSchemaDateTimeFieldSpec(rawTableName, timeUnit);
+      testDailyPushTable(rawTableName, tableConfig, timeUnit, externalView);
+
+      // Test HOURLY push table
+      rawTableName = "testTableDateTime_" + timeUnit + "_HOURLY";
+      tableConfig = getTableConfig(rawTableName, "HOURLY");
+      setSchemaDateTimeFieldSpec(rawTableName, timeUnit);
+      testHourlyPushTable(rawTableName, tableConfig, timeUnit, externalView);
     }
   }
 
-  private TableConfig getTableConfig(String rawTableName, TimeUnit timeUnit, String pushFrequency) {
+  private void testDailyPushTable(String rawTableName, TableConfig tableConfig, TimeUnit timeUnit, ExternalView externalView) {
+    // Start with no segment
+    TimeBoundaryManager timeBoundaryManager = new TimeBoundaryManager(tableConfig, _propertyStore);
+    Set<String> onlineSegments = new HashSet<>();
+    timeBoundaryManager.init(externalView, onlineSegments);
+    assertNull(timeBoundaryManager.getTimeBoundaryInfo());
+
+    // Add the first segment should update the time boundary
+    String segment0 = "segment0";
+    onlineSegments.add(segment0);
+    setSegmentZKMetadata(rawTableName, segment0, 2, timeUnit);
+    timeBoundaryManager.init(externalView, onlineSegments);
+    verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(1, TimeUnit.DAYS));
+
+    // Add a new segment with larger end time should update the time boundary
+    String segment1 = "segment1";
+    onlineSegments.add(segment1);
+    setSegmentZKMetadata(rawTableName, segment1, 4, timeUnit);
+    timeBoundaryManager.onExternalViewChange(externalView, onlineSegments);
+    verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(3, TimeUnit.DAYS));
+
+    // Add a new segment with smaller end time should not change the time boundary
+    String segment2 = "segment2";
+    onlineSegments.add(segment2);
+    setSegmentZKMetadata(rawTableName, segment2, 3, timeUnit);
+    timeBoundaryManager.onExternalViewChange(externalView, onlineSegments);
+    verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(3, TimeUnit.DAYS));
+
+    // Remove the segment with largest end time should update the time boundary
+    onlineSegments.remove(segment1);
+    timeBoundaryManager.onExternalViewChange(externalView, onlineSegments);
+    verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(2, TimeUnit.DAYS));
+
+    // Change segment ZK metadata without refreshing should not update the time boundary
+    setSegmentZKMetadata(rawTableName, segment2, 5, timeUnit);
+    timeBoundaryManager.onExternalViewChange(externalView, onlineSegments);
+    verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(2, TimeUnit.DAYS));
+
+    // Refresh the changed segment should update the time boundary
+    timeBoundaryManager.refreshSegment(segment2);
+    verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), timeUnit.convert(4, TimeUnit.DAYS));
+  }
+
+  private void testHourlyPushTable(String rawTableName, TableConfig tableConfig, TimeUnit timeUnit, ExternalView externalView) {
+    TimeBoundaryManager timeBoundaryManager = new TimeBoundaryManager(tableConfig, _propertyStore);
+    Set<String> onlineSegments = new HashSet<>();
+    String segment0 = "segment0";
+    onlineSegments.add(segment0);
+    setSegmentZKMetadata(rawTableName, segment0, 2, timeUnit);
+    timeBoundaryManager.init(externalView, onlineSegments);
+    long expectedTimeValue;
+    if (timeUnit == TimeUnit.DAYS) {
+      // Time boundary should be endTime - 1 DAY when time unit is DAYS
+      expectedTimeValue = timeUnit.convert(1, TimeUnit.DAYS);
+    } else {
+      // Time boundary should be endTime - 1 HOUR when time unit is other than DAYS
+      expectedTimeValue = timeUnit.convert(47, TimeUnit.HOURS);
+    }
+    verifyTimeBoundaryInfo(timeBoundaryManager.getTimeBoundaryInfo(), expectedTimeValue);
+  }
+
+  private TableConfig getTableConfig(String rawTableName, String pushFrequency) {
     return new TableConfigBuilder(TableType.OFFLINE).setTableName(rawTableName).setTimeColumnName(TIME_COLUMN)
-        .setTimeType(timeUnit.name()).setSegmentPushFrequency(pushFrequency).build();
+        .setSegmentPushFrequency(pushFrequency).build();
   }
 
-  private void setSchema(String rawTableName, TimeUnit timeUnit) {
+  private void setSchemaTimeFieldSpec(String rawTableName, TimeUnit timeUnit) {
     ZKMetadataProvider.setSchema(_propertyStore,
         new Schema.SchemaBuilder().setSchemaName(rawTableName)
             .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, timeUnit, TIME_COLUMN), null)
             .build());
   }
 
+  private void setSchemaDateTimeFieldSpec(String rawTableName, TimeUnit timeUnit) {
+    ZKMetadataProvider.setSchema(_propertyStore, new Schema.SchemaBuilder().setSchemaName(rawTableName)
+        .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:" + timeUnit + ":EPOCH", "1:" + timeUnit).build());
+  }
+
   private void setSegmentZKMetadata(String rawTableName, String segment, int endTimeInDays, TimeUnit timeUnit) {
     OfflineSegmentZKMetadata offlineSegmentZKMetadata = new OfflineSegmentZKMetadata();
     offlineSegmentZKMetadata.setSegmentName(segment);
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/data/FieldSpecTest.java b/pinot-common/src/test/java/org/apache/pinot/common/data/FieldSpecTest.java
index e8fac87..9d29bd4 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/data/FieldSpecTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/data/FieldSpecTest.java
@@ -150,7 +150,7 @@ public class FieldSpecTest {
   }
 
   /**
-   * Test {@link TimeFieldSpec} constructors.
+   * Test {@link DateTimeFieldSpec} constructors.
    */
   @Test
   public void testDateTimeFieldSpecConstructor() {
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/AutoAddInvertedIndex.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/AutoAddInvertedIndex.java
index 6cf755a..b2dfe07 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/AutoAddInvertedIndex.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/AutoAddInvertedIndex.java
@@ -42,6 +42,8 @@ import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.controller.helix.ControllerRequestURLBuilder;
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeFieldSpec;
@@ -237,13 +239,13 @@ public class AutoAddInvertedIndex {
             tableNameWithType);
         continue;
       }
-      FieldSpec fieldSpec = tableSchema.getFieldSpecFor(timeColumnName);
-      if (fieldSpec == null || fieldSpec.getDataType() == FieldSpec.DataType.STRING) {
+      DateTimeFieldSpec dateTimeSpec = tableSchema.getSpecForTimeColumn(timeColumnName);
+      if (dateTimeSpec == null || dateTimeSpec.getDataType() == FieldSpec.DataType.STRING) {
         LOGGER.info("Table: {}, skip adding inverted index because it does not have a numeric time column",
             tableNameWithType);
         continue;
       }
-      TimeUnit timeUnit = ((TimeFieldSpec) fieldSpec).getOutgoingGranularitySpec().getTimeType();
+      TimeUnit timeUnit = new DateTimeFormatSpec(dateTimeSpec.getFormat()).getColumnUnit();
       if (timeUnit != TimeUnit.DAYS) {
         LOGGER.warn("Table: {}, time column {] has non-DAYS time unit: {}", timeColumnName, timeUnit);
       }
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java
index fcccc72..8210523 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/PinotInstanceAssignmentRestletResourceTest.java
@@ -80,10 +80,8 @@ public class PinotInstanceAssignmentRestletResourceTest extends ControllerTest {
   @Test
   public void testInstanceAssignment()
       throws Exception {
-    Schema schema =
-        new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
-            .addTime(new TimeGranularitySpec(DataType.INT, TimeUnit.DAYS, TIME_COLUMN_NAME), null)
-            .build();
+    Schema schema = new Schema.SchemaBuilder().setSchemaName(RAW_TABLE_NAME)
+        .addDateTime(TIME_COLUMN_NAME, DataType.INT, "1:DAYS:EPOCH", "1:DAYS").build();
     _helixResourceManager.addSchema(schema, true);
     TableConfig offlineTableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setBrokerTenant(TENANT_NAME)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/function/FunctionEvaluatorFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/data/function/FunctionEvaluatorFactory.java
index 1c40415..aa4f8df 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/function/FunctionEvaluatorFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/function/FunctionEvaluatorFactory.java
@@ -55,7 +55,8 @@ public class FunctionEvaluatorFactory {
       }
     } else if (fieldSpec.getFieldType().equals(FieldSpec.FieldType.TIME)) {
 
-      // for backward compatible handling of TIME field conversion
+      // Time conversions should be done using DateTimeFieldSpec and transformFunctions
+      // But we need below lines for converting TimeFieldSpec's incoming to outgoing
       TimeFieldSpec timeFieldSpec = (TimeFieldSpec) fieldSpec;
       TimeGranularitySpec incomingGranularitySpec = timeFieldSpec.getIncomingGranularitySpec();
       TimeGranularitySpec outgoingGranularitySpec = timeFieldSpec.getOutgoingGranularitySpec();
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
index c6a0fd5..24a66c9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/HLRealtimeSegmentDataManager.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.core.data.manager.realtime;
 
+import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Uninterruptibles;
 import com.yammer.metrics.core.Meter;
 import java.io.File;
@@ -46,6 +47,8 @@ import org.apache.pinot.core.segment.index.loader.IndexLoadingConfig;
 import org.apache.pinot.core.util.SchemaUtils;
 import org.apache.pinot.spi.config.table.IndexingConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeFieldSpec;
 import org.apache.pinot.spi.data.readers.GenericRow;
@@ -111,7 +114,12 @@ public class HLRealtimeSegmentDataManager extends RealtimeSegmentDataManager {
     _segmentName = realtimeSegmentZKMetadata.getSegmentName();
     _tableNameWithType = tableConfig.getTableName();
     _timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
-    _timeType = ((TimeFieldSpec) schema.getFieldSpecFor(_timeColumnName)).getOutgoingGranularitySpec().getTimeType();
+    Preconditions
+        .checkNotNull(_timeColumnName, "Must provide valid timeColumnName in tableConfig for realtime table {}",
+            _tableNameWithType);
+    DateTimeFieldSpec dateTimeFieldSpec = schema.getSpecForTimeColumn(_timeColumnName);
+    Preconditions.checkNotNull(dateTimeFieldSpec, "Must provide field spec for time column {}", _timeColumnName);
+    _timeType = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat()).getColumnUnit();
 
     List<String> sortedColumns = indexLoadingConfig.getSortedColumns();
     if (sortedColumns.isEmpty()) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
index f314300..ea53ca3 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/minion/SegmentPurger.java
@@ -92,7 +92,7 @@ public class SegmentPurger {
       // The time column type info is not stored in the segment metadata.
       // Keep segment start/end time to properly handle time column type other than EPOCH (e.g.SIMPLE_FORMAT).
       if (segmentMetadata.getTimeInterval() != null) {
-        config.setTimeColumnName(segmentMetadata.getTimeColumn());
+        config.setTimeColumnName(_tableConfig.getValidationConfig().getTimeColumnName());
         config.setStartTime(Long.toString(segmentMetadata.getStartTime()));
         config.setEndTime(Long.toString(segmentMetadata.getEndTime()));
         config.setSegmentTimeUnit(segmentMetadata.getTimeUnit());
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
index 4aaf8df..12e6feb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentColumnarIndexCreator.java
@@ -362,52 +362,52 @@ public class SegmentColumnarIndexCreator implements SegmentCreator {
     properties.setProperty(DIMENSIONS, config.getDimensions());
     properties.setProperty(METRICS, config.getMetrics());
     properties.setProperty(DATETIME_COLUMNS, config.getDateTimeColumnNames());
-    properties.setProperty(TIME_COLUMN_NAME, config.getTimeColumnName());
+    String timeColumnName = config.getTimeColumnName();
+    properties.setProperty(TIME_COLUMN_NAME, timeColumnName);
     properties.setProperty(SEGMENT_TOTAL_DOCS, String.valueOf(totalDocs));
 
     // Write time related metadata (start time, end time, time unit)
-    String timeColumn = config.getTimeColumnName();
-    ColumnIndexCreationInfo timeColumnIndexCreationInfo = indexCreationInfoMap.get(timeColumn);
-    if (timeColumnIndexCreationInfo != null) {
-      long startTime;
-      long endTime;
-      TimeUnit timeUnit;
-
-      // Use start/end time in config if defined
-      if (config.getStartTime() != null) {
-        startTime = Long.parseLong(config.getStartTime());
-        endTime = Long.parseLong(config.getEndTime());
-        timeUnit = Preconditions.checkNotNull(config.getSegmentTimeUnit());
-      } else {
-        String startTimeStr = timeColumnIndexCreationInfo.getMin().toString();
-        String endTimeStr = timeColumnIndexCreationInfo.getMax().toString();
-
-        if (config.getTimeColumnType() == SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE) {
-          // For TimeColumnType.SIMPLE_DATE_FORMAT, convert time value into millis since epoch
-          DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern(config.getSimpleDateFormat());
-          startTime = dateTimeFormatter.parseMillis(startTimeStr);
-          endTime = dateTimeFormatter.parseMillis(endTimeStr);
-          timeUnit = TimeUnit.MILLISECONDS;
-        } else {
-          // by default, time column type is TimeColumnType.EPOCH
-          startTime = Long.parseLong(startTimeStr);
-          endTime = Long.parseLong(endTimeStr);
+    if (timeColumnName != null) {
+      ColumnIndexCreationInfo timeColumnIndexCreationInfo = indexCreationInfoMap.get(timeColumnName);
+      if (timeColumnIndexCreationInfo != null) {
+        long startTime;
+        long endTime;
+        TimeUnit timeUnit;
+
+        // Use start/end time in config if defined
+        if (config.getStartTime() != null) {
+          startTime = Long.parseLong(config.getStartTime());
+          endTime = Long.parseLong(config.getEndTime());
           timeUnit = Preconditions.checkNotNull(config.getSegmentTimeUnit());
+        } else {
+          String startTimeStr = timeColumnIndexCreationInfo.getMin().toString();
+          String endTimeStr = timeColumnIndexCreationInfo.getMax().toString();
+
+          if (config.getTimeColumnType() == SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE) {
+            // For TimeColumnType.SIMPLE_DATE_FORMAT, convert time value into millis since epoch
+            DateTimeFormatter dateTimeFormatter = DateTimeFormat.forPattern(config.getSimpleDateFormat());
+            startTime = dateTimeFormatter.parseMillis(startTimeStr);
+            endTime = dateTimeFormatter.parseMillis(endTimeStr);
+            timeUnit = TimeUnit.MILLISECONDS;
+          } else {
+            // by default, time column type is TimeColumnType.EPOCH
+            startTime = Long.parseLong(startTimeStr);
+            endTime = Long.parseLong(endTimeStr);
+            timeUnit = Preconditions.checkNotNull(config.getSegmentTimeUnit());
+          }
         }
-      }
 
-      if (!config.isSkipTimeValueCheck()) {
-        Interval timeInterval =
-            new Interval(timeUnit.toMillis(startTime), timeUnit.toMillis(endTime), DateTimeZone.UTC);
-        Preconditions.checkState(TimeUtils.isValidTimeInterval(timeInterval),
-            "Invalid segment start/end time: %s (in millis: %s/%s) for time column: %s, must be between: %s",
-            timeInterval, timeInterval.getStartMillis(), timeInterval.getEndMillis(), timeColumn,
-            TimeUtils.VALID_TIME_INTERVAL);
-      }
+        if (!config.isSkipTimeValueCheck()) {
+          Interval timeInterval = new Interval(timeUnit.toMillis(startTime), timeUnit.toMillis(endTime), DateTimeZone.UTC);
+          Preconditions.checkState(TimeUtils.isValidTimeInterval(timeInterval),
+              "Invalid segment start/end time: %s (in millis: %s/%s) for time column: %s, must be between: %s",
+              timeInterval, timeInterval.getStartMillis(), timeInterval.getEndMillis(), timeColumnName, TimeUtils.VALID_TIME_INTERVAL);
+        }
 
-      properties.setProperty(SEGMENT_START_TIME, startTime);
-      properties.setProperty(SEGMENT_END_TIME, endTime);
-      properties.setProperty(TIME_UNIT, timeUnit);
+        properties.setProperty(SEGMENT_START_TIME, startTime);
+        properties.setProperty(SEGMENT_END_TIME, endTime);
+        properties.setProperty(TIME_UNIT, timeUnit);
+      }
     }
 
     for (Map.Entry<String, String> entry : config.getCustomProperties().entrySet()) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
index 3dc60aa..524eff1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/V1Constants.java
@@ -51,6 +51,10 @@ public class V1Constants {
       public static final String TABLE_NAME = "segment.table.name";
       public static final String DIMENSIONS = "segment.dimension.column.names";
       public static final String METRICS = "segment.metric.column.names";
+      /**
+       * The primary time column for the table. This will match the timeColumnName defined in the tableConfig.
+       * In the Pinot schema, this column can be defined as either a TimeFieldSpec (deprecated) or a DateTimeFieldSpec.
+       */
       public static final String TIME_COLUMN_NAME = "segment.time.column.name";
       public static final String TIME_UNIT = "segment.time.unit";
       public static final String SEGMENT_START_TIME = "segment.start.time";
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java
index 3bbdcdd..d95a3c1 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/index/metadata/SegmentMetadataImpl.java
@@ -259,7 +259,7 @@ public class SegmentMetadataImpl implements SegmentMetadata {
   private static void addPhysicalColumns(List src, Collection<String> dest) {
     for (Object o : src) {
       String column = o.toString();
-      if (!column.isEmpty() && column.charAt(0) != '$') {
+      if (!column.isEmpty() && column.charAt(0) != '$' && !dest.contains(column)) {
         dest.add(column);
       }
     }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java
index 1322f3d..75b619d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java
@@ -25,7 +25,8 @@ import java.util.Date;
 import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
 import javax.annotation.Nullable;
-import org.apache.pinot.spi.data.TimeGranularitySpec.TimeFormat;
+import org.apache.pinot.spi.data.DateTimeFieldSpec.TimeFormat;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
 
 
 /**
@@ -44,8 +45,8 @@ public class NormalizedDateSegmentNameGenerator implements SegmentNameGenerator
   private SimpleDateFormat _inputSDF;
 
   public NormalizedDateSegmentNameGenerator(String tableName, @Nullable String segmentNamePrefix,
-      boolean excludeSequenceId, @Nullable String pushType, @Nullable String pushFrequency, @Nullable TimeUnit timeType,
-      @Nullable String timeFormat) {
+      boolean excludeSequenceId, @Nullable String pushType, @Nullable String pushFrequency, @Nullable
+      DateTimeFormatSpec dateTimeFormatSpec) {
     _segmentNamePrefix = segmentNamePrefix != null ? segmentNamePrefix.trim() : tableName;
     _excludeSequenceId = excludeSequenceId;
     _appendPushType = "APPEND".equalsIgnoreCase(pushType);
@@ -60,16 +61,16 @@ public class NormalizedDateSegmentNameGenerator implements SegmentNameGenerator
       }
       _outputSDF.setTimeZone(TimeZone.getTimeZone("UTC"));
 
-      // Parse input time format: 'EPOCH' or 'SIMPLE_DATE_FORMAT:<pattern>'
-      if (Preconditions.checkNotNull(timeFormat).equals(TimeFormat.EPOCH.toString())) {
-        _inputTimeUnit = timeType;
+      // Input time format is either 'EPOCH' or 'SIMPLE_DATE_FORMAT' with an associated pattern
+      Preconditions.checkNotNull(dateTimeFormatSpec);
+      TimeFormat timeFormat = dateTimeFormatSpec.getTimeFormat();
+      if (timeFormat == TimeFormat.EPOCH) {
+        _inputTimeUnit = dateTimeFormatSpec.getColumnUnit();
         _inputSDF = null;
       } else {
-        Preconditions.checkArgument(timeFormat.startsWith(TimeFormat.SIMPLE_DATE_FORMAT.toString()),
-            "Invalid time format: %s, must be one of '%s' or '%s:<pattern>'", timeFormat, TimeFormat.EPOCH,
-            TimeFormat.SIMPLE_DATE_FORMAT);
+        Preconditions.checkNotNull(dateTimeFormatSpec.getSDFPattern(), "Must provide pattern for SIMPLE_DATE_FORMAT");
         _inputTimeUnit = null;
-        _inputSDF = new SimpleDateFormat(timeFormat.substring(timeFormat.indexOf(':') + 1));
+        _inputSDF = new SimpleDateFormat(dateTimeFormatSpec.getSDFPattern());
         _inputSDF.setTimeZone(TimeZone.getTimeZone("UTC"));
       }
     } else {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/MultiplePinotSegmentRecordReaderTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/MultiplePinotSegmentRecordReaderTest.java
index 0dd0e39..a1749ae 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/MultiplePinotSegmentRecordReaderTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/MultiplePinotSegmentRecordReaderTest.java
@@ -53,7 +53,8 @@ public class MultiplePinotSegmentRecordReaderTest {
   private static String D_MV_1 = "d_mv_1";
   private static String M1 = "m1";
   private static String M2 = "m2";
-  private static String TIME = "t";
+  private static String TIME_1 = "t1";
+  private static String TIME_2 = "t2";
 
   private String _segmentOutputDir;
   private List<File> _segmentIndexDirList;
@@ -86,12 +87,13 @@ public class MultiplePinotSegmentRecordReaderTest {
         .addMultiValueDimension(D_MV_1, FieldSpec.DataType.STRING)
         .addMetric(M1, FieldSpec.DataType.INT)
         .addMetric(M2, FieldSpec.DataType.FLOAT)
-        .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.HOURS, TIME), null)
+        .addDateTime(TIME_1, FieldSpec.DataType.LONG, "1:HOURS:EPOCH", "1:HOURS")
+        .addDateTime(TIME_2, FieldSpec.DataType.LONG, "1:DAYS:EPOCH", "1:DAYS")
         .build();
   }
 
   private TableConfig createTableConfig() {
-    return new TableConfigBuilder(TableType.OFFLINE).setTableName("test").setTimeColumnName(TIME).build();
+    return new TableConfigBuilder(TableType.OFFLINE).setTableName("test").setTimeColumnName(TIME_1).build();
   }
 
   @Test
@@ -117,7 +119,8 @@ public class MultiplePinotSegmentRecordReaderTest {
         Assert.assertTrue(PinotSegmentUtil.compareMultiValueColumn(outputRow.getValue(D_MV_1), row.getValue(D_MV_1)));
         Assert.assertEquals(outputRow.getValue(M1), row.getValue(M1));
         Assert.assertEquals(outputRow.getValue(M2), row.getValue(M2));
-        Assert.assertEquals(outputRow.getValue(TIME), row.getValue(TIME));
+        Assert.assertEquals(outputRow.getValue(TIME_1), row.getValue(TIME_1));
+        Assert.assertEquals(outputRow.getValue(TIME_2), row.getValue(TIME_2));
       }
     }
   }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/PinotSegmentUtil.java b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/PinotSegmentUtil.java
index c40cb7c..c7ee734 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/data/readers/PinotSegmentUtil.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/data/readers/PinotSegmentUtil.java
@@ -30,6 +30,8 @@ import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeFieldSpec;
@@ -106,30 +108,11 @@ public class PinotSegmentUtil {
       // segment generation code doesn't throw exception
       TimeFieldSpec timeFieldSpec = (TimeFieldSpec)fieldSpec;
       TimeUnit unit = timeFieldSpec.getIncomingGranularitySpec().getTimeType();
-      final long milliMin = TimeUtils.getValidMinTimeMillis();
-      final long milliMax = TimeUtils.getValidMaxTimeMillis();
-      final long daysMin = TimeUnit.DAYS.convert(milliMin, TimeUnit.MILLISECONDS);
-      final long daysMax = TimeUnit.DAYS.convert(milliMax, TimeUnit.MILLISECONDS);
-      final long hoursMin = TimeUnit.HOURS.convert(milliMin, TimeUnit.MILLISECONDS);
-      final long hoursMax = TimeUnit.HOURS.convert(milliMax, TimeUnit.MILLISECONDS);
-      final long minutesMin = TimeUnit.MINUTES.convert(milliMin, TimeUnit.MILLISECONDS);
-      final long minutesMax = TimeUnit.MINUTES.convert(milliMax, TimeUnit.MILLISECONDS);
-      switch (unit) {
-        case MILLISECONDS:
-          return random.nextLong(milliMin, milliMax);
-        case SECONDS:
-          return random.nextLong(milliMin/1000, milliMax/1000);
-        case MICROSECONDS:
-          return random.nextLong(milliMin*1000, milliMax*1000);
-        case NANOSECONDS:
-          return random.nextLong(milliMin*1000*1000, milliMax*1000*1000);
-        case DAYS:
-          return random.nextLong(daysMin, daysMax);
-        case HOURS:
-          return random.nextLong(hoursMin, hoursMax);
-        case MINUTES:
-          return random.nextLong(minutesMin, minutesMax);
-      }
+      return generateTimeValue(random, unit);
+    } else if (fieldSpec instanceof DateTimeFieldSpec) {
+      DateTimeFieldSpec dateTimeFieldSpec = (DateTimeFieldSpec) fieldSpec;
+      TimeUnit unit = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat()).getColumnUnit();
+      return generateTimeValue(random, unit);
     } else {
       switch (fieldSpec.getDataType()) {
         case INT:
@@ -148,6 +131,34 @@ public class PinotSegmentUtil {
     throw new IllegalStateException("Illegal data type");
   }
 
+  private static Object generateTimeValue(ThreadLocalRandom random, TimeUnit unit) {
+    final long milliMin = TimeUtils.getValidMinTimeMillis();
+    final long milliMax = TimeUtils.getValidMaxTimeMillis();
+    final long daysMin = TimeUnit.DAYS.convert(milliMin, TimeUnit.MILLISECONDS);
+    final long daysMax = TimeUnit.DAYS.convert(milliMax, TimeUnit.MILLISECONDS);
+    final long hoursMin = TimeUnit.HOURS.convert(milliMin, TimeUnit.MILLISECONDS);
+    final long hoursMax = TimeUnit.HOURS.convert(milliMax, TimeUnit.MILLISECONDS);
+    final long minutesMin = TimeUnit.MINUTES.convert(milliMin, TimeUnit.MILLISECONDS);
+    final long minutesMax = TimeUnit.MINUTES.convert(milliMax, TimeUnit.MILLISECONDS);
+    switch (unit) {
+      case MILLISECONDS:
+        return random.nextLong(milliMin, milliMax);
+      case SECONDS:
+        return random.nextLong(milliMin/1000, milliMax/1000);
+      case MICROSECONDS:
+        return random.nextLong(milliMin*1000, milliMax*1000);
+      case NANOSECONDS:
+        return random.nextLong(milliMin*1000*1000, milliMax*1000*1000);
+      case DAYS:
+        return random.nextLong(daysMin, daysMax);
+      case HOURS:
+        return random.nextLong(hoursMin, hoursMax);
+      case MINUTES:
+        return random.nextLong(minutesMin, minutesMax);
+    }
+    throw new IllegalStateException("Illegal data type");
+  }
+
   private static Object[] generateMultiValue(ThreadLocalRandom random, FieldSpec fieldSpec) {
     Object[] value = new Object[DEFAULT_NUM_MULTIVALUE];
     for (int i = 0; i < DEFAULT_NUM_MULTIVALUE; i++) {
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfigTest.java b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfigTest.java
index 05e8a7d..701d15b 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfigTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfigTest.java
@@ -47,12 +47,21 @@ public class SegmentGeneratorConfigTest {
     assertNull(segmentGeneratorConfig.getSimpleDateFormat());
 
     // MUST provide valid tableConfig with time column if time details are wanted
-    tableConfig =
-        new TableConfigBuilder(TableType.OFFLINE).setTableName("test").build();
+    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("test").build();
     segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
     assertNull(segmentGeneratorConfig.getTimeColumnName());
     assertNull(segmentGeneratorConfig.getSegmentTimeUnit());
     assertNull(segmentGeneratorConfig.getSimpleDateFormat());
+
+    schema = new Schema.SchemaBuilder().addDateTime("daysSinceEpoch", FieldSpec.DataType.INT, "1:DAYS:EPOCH", "1:DAYS")
+        .build();
+    tableConfig =
+        new TableConfigBuilder(TableType.OFFLINE).setTableName("test").setTimeColumnName("daysSinceEpoch").build();
+    segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    assertEquals(segmentGeneratorConfig.getTimeColumnName(), "daysSinceEpoch");
+    assertEquals(segmentGeneratorConfig.getTimeColumnType(), SegmentGeneratorConfig.TimeColumnType.EPOCH);
+    assertEquals(segmentGeneratorConfig.getSegmentTimeUnit(), TimeUnit.DAYS);
+    assertNull(segmentGeneratorConfig.getSimpleDateFormat());
   }
 
   @Test
@@ -69,11 +78,19 @@ public class SegmentGeneratorConfigTest {
     assertEquals(segmentGeneratorConfig.getSimpleDateFormat(), "yyyyMMdd");
 
     // MUST provide valid tableConfig with time column if time details are wanted
-    tableConfig =
-        new TableConfigBuilder(TableType.OFFLINE).setTableName("test").build();
+    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("test").build();
     segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
     assertNull(segmentGeneratorConfig.getTimeColumnName());
     assertNull(segmentGeneratorConfig.getSegmentTimeUnit());
     assertNull(segmentGeneratorConfig.getSimpleDateFormat());
+
+    schema = new Schema.SchemaBuilder()
+        .addDateTime("Date", FieldSpec.DataType.STRING, "1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd", "1:DAYS").build();
+    tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("test").setTimeColumnName("Date").build();
+    segmentGeneratorConfig = new SegmentGeneratorConfig(tableConfig, schema);
+    assertEquals(segmentGeneratorConfig.getTimeColumnName(), "Date");
+    assertEquals(segmentGeneratorConfig.getTimeColumnType(), SegmentGeneratorConfig.TimeColumnType.SIMPLE_DATE);
+    assertNull(segmentGeneratorConfig.getSegmentTimeUnit());
+    assertEquals(segmentGeneratorConfig.getSimpleDateFormat(), "yyyyMMdd");
   }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
index 580e284..1586a7a 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/indexsegment/mutable/MutableSegmentImplAggregateMetricsTest.java
@@ -44,19 +44,20 @@ public class MutableSegmentImplAggregateMetricsTest {
   private static final String DIMENSION_2 = "dim2";
   private static final String METRIC = "metric";
   private static final String METRIC_2 = "metric2";
-  private static final String TIME_COLUMN = "time";
+  private static final String TIME_COLUMN1 = "time1";
+  private static final String TIME_COLUMN2 = "time2";
   private static final String KEY_SEPARATOR = "\t\t";
   private static final int NUM_ROWS = 10001;
 
-  private MutableSegmentImpl _mutableSegmentImpl;
 
-  @BeforeClass
-  public void setUp() {
+  @Test
+  public void testAggregateMetrics() {
     Schema schema = new Schema.SchemaBuilder().setSchemaName("testSchema")
         .addSingleValueDimension(DIMENSION_1, FieldSpec.DataType.INT)
         .addSingleValueDimension(DIMENSION_2, FieldSpec.DataType.STRING).addMetric(METRIC, FieldSpec.DataType.LONG)
         .addMetric(METRIC_2, FieldSpec.DataType.FLOAT)
-        .addTime(new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, TIME_COLUMN), null)
+        .addTime(new TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, TIME_COLUMN1), null)
+        .addDateTime(TIME_COLUMN2, FieldSpec.DataType.INT, "1:HOURS:EPOCH", "1:HOURS")
         .build();
     // Add virtual columns, which should not be aggregated
     DimensionFieldSpec virtualDimensionFieldSpec =
@@ -65,15 +66,33 @@ public class MutableSegmentImplAggregateMetricsTest {
     MetricFieldSpec virtualMetricFieldSpec = new MetricFieldSpec("$virtualMetric", FieldSpec.DataType.INT);
     virtualMetricFieldSpec.setVirtualColumnProvider("provider.class");
     schema.addField(virtualMetricFieldSpec);
+    MutableSegmentImpl mutableSegmentImpl = MutableSegmentImplTestUtils
+        .createMutableSegmentImpl(schema, new HashSet<>(Arrays.asList(METRIC, METRIC_2)),
+            Collections.singleton(DIMENSION_2),
+            new HashSet<>(Arrays.asList(DIMENSION_1, DIMENSION_2, TIME_COLUMN1, TIME_COLUMN2)), true);
+    testAggregateMetrics(mutableSegmentImpl);
+    mutableSegmentImpl.destroy();
+
 
-    _mutableSegmentImpl = MutableSegmentImplTestUtils
+    schema = new Schema.SchemaBuilder().setSchemaName("testSchema")
+        .addSingleValueDimension(DIMENSION_1, FieldSpec.DataType.INT)
+        .addSingleValueDimension(DIMENSION_2, FieldSpec.DataType.STRING).addMetric(METRIC, FieldSpec.DataType.LONG)
+        .addMetric(METRIC_2, FieldSpec.DataType.FLOAT)
+        .addDateTime(TIME_COLUMN1, FieldSpec.DataType.INT, "1:DAYS:EPOCH", "1:DAYS")
+        .addDateTime(TIME_COLUMN2, FieldSpec.DataType.INT, "1:HOURS:EPOCH", "1:HOURS")
+        .build();
+    // Add virtual columns, which should not be aggregated
+    schema.addField(virtualDimensionFieldSpec);
+    schema.addField(virtualMetricFieldSpec);
+    mutableSegmentImpl = MutableSegmentImplTestUtils
         .createMutableSegmentImpl(schema, new HashSet<>(Arrays.asList(METRIC, METRIC_2)),
-            Collections.singleton(DIMENSION_2), new HashSet<>(Arrays.asList(DIMENSION_1, DIMENSION_2, TIME_COLUMN)),
-            true);
+            Collections.singleton(DIMENSION_2),
+            new HashSet<>(Arrays.asList(DIMENSION_1, DIMENSION_2, TIME_COLUMN1, TIME_COLUMN2)), true);
+    testAggregateMetrics(mutableSegmentImpl);
+    mutableSegmentImpl.destroy();
   }
 
-  @Test
-  public void testAggregateMetrics() {
+  private void testAggregateMetrics(MutableSegmentImpl mutableSegmentImpl) {
     String[] stringValues = new String[10];
     Float[] floatValues = new Float[10];
     Random random = new Random();
@@ -86,18 +105,20 @@ public class MutableSegmentImplAggregateMetricsTest {
     Map<String, Float> expectedValuesFloat = new HashMap<>();
     StreamMessageMetadata defaultMetadata = new StreamMessageMetadata(System.currentTimeMillis());
     for (int i = 0; i < NUM_ROWS; i++) {
-      int daysSinceEpoch = random.nextInt(10);
+      int hoursSinceEpoch = random.nextInt(10);
+      int daysSinceEpoch = random.nextInt(5);
       GenericRow row = new GenericRow();
       row.putField(DIMENSION_1, random.nextInt(10));
       row.putField(DIMENSION_2, stringValues[random.nextInt(stringValues.length)]);
-      row.putField(TIME_COLUMN, daysSinceEpoch);
+      row.putField(TIME_COLUMN1, daysSinceEpoch);
+      row.putField(TIME_COLUMN2, hoursSinceEpoch);
       // Generate random int to prevent overflow
       long metricValue = random.nextInt();
       row.putField(METRIC, metricValue);
       float metricValueFloat = floatValues[random.nextInt(floatValues.length)];
       row.putField(METRIC_2, metricValueFloat);
 
-      _mutableSegmentImpl.index(row, defaultMetadata);
+      mutableSegmentImpl.index(row, defaultMetadata);
 
       // Update expected values
       String key = buildKey(row);
@@ -105,7 +126,7 @@ public class MutableSegmentImplAggregateMetricsTest {
       expectedValuesFloat.put(key, expectedValuesFloat.getOrDefault(key, 0f) + metricValueFloat);
     }
 
-    int numDocsIndexed = _mutableSegmentImpl.getNumDocsIndexed();
+    int numDocsIndexed = mutableSegmentImpl.getNumDocsIndexed();
     Assert.assertEquals(numDocsIndexed, expectedValues.size());
 
     // Assert that aggregation happened.
@@ -113,7 +134,7 @@ public class MutableSegmentImplAggregateMetricsTest {
 
     GenericRow reuse = new GenericRow();
     for (int docId = 0; docId < numDocsIndexed; docId++) {
-      GenericRow row = _mutableSegmentImpl.getRecord(docId, reuse);
+      GenericRow row = mutableSegmentImpl.getRecord(docId, reuse);
       String key = buildKey(row);
       Assert.assertEquals(row.getValue(METRIC), expectedValues.get(key));
       Assert.assertEquals(row.getValue(METRIC_2), expectedValuesFloat.get(key));
@@ -122,11 +143,6 @@ public class MutableSegmentImplAggregateMetricsTest {
 
   private String buildKey(GenericRow row) {
     return row.getValue(DIMENSION_1) + KEY_SEPARATOR + row.getValue(DIMENSION_2) + KEY_SEPARATOR + row
-        .getValue(TIME_COLUMN);
-  }
-
-  @AfterClass
-  public void tearDown() {
-    _mutableSegmentImpl.destroy();
+        .getValue(TIME_COLUMN1) + KEY_SEPARATOR + row.getValue(TIME_COLUMN2);
   }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/DateTruncTransformFunctionTest.java b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/DateTruncTransformFunctionTest.java
index 3da6fca..f60b7d9 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/DateTruncTransformFunctionTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/operator/transform/function/DateTruncTransformFunctionTest.java
@@ -79,13 +79,12 @@ public class DateTruncTransformFunctionTest
     return formatter.parseDateTime(iso8601).getMillis();
   }
 
-  private static void testDateTruncHelper(String literalInput, String unit, String tz, long expected) throws Exception {
+  private static void testDateTruncHelper(Schema schema, String literalInput, String unit, String tz, long expected)
+      throws Exception {
     long zmillisInput = iso8601ToUtcEpochMillis(literalInput);
     GenericRow row = new GenericRow();
     row.init(ImmutableMap.of(TIME_COLUMN, zmillisInput));
     List<GenericRow> rows = ImmutableList.of(row);
-    Schema schema = new Schema.SchemaBuilder()
-        .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS, TIME_COLUMN), null).build();
     TableConfig tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName("test").setTimeColumnName(TIME_COLUMN).build();
 
@@ -122,57 +121,70 @@ public class DateTruncTransformFunctionTest
     }
   }
 
+
+
   @Test
   public void testPrestoCompatibleDateTimeConversionTransformFunction() throws Exception {
+    Schema schemaTimeFieldSpec = new Schema.SchemaBuilder()
+        .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS, TIME_COLUMN), null).build();
+    testDateTrunc(schemaTimeFieldSpec);
+
+    Schema schemaDateTimeFieldSpec = new Schema.SchemaBuilder()
+        .addDateTime(TIME_COLUMN, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
+    testDateTrunc(schemaDateTimeFieldSpec);
+  }
+
+  private void testDateTrunc(Schema schema) throws Exception {
+
     DateTime result = TIMESTAMP;
     result = result.withMillisOfSecond(0);
-    testDateTruncHelper(TIMESTAMP_ISO8601_STRING, "second", UTC_TIME_ZONE.getID(), result.getMillis());
+    testDateTruncHelper(schema, TIMESTAMP_ISO8601_STRING, "second", UTC_TIME_ZONE.getID(), result.getMillis());
 
     result = result.withSecondOfMinute(0);
-    testDateTruncHelper(TIMESTAMP_ISO8601_STRING, "minute", UTC_TIME_ZONE.getID(), result.getMillis());
+    testDateTruncHelper(schema, TIMESTAMP_ISO8601_STRING, "minute", UTC_TIME_ZONE.getID(), result.getMillis());
 
     result = result.withMinuteOfHour(0);
-    testDateTruncHelper(TIMESTAMP_ISO8601_STRING, "hour", UTC_TIME_ZONE.getID(), result.getMillis());
+    testDateTruncHelper(schema, TIMESTAMP_ISO8601_STRING, "hour", UTC_TIME_ZONE.getID(), result.getMillis());
 
     result = result.withHourOfDay(0);
-    testDateTruncHelper(TIMESTAMP_ISO8601_STRING, "day", UTC_TIME_ZONE.getID(), result.getMillis());
+    testDateTruncHelper(schema, TIMESTAMP_ISO8601_STRING, "day", UTC_TIME_ZONE.getID(), result.getMillis());
 
     // ISO8601 week begins on Monday. For this timestamp (2001-08-22), 20th is the Monday of that week
     result = result.withDayOfMonth(20);
-    testDateTruncHelper(TIMESTAMP_ISO8601_STRING, "week", UTC_TIME_ZONE.getID(), result.getMillis());
+    testDateTruncHelper(schema, TIMESTAMP_ISO8601_STRING, "week", UTC_TIME_ZONE.getID(), result.getMillis());
 
     result = result.withDayOfMonth(1);
-    testDateTruncHelper(TIMESTAMP_ISO8601_STRING, "month", UTC_TIME_ZONE.getID(), result.getMillis());
+    testDateTruncHelper(schema, TIMESTAMP_ISO8601_STRING, "month", UTC_TIME_ZONE.getID(), result.getMillis());
 
     result = result.withMonthOfYear(7);
-    testDateTruncHelper(TIMESTAMP_ISO8601_STRING, "quarter", UTC_TIME_ZONE.getID(), result.getMillis());
+    testDateTruncHelper(schema, TIMESTAMP_ISO8601_STRING, "quarter", UTC_TIME_ZONE.getID(), result.getMillis());
 
     result = result.withMonthOfYear(1);
-    testDateTruncHelper(TIMESTAMP_ISO8601_STRING, "year", UTC_TIME_ZONE.getID(), result.getMillis());
+    testDateTruncHelper(schema, TIMESTAMP_ISO8601_STRING, "year", UTC_TIME_ZONE.getID(), result.getMillis());
 
     result = WEIRD_TIMESTAMP;
     result = result.withMillisOfSecond(0);
-    testDateTruncHelper(WEIRD_TIMESTAMP_ISO8601_STRING, "second", WEIRD_DATE_TIME_ZONE.getID(), result.getMillis());
+    testDateTruncHelper(schema, WEIRD_TIMESTAMP_ISO8601_STRING, "second", WEIRD_DATE_TIME_ZONE.getID(), result.getMillis());
 
     result = result.withSecondOfMinute(0);
-    testDateTruncHelper(WEIRD_TIMESTAMP_ISO8601_STRING, "minute", WEIRD_DATE_TIME_ZONE.getID(), result.getMillis());
+    testDateTruncHelper(schema, WEIRD_TIMESTAMP_ISO8601_STRING, "minute", WEIRD_DATE_TIME_ZONE.getID(), result.getMillis());
 
     result = result.withMinuteOfHour(0);
-    testDateTruncHelper(WEIRD_TIMESTAMP_ISO8601_STRING, "hour", WEIRD_DATE_TIME_ZONE.getID(), result.getMillis());
+    testDateTruncHelper(schema, WEIRD_TIMESTAMP_ISO8601_STRING, "hour", WEIRD_DATE_TIME_ZONE.getID(), result.getMillis());
 
     result = result.withHourOfDay(0);
-    testDateTruncHelper(WEIRD_TIMESTAMP_ISO8601_STRING, "day", WEIRD_DATE_TIME_ZONE.getID(), result.getMillis());
+    testDateTruncHelper(schema, WEIRD_TIMESTAMP_ISO8601_STRING, "day", WEIRD_DATE_TIME_ZONE.getID(), result.getMillis());
 
     result = result.withDayOfMonth(20);
-    testDateTruncHelper(WEIRD_TIMESTAMP_ISO8601_STRING, "week", WEIRD_DATE_TIME_ZONE.getID(), result.getMillis());
+    testDateTruncHelper(schema, WEIRD_TIMESTAMP_ISO8601_STRING, "week", WEIRD_DATE_TIME_ZONE.getID(), result.getMillis());
 
     result = result.withDayOfMonth(1);
-    testDateTruncHelper(WEIRD_TIMESTAMP_ISO8601_STRING, "month", WEIRD_DATE_TIME_ZONE.getID(), result.getMillis());
+    testDateTruncHelper(schema, WEIRD_TIMESTAMP_ISO8601_STRING, "month", WEIRD_DATE_TIME_ZONE.getID(), result.getMillis());
 
     result = result.withMonthOfYear(7);
-    testDateTruncHelper(WEIRD_TIMESTAMP_ISO8601_STRING, "quarter", WEIRD_DATE_TIME_ZONE.getID(), result.getMillis());
+    testDateTruncHelper(schema, WEIRD_TIMESTAMP_ISO8601_STRING, "quarter", WEIRD_DATE_TIME_ZONE.getID(), result.getMillis());
 
     result = result.withMonthOfYear(1);
-    testDateTruncHelper(WEIRD_TIMESTAMP_ISO8601_STRING, "year", WEIRD_DATE_TIME_ZONE.getID(), result.getMillis());
+    testDateTruncHelper(schema, WEIRD_TIMESTAMP_ISO8601_STRING, "year", WEIRD_DATE_TIME_ZONE.getID(), result.getMillis());
   }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
index c6bd73d..13d2088 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/plan/maker/MetadataAndDictionaryAggregationPlanMakerTest.java
@@ -84,7 +84,7 @@ public class MetadataAndDictionaryAggregationPlanMakerTest {
         .addSingleValueDimension("column11", FieldSpec.DataType.STRING)
         .addSingleValueDimension("column12", FieldSpec.DataType.STRING).addMetric("column17", FieldSpec.DataType.INT)
         .addMetric("column18", FieldSpec.DataType.INT)
-        .addTime(new TimeGranularitySpec(DataType.INT, 1, TimeUnit.DAYS, "daysSinceEpoch"), null).build();
+        .addTime(new TimeGranularitySpec(DataType.INT, TimeUnit.DAYS, "daysSinceEpoch"), null).build();
     TableConfig tableConfig =
         new TableConfigBuilder(TableType.OFFLINE).setTableName("testTable").setTimeColumnName("daysSinceEpoch").build();
 
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithTimeColumnTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithTimeColumnTest.java
index b242660..645e676 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithTimeColumnTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/index/creator/SegmentGenerationWithTimeColumnTest.java
@@ -94,6 +94,19 @@ public class SegmentGenerationWithTimeColumnTest {
     Assert.assertEquals(metadata.getEndTime(), sdfToMillis(maxTime));
   }
 
+  /**
+   * Tests using DateTimeFieldSpec as time column
+   */
+  @Test
+  public void testSimpleDateSegmentGenerationNew()
+      throws Exception {
+    Schema schema = createDateTimeFieldSpecSchema(true);
+    File segmentDir = buildSegment(_tableConfig, schema, true, false);
+    SegmentMetadataImpl metadata = SegmentDirectory.loadSegmentMetadata(segmentDir);
+    Assert.assertEquals(metadata.getStartTime(), sdfToMillis(minTime));
+    Assert.assertEquals(metadata.getEndTime(), sdfToMillis(maxTime));
+  }
+
   @Test
   public void testEpochDateSegmentGeneration()
       throws Exception {
@@ -104,6 +117,20 @@ public class SegmentGenerationWithTimeColumnTest {
     Assert.assertEquals(metadata.getEndTime(), maxTime);
   }
 
+  /**
+   * Tests using DateTimeFieldSpec as time column
+   */
+  @Test
+  public void testEpochDateSegmentGenerationNew()
+      throws Exception {
+    Schema schema = createDateTimeFieldSpecSchema(false);
+    File segmentDir = buildSegment(_tableConfig, schema, false, false);
+    SegmentMetadataImpl metadata = SegmentDirectory.loadSegmentMetadata(segmentDir);
+    Assert.assertEquals(metadata.getStartTime(), minTime);
+    Assert.assertEquals(metadata.getEndTime(), maxTime);
+  }
+
+
   @Test(expectedExceptions = IllegalStateException.class)
   public void testSegmentGenerationWithInvalidTime()
       throws Exception {
@@ -111,6 +138,16 @@ public class SegmentGenerationWithTimeColumnTest {
     buildSegment(_tableConfig, schema, false, true);
   }
 
+  /**
+   * Tests using DateTimeFieldSpec as time column
+   */
+  @Test(expectedExceptions = IllegalStateException.class)
+  public void testSegmentGenerationWithInvalidTimeNew()
+      throws Exception {
+    Schema schema = createDateTimeFieldSpecSchema(false);
+    buildSegment(_tableConfig, schema, false, true);
+  }
+
   private Schema createSchema(boolean isSimpleDate) {
     Schema.SchemaBuilder builder =
         new Schema.SchemaBuilder().addSingleValueDimension(STRING_COL_NAME, FieldSpec.DataType.STRING);
@@ -123,6 +160,18 @@ public class SegmentGenerationWithTimeColumnTest {
     return builder.build();
   }
 
+  private Schema createDateTimeFieldSpecSchema(boolean isSimpleDate) {
+    Schema.SchemaBuilder builder =
+        new Schema.SchemaBuilder().addSingleValueDimension(STRING_COL_NAME, FieldSpec.DataType.STRING);
+    if (isSimpleDate) {
+      builder.addDateTime(TIME_COL_NAME, FieldSpec.DataType.INT, "1:DAYS:SIMPLE_DATE_FORMAT:"+TIME_COL_FORMAT, "1:DAYS");
+    } else {
+      builder.addDateTime(TIME_COL_NAME, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS");
+    }
+    builder.addDateTime("hoursSinceEpoch", FieldSpec.DataType.INT, "1:HOURS:EPOCH", "1:HOURS");
+    return builder.build();
+  }
+
   private TableConfig createTableConfig() {
     return new TableConfigBuilder(TableType.OFFLINE).setTableName("test").setTimeColumnName(TIME_COL_NAME).build();
   }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGeneratorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGeneratorTest.java
index 9706648..8b0e376 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGeneratorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGeneratorTest.java
@@ -19,6 +19,8 @@
 package org.apache.pinot.core.segment.name;
 
 import java.util.concurrent.TimeUnit;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
@@ -30,8 +32,9 @@ public class NormalizedDateSegmentNameGeneratorTest {
   private static final String APPEND_PUSH_TYPE = "APPEND";
   private static final String REFRESH_PUSH_TYPE = "REFRESH";
   private static final String EPOCH_TIME_FORMAT = "EPOCH";
-  private static final String LONG_SIMPLE_DATE_FORMAT = "SIMPLE_DATE_FORMAT:yyyyMMdd";
-  private static final String STRING_SIMPLE_DATE_FORMAT = "SIMPLE_DATE_FORMAT:yyyy-MM-dd";
+  private static final String SIMPLE_DATE_TIME_FORMAT = "SIMPLE_DATE_FORMAT";
+  private static final String LONG_SIMPLE_DATE_FORMAT = "yyyyMMdd";
+  private static final String STRING_SIMPLE_DATE_FORMAT = "yyyy-MM-dd";
   private static final String DAILY_PUSH_FREQUENCY = "daily";
   private static final String HOURLY_PUSH_FREQUENCY = "hourly";
   private static final int INVALID_SEQUENCE_ID = -1;
@@ -40,7 +43,7 @@ public class NormalizedDateSegmentNameGeneratorTest {
   @Test
   public void testRefresh() {
     SegmentNameGenerator segmentNameGenerator =
-        new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, false, REFRESH_PUSH_TYPE, null, null, null);
+        new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, false, REFRESH_PUSH_TYPE, null, null);
     assertEquals(segmentNameGenerator.toString(),
         "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable, appendPushType=false");
     assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, null, null), "myTable");
@@ -50,8 +53,7 @@ public class NormalizedDateSegmentNameGeneratorTest {
   @Test
   public void testWithSegmentNamePrefix() {
     SegmentNameGenerator segmentNameGenerator =
-        new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEGMENT_NAME_PREFIX, false, REFRESH_PUSH_TYPE, null, null,
-            null);
+        new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEGMENT_NAME_PREFIX, false, REFRESH_PUSH_TYPE, null, null);
     assertEquals(segmentNameGenerator.toString(),
         "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable_daily, appendPushType=false");
     assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, null, null), "myTable_daily");
@@ -62,7 +64,7 @@ public class NormalizedDateSegmentNameGeneratorTest {
   public void testWithUntrimmedSegmentNamePrefix() {
     SegmentNameGenerator segmentNameGenerator =
         new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEGMENT_NAME_PREFIX + "  ", false, REFRESH_PUSH_TYPE, null,
-            null, null);
+            null);
     assertEquals(segmentNameGenerator.toString(),
         "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable_daily, appendPushType=false");
     assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, null, null), "myTable_daily");
@@ -72,7 +74,7 @@ public class NormalizedDateSegmentNameGeneratorTest {
   @Test
   public void testExcludeSequenceId() {
     SegmentNameGenerator segmentNameGenerator =
-        new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, true, REFRESH_PUSH_TYPE, null, null, null);
+        new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, true, REFRESH_PUSH_TYPE, null, null);
     assertEquals(segmentNameGenerator.toString(),
         "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable, appendPushType=false, excludeSequenceId=true");
     assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, null, null), "myTable");
@@ -82,8 +84,7 @@ public class NormalizedDateSegmentNameGeneratorTest {
   @Test
   public void testWithPrefixExcludeSequenceId() {
     SegmentNameGenerator segmentNameGenerator =
-        new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEGMENT_NAME_PREFIX, true, REFRESH_PUSH_TYPE, null, null,
-            null);
+        new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEGMENT_NAME_PREFIX, true, REFRESH_PUSH_TYPE, null, null);
     assertEquals(segmentNameGenerator.toString(),
         "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable_daily, appendPushType=false, excludeSequenceId=true");
     assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, null, null), "myTable_daily");
@@ -94,7 +95,7 @@ public class NormalizedDateSegmentNameGeneratorTest {
   public void testAppend() {
     SegmentNameGenerator segmentNameGenerator =
         new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, false, APPEND_PUSH_TYPE, DAILY_PUSH_FREQUENCY,
-            TimeUnit.DAYS, EPOCH_TIME_FORMAT);
+            new DateTimeFormatSpec(1, TimeUnit.DAYS.toString(), EPOCH_TIME_FORMAT));
     assertEquals(segmentNameGenerator.toString(),
         "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable, appendPushType=true, outputSDF=yyyy-MM-dd, inputTimeUnit=DAYS");
     assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, 1L, 3L),
@@ -107,7 +108,7 @@ public class NormalizedDateSegmentNameGeneratorTest {
   public void testHoursTimeType() {
     SegmentNameGenerator segmentNameGenerator =
         new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, false, APPEND_PUSH_TYPE, DAILY_PUSH_FREQUENCY,
-            TimeUnit.HOURS, EPOCH_TIME_FORMAT);
+            new DateTimeFormatSpec(1, TimeUnit.HOURS.toString(), EPOCH_TIME_FORMAT));
     assertEquals(segmentNameGenerator.toString(),
         "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable, appendPushType=true, outputSDF=yyyy-MM-dd, inputTimeUnit=HOURS");
     assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, 24L, 72L),
@@ -120,7 +121,7 @@ public class NormalizedDateSegmentNameGeneratorTest {
   public void testLongSimpleDateFormat() {
     SegmentNameGenerator segmentNameGenerator =
         new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, false, APPEND_PUSH_TYPE, DAILY_PUSH_FREQUENCY,
-            TimeUnit.DAYS, LONG_SIMPLE_DATE_FORMAT);
+            new DateTimeFormatSpec(1, TimeUnit.DAYS.toString(), SIMPLE_DATE_TIME_FORMAT, LONG_SIMPLE_DATE_FORMAT));
     assertEquals(segmentNameGenerator.toString(),
         "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable, appendPushType=true, outputSDF=yyyy-MM-dd, inputSDF=yyyyMMdd");
     assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, 19700102L, 19700104L),
@@ -133,7 +134,7 @@ public class NormalizedDateSegmentNameGeneratorTest {
   public void testStringSimpleDateFormat() {
     SegmentNameGenerator segmentNameGenerator =
         new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, false, APPEND_PUSH_TYPE, DAILY_PUSH_FREQUENCY,
-            TimeUnit.DAYS, STRING_SIMPLE_DATE_FORMAT);
+            new DateTimeFormatSpec(1, TimeUnit.DAYS.toString(), SIMPLE_DATE_TIME_FORMAT, STRING_SIMPLE_DATE_FORMAT));
     assertEquals(segmentNameGenerator.toString(),
         "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable, appendPushType=true, outputSDF=yyyy-MM-dd, inputSDF=yyyy-MM-dd");
     assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, "1970-01-02", "1970-01-04"),
@@ -146,7 +147,7 @@ public class NormalizedDateSegmentNameGeneratorTest {
   public void testHourlyPushFrequency() {
     SegmentNameGenerator segmentNameGenerator =
         new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, false, APPEND_PUSH_TYPE, HOURLY_PUSH_FREQUENCY,
-            TimeUnit.DAYS, EPOCH_TIME_FORMAT);
+            new DateTimeFormatSpec(1, TimeUnit.DAYS.toString(), EPOCH_TIME_FORMAT));
     assertEquals(segmentNameGenerator.toString(),
         "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable, appendPushType=true, outputSDF=yyyy-MM-dd-HH, inputTimeUnit=DAYS");
     assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, 1L, 3L),
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java b/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
index 9f434e4..1b9c060 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/util/SchemaUtilsTest.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.MetricFieldSpec;
@@ -110,6 +111,16 @@ public class SchemaUtilsTest {
     extract = new ArrayList<>(SchemaUtils.extractSourceFields(schema));
     Assert.assertEquals(extract.size(), 2);
     Assert.assertTrue(extract.containsAll(Lists.newArrayList("tenMinutesSinceEpoch", "timestamp")));
+
+    // inbuilt functions on DateTimeFieldSpec
+    schema = new Schema();
+    DateTimeFieldSpec dateTimeFieldSpec =
+        new DateTimeFieldSpec("date", FieldSpec.DataType.STRING, "1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd", "1:DAYS");
+    dateTimeFieldSpec.setTransformFunction("toDateTime(timestamp, 'yyyy-MM-dd')");
+    schema.addField(dateTimeFieldSpec);
+    extract = new ArrayList<>(SchemaUtils.extractSourceFields(schema));
+    Assert.assertEquals(extract.size(), 2);
+    Assert.assertTrue(extract.containsAll(Lists.newArrayList("date", "timestamp")));
   }
 
   @Test
@@ -128,6 +139,12 @@ public class SchemaUtilsTest {
     pinotSchema.addField(metricFieldSpec);
     Assert.assertFalse(SchemaUtils.validate(pinotSchema));
 
+    pinotSchema = new Schema();
+    DateTimeFieldSpec dateTimeFieldSpec = new DateTimeFieldSpec("dt1", FieldSpec.DataType.LONG, "1:HOURS:EPOCH", "1:HOURS");
+    dateTimeFieldSpec.setTransformFunction("Groovy({function}, m1, dt1)");
+    pinotSchema.addField(dateTimeFieldSpec);
+    Assert.assertFalse(SchemaUtils.validate(pinotSchema));
+
     pinotSchema = new Schema.SchemaBuilder()
         .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS, "time"), null).build();
     pinotSchema.getFieldSpecFor("time").setTransformFunction("Groovy({function}, time)");
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DateTimeFieldSpecHybridClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DateTimeFieldSpecHybridClusterIntegrationTest.java
new file mode 100644
index 0000000..22e2589
--- /dev/null
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/DateTimeFieldSpecHybridClusterIntegrationTest.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+
+
+/**
+ * Hybrid cluster integration test that uses one of the DateTimeFieldSpec as primary time column
+ */
+public class DateTimeFieldSpecHybridClusterIntegrationTest extends HybridClusterIntegrationTest {
+  private static final String SCHEMA_WITH_DATETIME_FIELDSPEC_NAME = "On_Time_On_Time_Performance_2014_100k_subset_nonulls_datetimefieldspecs.schema";
+
+  protected String getSchemaFileName() {
+    return SCHEMA_WITH_DATETIME_FIELDSPEC_NAME;
+  }
+
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    super.setUp();
+  }
+
+
+  @AfterClass
+  public void tearDown()
+      throws Exception {
+    super.tearDown();
+  }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LuceneRealtimeClusterIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LuceneRealtimeClusterIntegrationTest.java
index 6f7b408..572737f 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LuceneRealtimeClusterIntegrationTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/LuceneRealtimeClusterIntegrationTest.java
@@ -126,8 +126,7 @@ public class LuceneRealtimeClusterIntegrationTest extends BaseClusterIntegration
     // Create and upload the schema and table config
     Schema schema = new Schema.SchemaBuilder().setSchemaName(DEFAULT_SCHEMA_NAME)
         .addSingleValueDimension(TEXT_COLUMN_NAME, FieldSpec.DataType.STRING)
-        .addTime(new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.MILLISECONDS, TIME_COLUMN_NAME), null)
-        .build();
+        .addDateTime(TIME_COLUMN_NAME, FieldSpec.DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
     addSchema(schema);
     addTableConfig(createRealtimeTableConfig(avroFile));
 
diff --git a/pinot-integration-tests/src/test/resources/On_Time_On_Time_Performance_2014_100k_subset_nonulls_default_column_test_extra_columns.schema b/pinot-integration-tests/src/test/resources/On_Time_On_Time_Performance_2014_100k_subset_nonulls_datetimefieldspecs.schema
similarity index 82%
copy from pinot-integration-tests/src/test/resources/On_Time_On_Time_Performance_2014_100k_subset_nonulls_default_column_test_extra_columns.schema
copy to pinot-integration-tests/src/test/resources/On_Time_On_Time_Performance_2014_100k_subset_nonulls_datetimefieldspecs.schema
index 6ac1f58..9c46510 100644
--- a/pinot-integration-tests/src/test/resources/On_Time_On_Time_Performance_2014_100k_subset_nonulls_default_column_test_extra_columns.schema
+++ b/pinot-integration-tests/src/test/resources/On_Time_On_Time_Performance_2014_100k_subset_nonulls_datetimefieldspecs.schema
@@ -162,10 +162,6 @@
       "dataType": "INT"
     },
     {
-      "name": "FlightDate",
-      "dataType": "STRING"
-    },
-    {
       "name": "FlightNum",
       "dataType": "INT"
     },
@@ -239,10 +235,6 @@
       "dataType": "INT"
     },
     {
-      "name": "Year",
-      "dataType": "INT"
-    },
-    {
       "name": "WheelsOn",
       "dataType": "INT"
     },
@@ -257,32 +249,6 @@
     {
       "name": "TotalAddGTime",
       "dataType": "INT"
-    },
-    {
-      "name": "NewAddedIntDimension",
-      "dataType": "INT"
-    },
-    {
-      "name": "NewAddedLongDimension",
-      "dataType": "LONG"
-    },
-    {
-      "name": "NewAddedFloatDimension",
-      "dataType": "FLOAT"
-    },
-    {
-      "name": "NewAddedDoubleDimension",
-      "dataType": "DOUBLE"
-    },
-    {
-      "name": "NewAddedSVStringDimension",
-      "dataType": "STRING"
-    },
-    {
-      "name": "NewAddedMVStringDimension",
-      "dataType": "STRING",
-      "singleValueField": false,
-      "defaultNullValue": ""
     }
   ],
   "metricFieldSpecs": [
@@ -349,31 +315,24 @@
     {
       "name": "WeatherDelay",
       "dataType": "INT"
-    },
-    {
-      "name": "NewAddedIntMetric",
-      "dataType": "INT",
-      "defaultNullValue": 1
-    },
-    {
-      "name": "NewAddedLongMetric",
-      "dataType": "LONG",
-      "defaultNullValue": 1
-    },
-    {
-      "name": "NewAddedFloatMetric",
-      "dataType": "FLOAT"
-    },
-    {
-      "name": "NewAddedDoubleMetric",
-      "dataType": "DOUBLE"
     }
   ],
-  "timeFieldSpec": {
-    "incomingGranularitySpec": {
-      "name": "DaysSinceEpoch",
-      "dataType": "INT",
-      "timeType": "DAYS"
-    }
-  }
+  "dateTimeFieldSpecs": [{
+    "name": "DaysSinceEpoch",
+    "dataType": "INT",
+    "format": "1:DAYS:EPOCH",
+    "granularity": "1:DAYS"
+  },
+  {
+    "name": "Year",
+    "dataType": "INT",
+    "format": "1:DAYS:SIMPLE_DATE_FORMAT:yyyy",
+    "granularity": "1:DAYS"
+  },
+  {
+    "name": "FlightDate",
+    "dataType": "STRING",
+    "format": "1:DAYS:SIMPLE_DATE_FORMAT:yyyy-MM-dd",
+    "granularity": "1:DAYS"
+  }]
 }
\ No newline at end of file
diff --git a/pinot-integration-tests/src/test/resources/On_Time_On_Time_Performance_2014_100k_subset_nonulls_default_column_test_extra_columns.schema b/pinot-integration-tests/src/test/resources/On_Time_On_Time_Performance_2014_100k_subset_nonulls_default_column_test_extra_columns.schema
index 6ac1f58..d1eafdd 100644
--- a/pinot-integration-tests/src/test/resources/On_Time_On_Time_Performance_2014_100k_subset_nonulls_default_column_test_extra_columns.schema
+++ b/pinot-integration-tests/src/test/resources/On_Time_On_Time_Performance_2014_100k_subset_nonulls_default_column_test_extra_columns.schema
@@ -376,4 +376,4 @@
       "timeType": "DAYS"
     }
   }
-}
\ No newline at end of file
+}
diff --git a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
index 4ba5156..b84a299 100644
--- a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
+++ b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/SegmentGenerationTaskRunner.java
@@ -33,6 +33,8 @@ import org.apache.pinot.core.segment.name.SegmentNameGenerator;
 import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeFieldSpec;
@@ -123,25 +125,19 @@ public class SegmentGenerationTaskRunner implements Serializable {
       case SIMPLE_SEGMENT_NAME_GENERATOR:
         return new SimpleSegmentNameGenerator(tableName, segmentNameGeneratorConfigs.get(SEGMENT_NAME_POSTFIX));
       case NORMALIZED_DATE_SEGMENT_NAME_GENERATOR:
-        Preconditions.checkState(tableConfig != null,
-            "In order to use NormalizedDateSegmentNameGenerator, table config must be provided");
         SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
-        String timeFormat = null;
-        TimeUnit timeType = null;
+        DateTimeFormatSpec dateTimeFormatSpec = null;
         String timeColumnName = tableConfig.getValidationConfig().getTimeColumnName();
 
         if (timeColumnName != null) {
-          FieldSpec fieldSpec = schema.getFieldSpecFor(timeColumnName);
-          if (fieldSpec != null) {
-            TimeFieldSpec timeFieldSpec = (TimeFieldSpec) fieldSpec;
-            timeFormat = timeFieldSpec.getOutgoingGranularitySpec().getTimeFormat();
-            timeType = timeFieldSpec.getOutgoingGranularitySpec().getTimeType();
+          DateTimeFieldSpec dateTimeFieldSpec = schema.getSpecForTimeColumn(timeColumnName);
+          if (dateTimeFieldSpec != null) {
+            dateTimeFormatSpec = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat());
           }
         }
         return new NormalizedDateSegmentNameGenerator(tableName, segmentNameGeneratorConfigs.get(SEGMENT_NAME_PREFIX),
-            Boolean.valueOf(segmentNameGeneratorConfigs.get(EXCLUDE_SEQUENCE_ID)),
-            validationConfig.getSegmentPushType(), validationConfig.getSegmentPushFrequency(),
-            timeType, timeFormat);
+            Boolean.parseBoolean(segmentNameGeneratorConfigs.get(EXCLUDE_SEQUENCE_ID)),
+            validationConfig.getSegmentPushType(), validationConfig.getSegmentPushFrequency(), dateTimeFormatSpec);
       default:
         throw new UnsupportedOperationException("Unsupported segment name generator type: " + segmentNameGeneratorType);
     }
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
index 0175b37..45dca67 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/HadoopSegmentPreprocessingJob.java
@@ -63,6 +63,9 @@ import org.apache.pinot.spi.config.table.SegmentPartitionConfig;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.config.table.TableCustomConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatPatternSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.TimeFieldSpec;
 import org.apache.pinot.spi.data.TimeGranularitySpec;
@@ -373,14 +376,15 @@ public class HadoopSegmentPreprocessingJob extends SegmentPreprocessingJob {
       String timeColumnName = validationConfig.getTimeColumnName();
       job.getConfiguration().set(InternalConfigConstants.TIME_COLUMN_CONFIG, timeColumnName);
       if (timeColumnName != null) {
-        FieldSpec fieldSpec = _pinotTableSchema.getFieldSpecFor(timeColumnName);
-        if (fieldSpec != null) {
-          TimeFieldSpec timeFieldSpec = (TimeFieldSpec) fieldSpec;
-          TimeGranularitySpec outgoingGranularitySpec = timeFieldSpec.getOutgoingGranularitySpec();
+        DateTimeFieldSpec dateTimeFieldSpec = _pinotTableSchema.getSpecForTimeColumn(timeColumnName);
+        if (dateTimeFieldSpec != null) {
+          DateTimeFormatSpec formatSpec = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat());
           job.getConfiguration()
-              .set(InternalConfigConstants.SEGMENT_TIME_TYPE, outgoingGranularitySpec.getTimeType().toString());
+              .set(InternalConfigConstants.SEGMENT_TIME_TYPE, formatSpec.getColumnUnit().toString());
           job.getConfiguration()
-              .set(InternalConfigConstants.SEGMENT_TIME_FORMAT, outgoingGranularitySpec.getTimeFormat());
+              .set(InternalConfigConstants.SEGMENT_TIME_FORMAT, formatSpec.getTimeFormat().toString());
+          job.getConfiguration()
+              .set(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN, formatSpec.getSDFPattern());
         }
       }
       job.getConfiguration()
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
index 4282ea0..08d8d53 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/InternalConfigConstants.java
@@ -29,6 +29,7 @@ public class InternalConfigConstants {
   public static final String SEGMENT_PUSH_FREQUENCY = "segment.push.frequency";
   public static final String SEGMENT_TIME_TYPE = "segment.time.type";
   public static final String SEGMENT_TIME_FORMAT = "segment.time.format";
+  public static final String SEGMENT_TIME_SDF_PATTERN = "segment.time.sdf.pattern";
 
   // Partitioning configs
   public static final String PARTITION_COLUMN_CONFIG = "partition.column";
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
index 82dde63..47242e5 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentCreationMapper.java
@@ -47,6 +47,8 @@ import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig;
 import org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReaderConfig;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeFieldSpec;
@@ -136,23 +138,19 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
         Preconditions.checkState(_tableConfig != null,
             "In order to use NormalizedDateSegmentNameGenerator, table config must be provided");
         SegmentsValidationAndRetentionConfig validationConfig = _tableConfig.getValidationConfig();
-        String timeFormat = null;
-        TimeUnit timeType = null;
+        DateTimeFormatSpec dateTimeFormatSpec = null;
         String timeColumnName = _tableConfig.getValidationConfig().getTimeColumnName();
 
         if (timeColumnName != null) {
-          FieldSpec fieldSpec = _schema.getFieldSpecFor(timeColumnName);
-          if (fieldSpec != null) {
-            TimeFieldSpec timeFieldSpec = (TimeFieldSpec) fieldSpec;
-            timeFormat = timeFieldSpec.getOutgoingGranularitySpec().getTimeFormat();
-            timeType = timeFieldSpec.getOutgoingGranularitySpec().getTimeType();
+          DateTimeFieldSpec dateTimeFieldSpec = _schema.getSpecForTimeColumn(timeColumnName);
+          if (dateTimeFieldSpec != null) {
+            dateTimeFormatSpec = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat());
           }
         }
         _segmentNameGenerator =
             new NormalizedDateSegmentNameGenerator(_rawTableName, _jobConf.get(JobConfigConstants.SEGMENT_NAME_PREFIX),
                 _jobConf.getBoolean(JobConfigConstants.EXCLUDE_SEQUENCE_ID, false),
-                validationConfig.getSegmentPushType(), validationConfig.getSegmentPushFrequency(),
-                timeType, timeFormat);
+                validationConfig.getSegmentPushType(), validationConfig.getSegmentPushFrequency(), dateTimeFormatSpec);
         break;
       default:
         throw new UnsupportedOperationException("Unsupported segment name generator type: " + segmentNameGeneratorType);
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
index 65bbab4..76ba959 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mappers/SegmentPreprocessingMapper.java
@@ -33,6 +33,8 @@ import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.pinot.core.segment.name.NormalizedDateSegmentNameGenerator;
 import org.apache.pinot.hadoop.job.InternalConfigConstants;
 import org.apache.pinot.ingestion.common.JobConfigConstants;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -67,9 +69,15 @@ public class SegmentPreprocessingMapper extends Mapper<AvroKey<GenericRecord>, N
 
       String timeType = configuration.get(InternalConfigConstants.SEGMENT_TIME_TYPE);
       String timeFormat = configuration.get(InternalConfigConstants.SEGMENT_TIME_FORMAT);
-      TimeUnit timeUnit = TimeUnit.valueOf(timeType);
+      DateTimeFormatSpec dateTimeFormatSpec;
+      if (timeFormat.equals(DateTimeFieldSpec.TimeFormat.EPOCH.toString())) {
+        dateTimeFormatSpec = new DateTimeFormatSpec(1, timeType, timeFormat);
+      } else {
+        dateTimeFormatSpec = new DateTimeFormatSpec(1, timeType, timeFormat,
+            configuration.get(InternalConfigConstants.SEGMENT_TIME_SDF_PATTERN));
+      }
       _normalizedDateSegmentNameGenerator =
-          new NormalizedDateSegmentNameGenerator(tableName, null, false, "APPEND", pushFrequency, timeUnit, timeFormat);
+          new NormalizedDateSegmentNameGenerator(tableName, null, false, "APPEND", pushFrequency, dateTimeFormatSpec);
       _sampleNormalizedTimeColumnValue = _normalizedDateSegmentNameGenerator.getNormalizedDate(timeColumnValue);
     }
 
diff --git a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentCreationFunction.java b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentCreationFunction.java
index 15e462d..80d7434 100644
--- a/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentCreationFunction.java
+++ b/pinot-plugins/pinot-batch-ingestion/v0_deprecated/pinot-spark/src/main/java/org/apache/pinot/spark/jobs/SparkSegmentCreationFunction.java
@@ -44,6 +44,8 @@ import org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig;
 import org.apache.pinot.plugin.inputformat.thrift.ThriftRecordReaderConfig;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeFieldSpec;
@@ -119,23 +121,18 @@ public class SparkSegmentCreationFunction implements Serializable {
         Preconditions.checkState(_tableConfig != null,
             "In order to use NormalizedDateSegmentNameGenerator, table config must be provided");
         SegmentsValidationAndRetentionConfig validationConfig = _tableConfig.getValidationConfig();
-        String timeFormat = null;
-        TimeUnit timeType = null;
+        DateTimeFormatSpec dateTimeFormatSpec = null;
         String timeColumnName = _tableConfig.getValidationConfig().getTimeColumnName();
-
         if (timeColumnName != null) {
-          FieldSpec fieldSpec = _schema.getFieldSpecFor(timeColumnName);
-          if (fieldSpec != null) {
-            TimeFieldSpec timeFieldSpec = (TimeFieldSpec) fieldSpec;
-            timeFormat = timeFieldSpec.getOutgoingGranularitySpec().getTimeFormat();
-            timeType = timeFieldSpec.getOutgoingGranularitySpec().getTimeType();
+          DateTimeFieldSpec dateTimeFieldSpec = _schema.getSpecForTimeColumn(timeColumnName);
+          if (dateTimeFieldSpec != null) {
+            dateTimeFormatSpec = new DateTimeFormatSpec(dateTimeFieldSpec.getFormat());
           }
         }
         _segmentNameGenerator =
             new NormalizedDateSegmentNameGenerator(_rawTableName, _jobConf.get(JobConfigConstants.SEGMENT_NAME_PREFIX),
                 _jobConf.getBoolean(JobConfigConstants.EXCLUDE_SEQUENCE_ID, false),
-                validationConfig.getSegmentPushType(), validationConfig.getSegmentPushFrequency(),
-                timeType, timeFormat);
+                validationConfig.getSegmentPushType(), validationConfig.getSegmentPushFrequency(), dateTimeFormatSpec);
         break;
       default:
         throw new UnsupportedOperationException("Unsupported segment name generator type: " + segmentNameGeneratorType);
diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java
index e2f7136..f4ac41c 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/main/java/org/apache/pinot/plugin/inputformat/avro/AvroUtils.java
@@ -36,6 +36,9 @@ import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumReader;
 import org.apache.avro.generic.GenericRecord;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
+import org.apache.pinot.spi.data.DateTimeGranularitySpec;
 import org.apache.pinot.spi.data.DimensionFieldSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.MetricFieldSpec;
@@ -87,6 +90,13 @@ public class AvroUtils {
             Preconditions.checkNotNull(timeUnit, "Time unit cannot be null");
             pinotSchema.addField(new TimeFieldSpec(new TimeGranularitySpec(dataType, timeUnit, field.name())));
             break;
+          case DATE_TIME:
+            Preconditions.checkState(isSingleValueField, "Time field: %s cannot be multi-valued", fieldName);
+            Preconditions.checkNotNull(timeUnit, "Time unit cannot be null");
+            pinotSchema.addField(new DateTimeFieldSpec(field.name(), dataType,
+                new DateTimeFormatSpec(1, timeUnit.toString(), DateTimeFieldSpec.TimeFormat.EPOCH.toString())
+                    .getFormat(), new DateTimeGranularitySpec(1, timeUnit).getGranularity()));
+            break;
           default:
             throw new UnsupportedOperationException(
                 "Unsupported field type: " + fieldType + " for field: " + fieldName);
diff --git a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroUtilsTest.java b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroUtilsTest.java
index 7bed45f..7268798 100644
--- a/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroUtilsTest.java
+++ b/pinot-plugins/pinot-input-format/pinot-avro-base/src/test/java/org/apache/pinot/plugin/inputformat/avro/AvroUtilsTest.java
@@ -63,5 +63,16 @@ public class AvroUtilsTest {
         .addMetric("m1", DataType.INT).addMetric("m2", DataType.INT)
         .addTime(new TimeGranularitySpec(DataType.LONG, TimeUnit.HOURS, "hoursSinceEpoch"), null).build();
     Assert.assertEquals(expectedSchema, inferredPinotSchema);
+
+    fieldSpecMap =
+        new ImmutableMap.Builder<String, FieldSpec.FieldType>().put("d1", FieldType.DIMENSION)
+            .put("d2", FieldType.DIMENSION).put("d3", FieldType.DIMENSION).put("hoursSinceEpoch", FieldType.DATE_TIME)
+            .put("m1", FieldType.METRIC).put("m2", FieldType.METRIC).build();
+    inferredPinotSchema = AvroUtils.getPinotSchemaFromAvroSchema(avroSchema, fieldSpecMap, TimeUnit.HOURS);
+    expectedSchema = new Schema.SchemaBuilder().addSingleValueDimension("d1", DataType.STRING)
+        .addSingleValueDimension("d2", DataType.LONG).addSingleValueDimension("d3", DataType.STRING)
+        .addMetric("m1", DataType.INT).addMetric("m2", DataType.INT)
+        .addDateTime("hoursSinceEpoch", DataType.LONG, "1:HOURS:EPOCH", "1:HOURS").build();
+    Assert.assertEquals(expectedSchema, inferredPinotSchema);
   }
 }
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
index 0659839..b227ad5 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/Schema.java
@@ -506,11 +506,11 @@ public final class Schema {
     }
 
     /**
-     * Add timeFieldSpec with incoming and outgoing granularity spec
-     * TODO: This is going to be deprecated in favor of addDateTime().
-     *  Many tests use this to construct Schema with TimeFieldSpec.
-     *  This will continue to exist for a while, as it helps to test backward compatibility of schemas containing TimeFieldSpec
+     * Adds timeFieldSpec with incoming and outgoing granularity spec.
+     * This will continue to exist for a while in several tests, as it helps to test backward compatibility of schemas containing TimeFieldSpec.
+     * @deprecated in favor of {@link SchemaBuilder#addDateTime(String, DataType, String, String)}
      */
+    @Deprecated
     public SchemaBuilder addTime(TimeGranularitySpec incomingTimeGranularitySpec,
         @Nullable TimeGranularitySpec outgoingTimeGranularitySpec) {
       if (outgoingTimeGranularitySpec != null) {
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/TimeFieldSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/TimeFieldSpec.java
index 297b864..5fefbf7 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/TimeFieldSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/TimeFieldSpec.java
@@ -25,9 +25,13 @@ import com.google.common.base.Preconditions;
 import org.apache.pinot.spi.utils.EqualityUtils;
 import org.apache.pinot.spi.utils.JsonUtils;
 
-
-@SuppressWarnings("unused")
+/**
+ * This should only be used in 1) tests 2) wherever required for backward-compatible handling of schemas with TimeFieldSpec.
+ * See https://github.com/apache/incubator-pinot/issues/2756
+ * @deprecated Use {@link DateTimeFieldSpec} instead.
+ */
 @JsonIgnoreProperties(ignoreUnknown = true)
+@SuppressWarnings("unused")
 public final class TimeFieldSpec extends FieldSpec {
   private TimeGranularitySpec _incomingGranularitySpec;
   private TimeGranularitySpec _outgoingGranularitySpec;
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/TimeGranularitySpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/TimeGranularitySpec.java
index 794a95c..e97f537 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/TimeGranularitySpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/TimeGranularitySpec.java
@@ -30,6 +30,7 @@ import org.joda.time.format.DateTimeFormat;
 
 
 /**
+ * @deprecated Use {@link DateTimeFieldSpec} instead.
  * The <code>TimeGranularitySpec</code> class contains all specs related to time field.
  * <p>- <code>DataType</code>: data type of the time column (e.g. INT, LONG).
  * <p>- <code>TimeType</code>: time unit of the time column (e.g. MINUTES, HOURS).
@@ -58,8 +59,10 @@ public class TimeGranularitySpec {
   private String _timeFormat = DEFAULT_TIME_FORMAT;
 
   /*
+   * Deprecated. Use {@link DateTimeFieldSpec.TimeFormat} instead.
    * Can be either EPOCH (default) or SIMPLE_DATE_FORMAT:pattern e.g SIMPLE_DATE_FORMAT:yyyyMMdd
    */
+  @Deprecated
   public enum TimeFormat {
     EPOCH, //default
     SIMPLE_DATE_FORMAT
@@ -170,68 +173,6 @@ public class TimeGranularitySpec {
   }
 
   /**
-   * Convert the units of time since epoch to {@link DateTime} format using current <code>TimeGranularitySpec</code>.
-   */
-  public DateTime toDateTime(long timeSinceEpoch) {
-    return new DateTime(_timeType.toMillis(timeSinceEpoch * _timeUnitSize));
-  }
-
-  /**
-   * Convert the timeColumnValue to millis
-   *
-   * eg:
-   * 1) given timeColumnValue = 416359 and timeGranularitySpec:{timeUnitSize=1,timetype=HOURS,timeFormat=EPOCH},
-   * timeGranularitySpec.toMillis(416359) = 1498892400000 (i.e. timeColumnValue*60*60*1000)
-   *
-   * 2) given timeColumnValue = 4996308 and timeGranularitySpec:{timeUnitSize=5,timetype=MINUTES,timeFormat=EPOCH},
-   * timeGranularitySpec.toMillis(4996308) = 1498892400000 (i.e. timeColumnValue*5*60*1000)
-   *
-   * 3) given timeColumnValue = 20170701 and timeGranularitySpec:{timeUnitSize=1,timetype=DAYS,timeFormat=SIMPLE_DATE_FORMAT:yyyyMMdd},
-   * timeGranularitySpec.toMillis(20170701) = 1498892400000
-   *
-   * @param timeColumnValue - time column value to convert
-   * @return time column value in millis
-   */
-  public Long toMillis(Object timeColumnValue) {
-    Preconditions.checkNotNull(timeColumnValue);
-    Long timeColumnValueMs;
-    if (_timeFormat.equals(TimeFormat.EPOCH.toString())) {
-      timeColumnValueMs = TimeUnit.MILLISECONDS.convert((Long) timeColumnValue * _timeUnitSize, _timeType);
-    } else {
-      String pattern = _timeFormat.split(COLON_SEPARATOR)[1];
-      timeColumnValueMs = DateTimeFormat.forPattern(pattern).parseMillis(String.valueOf(timeColumnValue));
-    }
-    return timeColumnValueMs;
-  }
-
-  /**
-   * Convert the time value in millis to the format from timeGranularitySpec
-   * eg:
-   * 1) given timeColumnValueMS = 1498892400000 and timeGranularitySpec:{timeUnitSize=1,timetype=HOURS,timeFormat=EPOCH},
-   * timeGranularitySpec.fromMillis(1498892400000) = 416359 (i.e. timeColumnValueMS/(1000*60*60))
-   *
-   * 2) given timeColumnValueMS = 1498892400000 and timeGranularitySpec:{timeUnitSize=5,timetype=MINUTES,timeFormat=EPOCH},
-   * timeGranularitySpec.fromMillis(1498892400000) = 4996308 (i.e. timeColumnValueMS/(1000*60*5))
-   *
-   * 3) given timeColumnValueMS = 1498892400000 and timeGranularitySpec:{timeUnitSize=1,timetype=DAYS,timeFormat=SIMPLE_DATE_FORMAT:yyyyMMdd},
-   * timeGranularitySpec.fromMillis(1498892400000) = 20170701
-   *
-   * @param timeColumnValueMS - millis value to convert
-   * @return time value in timeGranularitySpec format
-   */
-  public Object fromMillis(Long timeColumnValueMS) {
-    Preconditions.checkNotNull(timeColumnValueMS);
-    Object timeColumnValue;
-    if (_timeFormat.equals(TimeFormat.EPOCH.toString())) {
-      timeColumnValue = _timeType.convert(timeColumnValueMS, TimeUnit.MILLISECONDS) / _timeUnitSize;
-    } else {
-      String pattern = _timeFormat.split(COLON_SEPARATOR)[1];
-      timeColumnValue = DateTimeFormat.forPattern(pattern).print(timeColumnValueMS);
-    }
-    return timeColumnValue;
-  }
-
-  /**
    * Returns the {@link ObjectNode} representing the time granularity spec.
    * <p>Only contains fields with non-default value.
    * <p>NOTE: here we use {@link ObjectNode} to preserve the insertion order.
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/TimeConverter.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/TimeConverter.java
index 05338e2..420c3de 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/TimeConverter.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/TimeConverter.java
@@ -24,6 +24,7 @@ import org.apache.pinot.spi.data.TimeGranularitySpec;
 
 
 /**
+ * @deprecated This conversion should be done via transform functions set on the schema field spec
  * TimeConverter to convert value to/from milliseconds since epoch based on the given {@link TimeGranularitySpec}.
  */
 public class TimeConverter {
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/SegmentDumpTool.java b/pinot-tools/src/main/java/org/apache/pinot/tools/SegmentDumpTool.java
index fd8e502..1e2c1cb 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/SegmentDumpTool.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/SegmentDumpTool.java
@@ -91,9 +91,8 @@ public class SegmentDumpTool {
       System.out.print(i);
       System.out.print("\t");
       for (String columnName : columnNames) {
-        FieldSpec.DataType columnType = metadata.getSchema().getFieldSpecFor(columnName).getDataType();
         BlockSingleValIterator itr = iterators.get(columnName);
-        Integer encodedValue = itr.nextIntVal();
+        int encodedValue = itr.nextIntVal();
         Object value = dictionaries.get(columnName).get(encodedValue);
         System.out.print(value);
         System.out.print("\t");
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/DataGenerator.java b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/DataGenerator.java
index a1a11a3..2cdc1cc 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/DataGenerator.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/data/generator/DataGenerator.java
@@ -40,7 +40,7 @@ import java.util.concurrent.TimeUnit;
 /**
  * Sep 12, 2014
  */
-
+// TODO: add DATE_TIME to the data generator
 public class DataGenerator {
   private static final Logger LOGGER = LoggerFactory.getLogger(DataGenerator.class);
   private File outDir;
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/query/comparison/SegmentInfoProvider.java b/pinot-tools/src/main/java/org/apache/pinot/tools/query/comparison/SegmentInfoProvider.java
index f6f77da..c387230 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/query/comparison/SegmentInfoProvider.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/query/comparison/SegmentInfoProvider.java
@@ -120,6 +120,7 @@ public class SegmentInfoProvider {
           // Treat TIME column as single-value dimension column
           case DIMENSION:
           case TIME:
+          case DATE_TIME:
             uniqueSingleValueDimensions.add(columnName);
             loadValuesForSingleValueDimension(indexSegment, singleValueDimensionValuesMap, columnName);
             break;
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/SegmentMergeCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/SegmentMergeCommand.java
index 1f773ae..76c691d 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/SegmentMergeCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/SegmentMergeCommand.java
@@ -37,6 +37,8 @@ import org.apache.pinot.core.segment.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.core.segment.name.NormalizedDateSegmentNameGenerator;
 import org.apache.pinot.spi.config.table.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.DateTimeFieldSpec;
+import org.apache.pinot.spi.data.DateTimeFormatSpec;
 import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.TimeFieldSpec;
@@ -248,20 +250,17 @@ public class SegmentMergeCommand extends AbstractBaseAdminCommand implements Com
     String pushFrequency = validationConfig.getSegmentPushFrequency();
     String pushType = validationConfig.getSegmentPushType();
     String timeColumnName = validationConfig.getTimeColumnName();
-    TimeUnit timeType = null;
-    String timeFormat = null;
+    DateTimeFormatSpec dateTimeFormatSpec = null;
     if (timeColumnName != null) {
-      FieldSpec fieldSpec = schema.getFieldSpecFor(timeColumnName);
-      if (fieldSpec != null) {
-        TimeFieldSpec timeFieldSpec = (TimeFieldSpec) fieldSpec;
-        timeType = timeFieldSpec.getOutgoingGranularitySpec().getTimeType();
-        timeFormat = timeFieldSpec.getOutgoingGranularitySpec().getTimeFormat();
+      DateTimeFieldSpec dateTimeSpec = schema.getSpecForTimeColumn(timeColumnName);
+      if (dateTimeSpec != null) {
+        dateTimeFormatSpec = new DateTimeFormatSpec(dateTimeSpec.getFormat());
       }
     }
 
     // Generate the final segment name using segment name generator
     NormalizedDateSegmentNameGenerator segmentNameGenerator =
-        new NormalizedDateSegmentNameGenerator(tableName, null, false, pushType, pushFrequency, timeType, timeFormat);
+        new NormalizedDateSegmentNameGenerator(tableName, null, false, pushType, pushFrequency, dateTimeFormatSpec);
 
     return segmentNameGenerator.generateSegmentName(DEFAULT_SEQUENCE_ID, minStartTime, maxEndTime);
   }
diff --git a/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_schema.json b/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_schema.json
index 02143ca..97757af 100644
--- a/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_schema.json
+++ b/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_schema.json
@@ -324,12 +324,13 @@
       "name": "TotalAddGTime"
     }
   ],
-  "timeFieldSpec": {
-    "incomingGranularitySpec": {
+  "dateTimeFieldSpecs": [
+    {
+      "name": "DaysSinceEpoch",
       "dataType": "INT",
-      "timeType": "DAYS",
-      "name": "DaysSinceEpoch"
+      "format": "1:DAYS:EPOCH",
+      "granularity": "1:DAYS"
     }
-  },
+  ],
   "schemaName": "airlineStats"
 }
diff --git a/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_schema.json b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_schema.json
index 02143ca..97757af 100644
--- a/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_schema.json
+++ b/pinot-tools/src/main/resources/examples/stream/airlineStats/airlineStats_schema.json
@@ -324,12 +324,13 @@
       "name": "TotalAddGTime"
     }
   ],
-  "timeFieldSpec": {
-    "incomingGranularitySpec": {
+  "dateTimeFieldSpecs": [
+    {
+      "name": "DaysSinceEpoch",
       "dataType": "INT",
-      "timeType": "DAYS",
-      "name": "DaysSinceEpoch"
+      "format": "1:DAYS:EPOCH",
+      "granularity": "1:DAYS"
     }
-  },
+  ],
   "schemaName": "airlineStats"
 }
diff --git a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_schema.json b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_schema.json
index 085d728..94f81f3 100644
--- a/pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_schema.json
+++ b/pinot-tools/src/main/resources/examples/stream/meetupRsvp/meetupRsvp_schema.json
@@ -39,12 +39,13 @@
       "name": "group_name"
     }
   ],
-  "timeFieldSpec": {
-    "incomingGranularitySpec": {
-      "timeType": "MILLISECONDS",
+  "dateTimeFieldSpecs": [
+    {
+      "name": "mtime",
       "dataType": "LONG",
-      "name": "mtime"
+      "format": "1:MILLISECONDS:EPOCH",
+      "granularity": "1:MILLISECONDS"
     }
-  },
+  ],
   "schemaName": "meetupRsvp"
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org