You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/11/10 22:06:42 UTC

[incubator-pinot] branch master updated: Always read start/end time in millis from the segment ZK metadata (#6239)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 750af31  Always read start/end time in millis from the segment ZK metadata (#6239)
750af31 is described below

commit 750af31133f758f3c13cda7d9b22126eb7d52512
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Nov 10 14:05:12 2020 -0800

    Always read start/end time in millis from the segment ZK metadata (#6239)
    
    - Adds the APIs to read the start/end time in millis from the SegmentZKMetadata, deprecates the old getters.
    - Modifies the logic in TimeBoundaryManager to support SDF for the hybrid table.
---
 .../routing/timeboundary/TimeBoundaryManager.java  |  96 +++----
 .../common/metadata/segment/SegmentZKMetadata.java | 291 ++++++++++-----------
 .../common/tier/TimeBasedTierSegmentSelector.java  |   7 +-
 .../apache/pinot/common/utils/CommonConstants.java |  12 +-
 .../pinot/common/data/DateTimeFormatSpecTest.java  |  70 +++--
 .../RealtimeToOfflineSegmentsTaskGenerator.java    |  21 +-
 .../retention/strategy/TimeRetentionStrategy.java  |  13 +-
 .../controller/util/TableRetentionValidator.java   |  17 +-
 .../validation/OfflineSegmentIntervalChecker.java  |  14 +-
 .../PinotLLCRealtimeSegmentManagerTest.java        |   1 -
 .../RealtimeToOfflineSegmentsTaskExecutor.java     |  16 +-
 .../apache/pinot/spi/data/DateTimeFormatSpec.java  |  58 ++--
 .../OfflineSegmentIntervalCheckerCommand.java      |   8 +-
 13 files changed, 268 insertions(+), 356 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
index c7a64bb..4a8c357 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
@@ -48,15 +48,15 @@ import org.slf4j.LoggerFactory;
  */
 public class TimeBoundaryManager {
   private static final Logger LOGGER = LoggerFactory.getLogger(TimeBoundaryManager.class);
-  private static final long INVALID_END_TIME = -1;
+  private static final long INVALID_END_TIME_MS = -1;
 
   private final String _offlineTableName;
   private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
   private final String _segmentZKMetadataPathPrefix;
   private final String _timeColumn;
-  private final TimeUnit _timeUnit;
-  private final boolean _isHourlyTable;
-  private final Map<String, Long> _endTimeMap = new HashMap<>();
+  private final DateTimeFormatSpec _timeFormatSpec;
+  private final long _timeOffsetMs;
+  private final Map<String, Long> _endTimeMsMap = new HashMap<>();
 
   private volatile TimeBoundaryInfo _timeBoundaryInfo;
 
@@ -75,19 +75,20 @@ public class TimeBoundaryManager {
     DateTimeFieldSpec dateTimeSpec = schema.getSpecForTimeColumn(_timeColumn);
     Preconditions.checkNotNull(dateTimeSpec, "Field spec must be specified in schema for time column: %s of table: %s",
         _timeColumn, _offlineTableName);
-    DateTimeFormatSpec formatSpec = new DateTimeFormatSpec(dateTimeSpec.getFormat());
-    _timeUnit = formatSpec.getColumnUnit();
-    Preconditions
-        .checkNotNull(_timeUnit, "Time unit must be configured in the field spec for time column: %s of table: %s",
-            _timeColumn, _offlineTableName);
+    _timeFormatSpec = new DateTimeFormatSpec(dateTimeSpec.getFormat());
+    Preconditions.checkNotNull(_timeFormatSpec.getColumnUnit(),
+        "Time unit must be configured in the field spec for time column: %s of table: %s", _timeColumn,
+        _offlineTableName);
 
     // For HOURLY table with time unit other than DAYS, use (maxEndTime - 1 HOUR) as the time boundary; otherwise, use
     // (maxEndTime - 1 DAY)
-    _isHourlyTable = CommonConstants.Table.PUSH_FREQUENCY_HOURLY
-        .equalsIgnoreCase(tableConfig.getValidationConfig().getSegmentPushFrequency()) && _timeUnit != TimeUnit.DAYS;
+    boolean isHourlyTable = CommonConstants.Table.PUSH_FREQUENCY_HOURLY
+        .equalsIgnoreCase(tableConfig.getValidationConfig().getSegmentPushFrequency())
+        && _timeFormatSpec.getColumnUnit() != TimeUnit.DAYS;
+    _timeOffsetMs = isHourlyTable ? TimeUnit.HOURS.toMillis(1) : TimeUnit.DAYS.toMillis(1);
 
-    LOGGER.info("Constructed TimeBoundaryManager with timeColumn: {}, timeUnit: {}, isHourlyTable: {} for table: {}",
-        _timeColumn, _timeUnit, _isHourlyTable, _offlineTableName);
+    LOGGER.info("Constructed TimeBoundaryManager with timeColumn: {}, timeFormat: {}, isHourlyTable: {} for table: {}",
+        _timeColumn, _timeFormatSpec.getFormat(), isHourlyTable, _offlineTableName);
   }
 
   /**
@@ -108,38 +109,38 @@ public class TimeBoundaryManager {
       segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment);
     }
     List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, null, AccessOption.PERSISTENT);
-    long maxEndTime = INVALID_END_TIME;
+    long maxEndTimeMs = INVALID_END_TIME_MS;
     for (int i = 0; i < numSegments; i++) {
       String segment = segments.get(i);
-      long endTime = extractEndTimeFromSegmentZKMetadataZNRecord(segment, znRecords.get(i));
-      _endTimeMap.put(segment, endTime);
-      maxEndTime = Math.max(maxEndTime, endTime);
+      long endTimeMs = extractEndTimeMsFromSegmentZKMetadataZNRecord(segment, znRecords.get(i));
+      _endTimeMsMap.put(segment, endTimeMs);
+      maxEndTimeMs = Math.max(maxEndTimeMs, endTimeMs);
     }
-    updateTimeBoundaryInfo(maxEndTime);
+    updateTimeBoundaryInfo(maxEndTimeMs);
   }
 
-  private long extractEndTimeFromSegmentZKMetadataZNRecord(String segment, @Nullable ZNRecord znRecord) {
+  private long extractEndTimeMsFromSegmentZKMetadataZNRecord(String segment, @Nullable ZNRecord znRecord) {
     if (znRecord == null) {
       LOGGER.warn("Failed to find segment ZK metadata for segment: {}, table: {}", segment, _offlineTableName);
-      return INVALID_END_TIME;
+      return INVALID_END_TIME_MS;
     }
 
-    long endTime = znRecord.getLongField(CommonConstants.Segment.END_TIME, INVALID_END_TIME);
-    if (endTime <= 0) {
+    long endTime = znRecord.getLongField(CommonConstants.Segment.END_TIME, -1);
+    if (endTime > 0) {
+      TimeUnit timeUnit = znRecord.getEnumField(CommonConstants.Segment.TIME_UNIT, TimeUnit.class, TimeUnit.DAYS);
+      return timeUnit.toMillis(endTime);
+    } else {
       LOGGER.warn("Failed to find valid end time for segment: {}, table: {}", segment, _offlineTableName);
-      return INVALID_END_TIME;
+      return INVALID_END_TIME_MS;
     }
-
-    TimeUnit timeUnit = znRecord.getEnumField(CommonConstants.Segment.TIME_UNIT, TimeUnit.class, TimeUnit.DAYS);
-    return _timeUnit.convert(endTime, timeUnit);
   }
 
-  private void updateTimeBoundaryInfo(long maxEndTime) {
-    if (maxEndTime > 0) {
-      long timeBoundary = getTimeBoundary(maxEndTime);
+  private void updateTimeBoundaryInfo(long maxEndTimeMs) {
+    if (maxEndTimeMs > 0) {
+      String timeBoundary = _timeFormatSpec.fromMillisToFormat(maxEndTimeMs - _timeOffsetMs);
       TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
-      if (currentTimeBoundaryInfo == null || Long.parseLong(currentTimeBoundaryInfo.getTimeValue()) != timeBoundary) {
-        _timeBoundaryInfo = new TimeBoundaryInfo(_timeColumn, Long.toString(timeBoundary));
+      if (currentTimeBoundaryInfo == null || !currentTimeBoundaryInfo.getTimeValue().equals(timeBoundary)) {
+        _timeBoundaryInfo = new TimeBoundaryInfo(_timeColumn, timeBoundary);
         LOGGER.info("Updated time boundary to: {} for table: {}", timeBoundary, _offlineTableName);
       }
     } else {
@@ -150,19 +151,6 @@ public class TimeBoundaryManager {
   }
 
   /**
-   * Returns the time boundary based on the given maximum end time.
-   * <p>NOTE: For HOURLY table with time unit other than DAYS, use (maxEndTime - 1 HOUR) as the time boundary;
-   * otherwise, use (maxEndTime - 1 DAY).
-   */
-  private long getTimeBoundary(long maxEndTime) {
-    if (_isHourlyTable) {
-      return maxEndTime - _timeUnit.convert(1L, TimeUnit.HOURS);
-    } else {
-      return maxEndTime - _timeUnit.convert(1L, TimeUnit.DAYS);
-    }
-  }
-
-  /**
    * Processes the external view change based on the given ideal state and online segments (segments with
    * ONLINE/CONSUMING instances in the ideal state and selected by the pre-selector).
    * <p>NOTE: We don't update all the segment ZK metadata for every external view change, but only the new added/removed
@@ -174,28 +162,28 @@ public class TimeBoundaryManager {
   public synchronized void onExternalViewChange(ExternalView externalView, IdealState idealState,
       Set<String> onlineSegments) {
     for (String segment : onlineSegments) {
-      _endTimeMap.computeIfAbsent(segment, k -> extractEndTimeFromSegmentZKMetadataZNRecord(segment,
+      _endTimeMsMap.computeIfAbsent(segment, k -> extractEndTimeMsFromSegmentZKMetadataZNRecord(segment,
           _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, AccessOption.PERSISTENT)));
     }
-    _endTimeMap.keySet().retainAll(onlineSegments);
-    updateTimeBoundaryInfo(getMaxEndTime());
+    _endTimeMsMap.keySet().retainAll(onlineSegments);
+    updateTimeBoundaryInfo(getMaxEndTimeMs());
   }
 
-  private long getMaxEndTime() {
-    long maxEndTime = INVALID_END_TIME;
-    for (long endTime : _endTimeMap.values()) {
-      maxEndTime = Math.max(maxEndTime, endTime);
+  private long getMaxEndTimeMs() {
+    long maxEndTimeMs = INVALID_END_TIME_MS;
+    for (long endTimeMs : _endTimeMsMap.values()) {
+      maxEndTimeMs = Math.max(maxEndTimeMs, endTimeMs);
     }
-    return maxEndTime;
+    return maxEndTimeMs;
   }
 
   /**
    * Refreshes the metadata for the given segment (called when segment is getting refreshed).
    */
   public synchronized void refreshSegment(String segment) {
-    _endTimeMap.put(segment, extractEndTimeFromSegmentZKMetadataZNRecord(segment,
+    _endTimeMsMap.put(segment, extractEndTimeMsFromSegmentZKMetadataZNRecord(segment,
         _propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, AccessOption.PERSISTENT)));
-    updateTimeBoundaryInfo(getMaxEndTime());
+    updateTimeBoundaryInfo(getMaxEndTimeMs());
   }
 
   @Nullable
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
index f0f00ad..5ee5288 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
@@ -22,11 +22,11 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
 import org.apache.helix.ZNRecord;
 import org.apache.pinot.common.metadata.ZKMetadata;
-import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
 import org.apache.pinot.common.utils.CommonConstants.Segment.SegmentType;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.joda.time.Duration;
@@ -34,11 +34,6 @@ import org.joda.time.Interval;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.apache.pinot.spi.utils.EqualityUtils.hashCodeOf;
-import static org.apache.pinot.spi.utils.EqualityUtils.isEqual;
-import static org.apache.pinot.spi.utils.EqualityUtils.isNullOrNotSameClass;
-import static org.apache.pinot.spi.utils.EqualityUtils.isSameReference;
-
 
 public abstract class SegmentZKMetadata implements ZKMetadata {
   private static final Logger LOGGER = LoggerFactory.getLogger(SegmentZKMetadata.class);
@@ -50,16 +45,14 @@ public abstract class SegmentZKMetadata implements ZKMetadata {
   private long _startTime = -1;
   private long _endTime = -1;
   private TimeUnit _timeUnit;
-  private Duration _timeGranularity;
-  private Interval _timeInterval;
   private String _indexVersion;
   private long _totalDocs = -1;
   private long _crc = -1;
   private long _creationTime = -1;
   private SegmentPartitionMetadata _partitionMetadata;
   private long _segmentUploadStartTime = -1;
-  private Map<String, String> _customMap;
   private String _crypterName;
+  private Map<String, String> _customMap;
 
   @Deprecated
   private String _tableName;
@@ -68,23 +61,20 @@ public abstract class SegmentZKMetadata implements ZKMetadata {
   }
 
   public SegmentZKMetadata(ZNRecord znRecord) {
-    _segmentName = znRecord.getSimpleField(CommonConstants.Segment.SEGMENT_NAME);
-    _tableName = znRecord.getSimpleField(CommonConstants.Segment.TABLE_NAME);
-    _crypterName = znRecord.getSimpleField(CommonConstants.Segment.CRYPTER_NAME);
-    _segmentType = znRecord.getEnumField(CommonConstants.Segment.SEGMENT_TYPE, SegmentType.class, SegmentType.OFFLINE);
-    _startTime = znRecord.getLongField(CommonConstants.Segment.START_TIME, -1);
-    _endTime = znRecord.getLongField(CommonConstants.Segment.END_TIME, -1);
-    if (znRecord.getSimpleFields().containsKey(CommonConstants.Segment.TIME_UNIT) && !znRecord
-        .getSimpleField(CommonConstants.Segment.TIME_UNIT).equals(NULL)) {
-      setTimeUnit(znRecord.getEnumField(CommonConstants.Segment.TIME_UNIT, TimeUnit.class, TimeUnit.DAYS));
+    _segmentName = znRecord.getSimpleField(Segment.SEGMENT_NAME);
+    _segmentType = znRecord.getEnumField(Segment.SEGMENT_TYPE, SegmentType.class, SegmentType.OFFLINE);
+    _startTime = znRecord.getLongField(Segment.START_TIME, -1);
+    _endTime = znRecord.getLongField(Segment.END_TIME, -1);
+    String timeUnitString = znRecord.getSimpleField(Segment.TIME_UNIT);
+    if (timeUnitString != null && !timeUnitString.equals(NULL)) {
+      _timeUnit = znRecord.getEnumField(Segment.TIME_UNIT, TimeUnit.class, TimeUnit.DAYS);
     }
-    _indexVersion = znRecord.getSimpleField(CommonConstants.Segment.INDEX_VERSION);
-    _totalDocs = znRecord.getLongField(CommonConstants.Segment.TOTAL_DOCS, -1);
-    _crc = znRecord.getLongField(CommonConstants.Segment.CRC, -1);
-    _creationTime = znRecord.getLongField(CommonConstants.Segment.CREATION_TIME, -1);
-
+    _indexVersion = znRecord.getSimpleField(Segment.INDEX_VERSION);
+    _totalDocs = znRecord.getLongField(Segment.TOTAL_DOCS, -1);
+    _crc = znRecord.getLongField(Segment.CRC, -1);
+    _creationTime = znRecord.getLongField(Segment.CREATION_TIME, -1);
     try {
-      String partitionMetadataJson = znRecord.getSimpleField(CommonConstants.Segment.PARTITION_METADATA);
+      String partitionMetadataJson = znRecord.getSimpleField(Segment.PARTITION_METADATA);
       if (partitionMetadataJson != null) {
         _partitionMetadata = SegmentPartitionMetadata.fromJsonString(partitionMetadataJson);
       }
@@ -93,8 +83,12 @@ public abstract class SegmentZKMetadata implements ZKMetadata {
           "Exception caught while reading partition info from zk metadata for segment '{}', partition info dropped.",
           _segmentName, e);
     }
-    _segmentUploadStartTime = znRecord.getLongField(CommonConstants.Segment.SEGMENT_UPLOAD_START_TIME, -1);
-    _customMap = znRecord.getMapField(CommonConstants.Segment.CUSTOM_MAP);
+    _segmentUploadStartTime = znRecord.getLongField(Segment.SEGMENT_UPLOAD_START_TIME, -1);
+    _crypterName = znRecord.getSimpleField(Segment.CRYPTER_NAME);
+    _customMap = znRecord.getMapField(Segment.CUSTOM_MAP);
+
+    // For backward-compatibility
+    _tableName = znRecord.getSimpleField(Segment.TABLE_NAME);
   }
 
   public String getSegmentName() {
@@ -105,54 +99,40 @@ public abstract class SegmentZKMetadata implements ZKMetadata {
     _segmentName = segmentName;
   }
 
-  @Deprecated
-  public String getTableName() {
-    return _tableName;
+  public SegmentType getSegmentType() {
+    return _segmentType;
   }
 
-  @Deprecated
-  public void setTableName(String tableName) {
-    _tableName = tableName;
+  public void setSegmentType(SegmentType segmentType) {
+    _segmentType = segmentType;
   }
 
-  public long getStartTime() {
-    return _startTime;
+  public long getStartTimeMs() {
+    if (_startTime > 0 && _timeUnit != null) {
+      return _timeUnit.toMillis(_startTime);
+    } else {
+      return -1;
+    }
   }
 
   public void setStartTime(long startTime) {
     _startTime = startTime;
   }
 
-  public long getEndTime() {
-    return _endTime;
+  public long getEndTimeMs() {
+    if (_endTime > 0 && _timeUnit != null) {
+      return _timeUnit.toMillis(_endTime);
+    } else {
+      return -1;
+    }
   }
 
   public void setEndTime(long endTime) {
     _endTime = endTime;
   }
 
-  public TimeUnit getTimeUnit() {
-    return _timeUnit;
-  }
-
-  /**
-   * NOTE: should be called after setting start and end time.
-   */
-  public void setTimeUnit(@Nonnull TimeUnit timeUnit) {
+  public void setTimeUnit(TimeUnit timeUnit) {
     _timeUnit = timeUnit;
-    _timeGranularity = new Duration(_timeUnit.toMillis(1));
-    // For consuming segment, end time might not be set
-    if (_startTime >= 0 && _startTime <= _endTime) {
-      _timeInterval = new Interval(_timeUnit.toMillis(_startTime), _timeUnit.toMillis(_endTime));
-    }
-  }
-
-  public Duration getTimeGranularity() {
-    return _timeGranularity;
-  }
-
-  public Interval getTimeInterval() {
-    return _timeInterval;
   }
 
   public String getIndexVersion() {
@@ -163,22 +143,6 @@ public abstract class SegmentZKMetadata implements ZKMetadata {
     _indexVersion = indexVersion;
   }
 
-  public SegmentType getSegmentType() {
-    return _segmentType;
-  }
-
-  public void setSegmentType(SegmentType segmentType) {
-    _segmentType = segmentType;
-  }
-
-  public String getCrypterName() {
-    return _crypterName;
-  }
-
-  public void setCrypterName(String crypterName) {
-    _crypterName = crypterName;
-  }
-
   public long getTotalDocs() {
     return _totalDocs;
   }
@@ -219,6 +183,14 @@ public abstract class SegmentZKMetadata implements ZKMetadata {
     _segmentUploadStartTime = segmentUploadStartTime;
   }
 
+  public String getCrypterName() {
+    return _crypterName;
+  }
+
+  public void setCrypterName(String crypterName) {
+    _crypterName = crypterName;
+  }
+
   public Map<String, String> getCustomMap() {
     return _customMap;
   }
@@ -227,76 +199,87 @@ public abstract class SegmentZKMetadata implements ZKMetadata {
     _customMap = customMap;
   }
 
+  @Deprecated
+  public String getTableName() {
+    return _tableName;
+  }
+
+  @Deprecated
+  public void setTableName(String tableName) {
+    _tableName = tableName;
+  }
+
+  @Deprecated
+  public long getStartTime() {
+    return _startTime;
+  }
+
+  @Deprecated
+  public long getEndTime() {
+    return _endTime;
+  }
+
+  @Deprecated
+  public TimeUnit getTimeUnit() {
+    return _timeUnit;
+  }
+
+  @Deprecated
+  public Duration getTimeGranularity() {
+    return _timeUnit != null ? new Duration(_timeUnit.toMillis(1)) : null;
+  }
+
+  @Deprecated
+  public Interval getTimeInterval() {
+    if (_startTime > 0 && _startTime <= _endTime && _timeUnit != null) {
+      return new Interval(_timeUnit.toMillis(_startTime), _timeUnit.toMillis(_endTime));
+    } else {
+      return null;
+    }
+  }
+
   @Override
-  public boolean equals(Object segmentMetadata) {
-    if (isSameReference(this, segmentMetadata)) {
+  public boolean equals(Object o) {
+    if (this == o) {
       return true;
     }
-
-    if (isNullOrNotSameClass(this, segmentMetadata)) {
+    if (o == null || getClass() != o.getClass()) {
       return false;
     }
-
-    SegmentZKMetadata metadata = (SegmentZKMetadata) segmentMetadata;
-    return isEqual(_segmentName, metadata._segmentName) && isEqual(_crypterName, metadata._crypterName) && isEqual(
-        _tableName, metadata._tableName) && isEqual(_indexVersion, metadata._indexVersion) && isEqual(_timeUnit,
-        metadata._timeUnit) && isEqual(_startTime, metadata._startTime) && isEqual(_endTime, metadata._endTime)
-        && isEqual(_segmentType, metadata._segmentType) && isEqual(_totalDocs, metadata._totalDocs) && isEqual(_crc,
-        metadata._crc) && isEqual(_creationTime, metadata._creationTime) && isEqual(_partitionMetadata,
-        metadata._partitionMetadata) && isEqual(_segmentUploadStartTime, metadata._segmentUploadStartTime) && isEqual(
-        _customMap, metadata._customMap);
+    SegmentZKMetadata that = (SegmentZKMetadata) o;
+    return _startTime == that._startTime && _endTime == that._endTime && _totalDocs == that._totalDocs
+        && _crc == that._crc && _creationTime == that._creationTime
+        && _segmentUploadStartTime == that._segmentUploadStartTime && Objects.equals(_segmentName, that._segmentName)
+        && _segmentType == that._segmentType && _timeUnit == that._timeUnit && Objects
+        .equals(_indexVersion, that._indexVersion) && Objects.equals(_partitionMetadata, that._partitionMetadata)
+        && Objects.equals(_crypterName, that._crypterName) && Objects.equals(_customMap, that._customMap) && Objects
+        .equals(_tableName, that._tableName);
   }
 
   @Override
   public int hashCode() {
-    int result = hashCodeOf(_segmentName);
-    result = hashCodeOf(result, _tableName);
-    result = hashCodeOf(result, _crypterName);
-    result = hashCodeOf(result, _segmentType);
-    result = hashCodeOf(result, _startTime);
-    result = hashCodeOf(result, _endTime);
-    result = hashCodeOf(result, _timeUnit);
-    result = hashCodeOf(result, _indexVersion);
-    result = hashCodeOf(result, _totalDocs);
-    result = hashCodeOf(result, _crc);
-    result = hashCodeOf(result, _creationTime);
-    result = hashCodeOf(result, _partitionMetadata);
-    result = hashCodeOf(result, _segmentUploadStartTime);
-    result = hashCodeOf(result, _customMap);
-    return result;
+    return Objects.hash(_segmentName, _segmentType, _startTime, _endTime, _timeUnit, _indexVersion, _totalDocs, _crc,
+        _creationTime, _partitionMetadata, _segmentUploadStartTime, _crypterName, _customMap, _tableName);
   }
 
   @Override
   public ZNRecord toZNRecord() {
     ZNRecord znRecord = new ZNRecord(_segmentName);
-    znRecord.setSimpleField(CommonConstants.Segment.SEGMENT_NAME, _segmentName);
-
-    if (_tableName != null) {
-      znRecord.setSimpleField(CommonConstants.Segment.TABLE_NAME, _tableName);
-    }
 
-    if (_crypterName != null) {
-      znRecord.setSimpleField(CommonConstants.Segment.CRYPTER_NAME, _crypterName);
-    }
-
-    znRecord.setEnumField(CommonConstants.Segment.SEGMENT_TYPE, _segmentType);
-    if (_timeUnit == null) {
-      znRecord.setSimpleField(CommonConstants.Segment.TIME_UNIT, NULL);
-    } else {
-      znRecord.setEnumField(CommonConstants.Segment.TIME_UNIT, _timeUnit);
-    }
-    znRecord.setLongField(CommonConstants.Segment.START_TIME, _startTime);
-    znRecord.setLongField(CommonConstants.Segment.END_TIME, _endTime);
-
-    znRecord.setSimpleField(CommonConstants.Segment.INDEX_VERSION, _indexVersion);
-    znRecord.setLongField(CommonConstants.Segment.TOTAL_DOCS, _totalDocs);
-    znRecord.setLongField(CommonConstants.Segment.CRC, _crc);
-    znRecord.setLongField(CommonConstants.Segment.CREATION_TIME, _creationTime);
+    znRecord.setSimpleField(Segment.SEGMENT_NAME, _segmentName);
+    znRecord.setEnumField(Segment.SEGMENT_TYPE, _segmentType);
+    znRecord.setLongField(Segment.START_TIME, _startTime);
+    znRecord.setLongField(Segment.END_TIME, _endTime);
+    znRecord.setSimpleField(Segment.TIME_UNIT, _timeUnit != null ? _timeUnit.name() : NULL);
+    znRecord.setSimpleField(Segment.INDEX_VERSION, _indexVersion);
+    znRecord.setLongField(Segment.TOTAL_DOCS, _totalDocs);
+    znRecord.setLongField(Segment.CRC, _crc);
+    znRecord.setLongField(Segment.CREATION_TIME, _creationTime);
 
     if (_partitionMetadata != null) {
       try {
         String partitionMetadataJson = _partitionMetadata.toJsonString();
-        znRecord.setSimpleField(CommonConstants.Segment.PARTITION_METADATA, partitionMetadataJson);
+        znRecord.setSimpleField(Segment.PARTITION_METADATA, partitionMetadataJson);
       } catch (IOException e) {
         LOGGER
             .error("Exception caught while writing partition metadata into ZNRecord for segment '{}', will be dropped",
@@ -304,59 +287,65 @@ public abstract class SegmentZKMetadata implements ZKMetadata {
       }
     }
     if (_segmentUploadStartTime > 0) {
-      znRecord.setLongField(CommonConstants.Segment.SEGMENT_UPLOAD_START_TIME, _segmentUploadStartTime);
+      znRecord.setLongField(Segment.SEGMENT_UPLOAD_START_TIME, _segmentUploadStartTime);
+    }
+    if (_crypterName != null) {
+      znRecord.setSimpleField(Segment.CRYPTER_NAME, _crypterName);
     }
     if (_customMap != null) {
-      znRecord.setMapField(CommonConstants.Segment.CUSTOM_MAP, _customMap);
+      znRecord.setMapField(Segment.CUSTOM_MAP, _customMap);
+    }
+
+    // For backward-compatibility
+    if (_tableName != null) {
+      znRecord.setSimpleField(Segment.TABLE_NAME, _tableName);
     }
+
     return znRecord;
   }
 
   public Map<String, String> toMap() {
     Map<String, String> configMap = new HashMap<>();
-    configMap.put(CommonConstants.Segment.SEGMENT_NAME, _segmentName);
-    if (_tableName != null) {
-      configMap.put(CommonConstants.Segment.TABLE_NAME, _tableName);
-    }
-    configMap.put(CommonConstants.Segment.SEGMENT_TYPE, _segmentType.toString());
-    if (_timeUnit == null) {
-      configMap.put(CommonConstants.Segment.TIME_UNIT, null);
-    } else {
-      configMap.put(CommonConstants.Segment.TIME_UNIT, _timeUnit.toString());
-    }
-    configMap.put(CommonConstants.Segment.START_TIME, Long.toString(_startTime));
-    configMap.put(CommonConstants.Segment.END_TIME, Long.toString(_endTime));
 
-    configMap.put(CommonConstants.Segment.INDEX_VERSION, _indexVersion);
-    configMap.put(CommonConstants.Segment.TOTAL_DOCS, Long.toString(_totalDocs));
-    configMap.put(CommonConstants.Segment.CRC, Long.toString(_crc));
-    configMap.put(CommonConstants.Segment.CREATION_TIME, Long.toString(_creationTime));
+    configMap.put(Segment.SEGMENT_NAME, _segmentName);
+    configMap.put(Segment.SEGMENT_TYPE, _segmentType.toString());
+    configMap.put(Segment.START_TIME, Long.toString(_startTime));
+    configMap.put(Segment.END_TIME, Long.toString(_endTime));
+    configMap.put(Segment.TIME_UNIT, _timeUnit != null ? _timeUnit.name() : null);
+    configMap.put(Segment.INDEX_VERSION, _indexVersion);
+    configMap.put(Segment.TOTAL_DOCS, Long.toString(_totalDocs));
+    configMap.put(Segment.CRC, Long.toString(_crc));
+    configMap.put(Segment.CREATION_TIME, Long.toString(_creationTime));
 
     if (_partitionMetadata != null) {
       try {
         String partitionMetadataJson = _partitionMetadata.toJsonString();
-        configMap.put(CommonConstants.Segment.PARTITION_METADATA, partitionMetadataJson);
+        configMap.put(Segment.PARTITION_METADATA, partitionMetadataJson);
       } catch (IOException e) {
         LOGGER.error(
             "Exception caught while converting partition metadata into JSON string for segment '{}', will be dropped",
             _segmentName, e);
       }
     }
-
     if (_segmentUploadStartTime > 0) {
-      configMap.put(CommonConstants.Segment.SEGMENT_UPLOAD_START_TIME, Long.toString(_segmentUploadStartTime));
+      configMap.put(Segment.SEGMENT_UPLOAD_START_TIME, Long.toString(_segmentUploadStartTime));
     }
-
-    if (_customMap == null) {
-      configMap.put(CommonConstants.Segment.CUSTOM_MAP, null);
-    } else {
+    if (_crypterName != null) {
+      configMap.put(Segment.CRYPTER_NAME, _crypterName);
+    }
+    if (_customMap != null) {
       try {
-        configMap.put(CommonConstants.Segment.CUSTOM_MAP, JsonUtils.objectToString(_customMap));
+        configMap.put(Segment.CUSTOM_MAP, JsonUtils.objectToString(_customMap));
       } catch (JsonProcessingException e) {
         throw new RuntimeException(e);
       }
     }
 
+    // For backward-compatibility
+    if (_tableName != null) {
+      configMap.put(Segment.TABLE_NAME, _tableName);
+    }
+
     return configMap;
   }
 }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/tier/TimeBasedTierSegmentSelector.java b/pinot-common/src/main/java/org/apache/pinot/common/tier/TimeBasedTierSegmentSelector.java
index f4b201c..3023bbf 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/tier/TimeBasedTierSegmentSelector.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/tier/TimeBasedTierSegmentSelector.java
@@ -19,7 +19,6 @@
 package org.apache.pinot.common.tier;
 
 import com.google.common.base.Preconditions;
-import java.util.concurrent.TimeUnit;
 import org.apache.helix.HelixManager;
 import org.apache.pinot.common.metadata.ZKMetadataProvider;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
@@ -66,10 +65,10 @@ public class TimeBasedTierSegmentSelector implements TierSegmentSelector {
             tableNameWithType);
 
     // get segment end time to decide if segment gets selected
-    TimeUnit timeUnit = segmentZKMetadata.getTimeUnit();
+    long endTimeMs = segmentZKMetadata.getEndTimeMs();
     Preconditions
-        .checkNotNull(timeUnit, "Time unit is not set for segment: %s of table: %s", segmentName, tableNameWithType);
-    long endTimeMs = timeUnit.toMillis(segmentZKMetadata.getEndTime());
+        .checkState(endTimeMs > 0, "Invalid endTimeMs: %s for segment: %s of table: %s", endTimeMs, segmentName,
+            tableNameWithType);
     long now = System.currentTimeMillis();
     return (now - endTimeMs) > _segmentAgeMillis;
   }
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index e709b15..afb1852 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -364,18 +364,16 @@ public class CommonConstants {
 
     public static final String SEGMENT_NAME = "segment.name";
     public static final String SEGMENT_TYPE = "segment.type";
-    public static final String CRYPTER_NAME = "segment.crypter";
-    public static final String INDEX_VERSION = "segment.index.version";
     public static final String START_TIME = "segment.start.time";
     public static final String END_TIME = "segment.end.time";
     public static final String TIME_UNIT = "segment.time.unit";
+    public static final String INDEX_VERSION = "segment.index.version";
     public static final String TOTAL_DOCS = "segment.total.docs";
     public static final String CRC = "segment.crc";
     public static final String CREATION_TIME = "segment.creation.time";
     public static final String FLUSH_THRESHOLD_SIZE = "segment.flush.threshold.size";
     public static final String FLUSH_THRESHOLD_TIME = "segment.flush.threshold.time";
     public static final String PARTITION_METADATA = "segment.partition.metadata";
-    public static final String PEER_SEGMENT_DOWNLOAD_SCHEME = "peer://";
     /**
      * This field is used for parallel push protection to lock the segment globally.
      * We put the segment upload start timestamp so that if the previous push failed without unlock the segment, the
@@ -383,12 +381,17 @@ public class CommonConstants {
      */
     public static final String SEGMENT_UPLOAD_START_TIME = "segment.upload.start.time";
 
+    public static final String CRYPTER_NAME = "segment.crypter";
     public static final String CUSTOM_MAP = "custom.map";
 
+    @Deprecated
+    public static final String TABLE_NAME = "segment.table.name";
+
     public static final String SEGMENT_BACKUP_DIR_SUFFIX = ".segment.bak";
     public static final String SEGMENT_TEMP_DIR_SUFFIX = ".segment.tmp";
 
     public static final String LOCAL_SEGMENT_SCHEME = "file";
+    public static final String PEER_SEGMENT_DOWNLOAD_SCHEME = "peer://";
     public static final String METADATA_URI_FOR_PEER_DOWNLOAD = "";
 
     public enum SegmentType {
@@ -405,9 +408,6 @@ public class CommonConstants {
       public static final String HOSTNAME = "$hostName";
       public static final String SEGMENTNAME = "$segmentName";
     }
-
-    @Deprecated
-    public static final String TABLE_NAME = "segment.table.name";
   }
 
   public static class Query {
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/data/DateTimeFormatSpecTest.java b/pinot-common/src/test/java/org/apache/pinot/common/data/DateTimeFormatSpecTest.java
index e253c49..fa9725f 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/data/DateTimeFormatSpecTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/data/DateTimeFormatSpecTest.java
@@ -40,21 +40,18 @@ public class DateTimeFormatSpecTest {
 
   // Test conversion of a dateTimeColumn value from a format to millis
   @Test(dataProvider = "testFromFormatToMillisDataProvider")
-  public void testFromFormatToMillis(String format, Object timeColumnValue, long millisExpected) {
-
-    DateTimeFormatSpec dateTimeFormatSpec = new DateTimeFormatSpec(format);
-    long millisActual = dateTimeFormatSpec.fromFormatToMillis(timeColumnValue);
-    Assert.assertEquals(millisActual, millisExpected);
+  public void testFromFormatToMillis(String format, String formattedValue, long expectedTimeMs) {
+    Assert.assertEquals(new DateTimeFormatSpec(format).fromFormatToMillis(formattedValue), expectedTimeMs);
   }
 
   @DataProvider(name = "testFromFormatToMillisDataProvider")
   public Object[][] provideTestFromFormatToMillisData() {
 
     List<Object[]> entries = new ArrayList<>();
-    entries.add(new Object[]{"1:HOURS:EPOCH", 416359L, 1498892400000L});
-    entries.add(new Object[]{"1:MILLISECONDS:EPOCH", 1498892400000L, 1498892400000L});
-    entries.add(new Object[]{"1:HOURS:EPOCH", 0L, 0L});
-    entries.add(new Object[]{"5:MINUTES:EPOCH", 4996308L, 1498892400000L});
+    entries.add(new Object[]{"1:HOURS:EPOCH", "416359", 1498892400000L});
+    entries.add(new Object[]{"1:MILLISECONDS:EPOCH", "1498892400000", 1498892400000L});
+    entries.add(new Object[]{"1:HOURS:EPOCH", "0", 0L});
+    entries.add(new Object[]{"5:MINUTES:EPOCH", "4996308", 1498892400000L});
     entries.add(new Object[]{"1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd", "20170701", DateTimeFormat.forPattern("yyyyMMdd")
         .withZoneUTC().parseMillis("20170701")});
     entries.add(new Object[]{"1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd tz(America/Chicago)", "20170701", DateTimeFormat
@@ -73,44 +70,37 @@ public class DateTimeFormatSpecTest {
 
   // Test the conversion of a millis value to date time column value in a format
   @Test(dataProvider = "testFromMillisToFormatDataProvider")
-  public void testFromMillisToFormat(String format, long timeColumnValueMS, Class<?> type,
-      Object timeColumnValueExpected) {
-
-    DateTimeFormatSpec dateTimeFormatSpec = new DateTimeFormatSpec(format);
-    Object timeColumnValueActual = dateTimeFormatSpec.fromMillisToFormat(timeColumnValueMS, type);
-    Assert.assertEquals(timeColumnValueActual, timeColumnValueExpected);
+  public void testFromMillisToFormat(String format, long timeMs, String expectedFormattedValue) {
+    Assert.assertEquals(new DateTimeFormatSpec(format).fromMillisToFormat(timeMs), expectedFormattedValue);
   }
 
   @DataProvider(name = "testFromMillisToFormatDataProvider")
   public Object[][] provideTestFromMillisToFormatData() {
 
     List<Object[]> entries = new ArrayList<>();
-    entries.add(new Object[]{"1:HOURS:EPOCH", 1498892400000L, Long.class, 416359L});
-    entries.add(new Object[]{"1:MILLISECONDS:EPOCH", 1498892400000L, Long.class, 1498892400000L});
-    entries.add(new Object[]{"1:HOURS:EPOCH", 0L, Long.class, 0L});
-    entries.add(new Object[]{"5:MINUTES:EPOCH", 1498892400000L, Long.class, 4996308L});
-    entries.add(new Object[]{"1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd", 1498892400000L, String.class, DateTimeFormat
-        .forPattern("yyyyMMdd").withZoneUTC().print(1498892400000L)});
-    entries.add(
-        new Object[]{"1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd tz(America/New_York)", 1498892400000L, String.class, DateTimeFormat
-            .forPattern("yyyyMMdd").withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("America/New_York"))).print(
-            1498892400000L)});
-    entries.add(new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMdd HH", 1498892400000L, String.class, DateTimeFormat
-        .forPattern("yyyyMMdd HH").withZoneUTC().print(1498892400000L)});
-    entries.add(
-        new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMdd HH tz(IST)", 1498892400000L, String.class, DateTimeFormat
-            .forPattern("yyyyMMdd HH").withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("IST"))).print(
-            1498892400000L)});
-    entries.add(new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMdd HH Z", 1498892400000L, String.class, DateTimeFormat
+    entries.add(new Object[]{"1:HOURS:EPOCH", 1498892400000L, "416359"});
+    entries.add(new Object[]{"1:MILLISECONDS:EPOCH", 1498892400000L, "1498892400000"});
+    entries.add(new Object[]{"1:HOURS:EPOCH", 0L, "0"});
+    entries.add(new Object[]{"5:MINUTES:EPOCH", 1498892400000L, "4996308"});
+    entries.add(new Object[]{"1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd", 1498892400000L, DateTimeFormat.forPattern("yyyyMMdd")
+        .withZoneUTC().print(1498892400000L)});
+    entries.add(new Object[]{"1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd tz(America/New_York)", 1498892400000L, DateTimeFormat
+        .forPattern("yyyyMMdd").withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("America/New_York"))).print(
+        1498892400000L)});
+    entries.add(
+        new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMdd HH", 1498892400000L, DateTimeFormat.forPattern("yyyyMMdd HH")
+            .withZoneUTC().print(1498892400000L)});
+    entries.add(new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMdd HH tz(IST)", 1498892400000L, DateTimeFormat
+        .forPattern("yyyyMMdd HH").withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("IST"))).print(
+        1498892400000L)});
+    entries.add(new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMdd HH Z", 1498892400000L, DateTimeFormat
         .forPattern("yyyyMMdd HH Z").withZoneUTC().print(1498892400000L)});
-    entries.add(
-        new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMdd HH Z tz(GMT+0500)", 1498892400000L, String.class, DateTimeFormat
-            .forPattern("yyyyMMdd HH Z").withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("GMT+0500"))).print(
-            1498892400000L)});
-    entries.add(
-        new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:M/d/yyyy h:mm:ss a", 1498892400000L, String.class, DateTimeFormat
-            .forPattern("M/d/yyyy h:mm:ss a").withZoneUTC().withLocale(Locale.ENGLISH).print(1498892400000L)});
-    entries.add(new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:M/d/yyyy h a", 1502066750000L, String.class, DateTimeFormat
+    entries.add(new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMdd HH Z tz(GMT+0500)", 1498892400000L, DateTimeFormat
+        .forPattern("yyyyMMdd HH Z").withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("GMT+0500"))).print(
+        1498892400000L)});
+    entries.add(new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:M/d/yyyy h:mm:ss a", 1498892400000L, DateTimeFormat
+        .forPattern("M/d/yyyy h:mm:ss a").withZoneUTC().withLocale(Locale.ENGLISH).print(1498892400000L)});
+    entries.add(new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:M/d/yyyy h a", 1502066750000L, DateTimeFormat
         .forPattern("M/d/yyyy h a").withZoneUTC().withLocale(Locale.ENGLISH).print(1502066750000L)});
     return entries.toArray(new Object[entries.size()][]);
   }
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
index b505d28..e27c045 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
@@ -25,11 +25,9 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.helix.task.TaskState;
 import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
-import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
 import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
 import org.apache.pinot.common.utils.CommonConstants.Segment;
 import org.apache.pinot.common.utils.LLCSegmentName;
@@ -171,9 +169,8 @@ public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerato
       boolean skipGenerate = false;
       for (LLCRealtimeSegmentZKMetadata realtimeSegmentZKMetadata : completedSegmentsMetadata) {
         String segmentName = realtimeSegmentZKMetadata.getSegmentName();
-        TimeUnit timeUnit = realtimeSegmentZKMetadata.getTimeUnit();
-        long segmentStartTimeMs = timeUnit.toMillis(realtimeSegmentZKMetadata.getStartTime());
-        long segmentEndTimeMs = timeUnit.toMillis(realtimeSegmentZKMetadata.getEndTime());
+        long segmentStartTimeMs = realtimeSegmentZKMetadata.getStartTimeMs();
+        long segmentEndTimeMs = realtimeSegmentZKMetadata.getEndTimeMs();
 
         // Check overlap with window
         if (windowStartMs <= segmentEndTimeMs && segmentStartTimeMs < windowEndMs) {
@@ -287,21 +284,15 @@ public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerato
       long watermarkMs;
 
       // Find the smallest time from all segments
-      RealtimeSegmentZKMetadata minSegmentZkMetadata = null;
+      long minStartTimeMs = Long.MAX_VALUE;
       for (LLCRealtimeSegmentZKMetadata realtimeSegmentZKMetadata : completedSegmentsMetadata) {
-        if (minSegmentZkMetadata == null || realtimeSegmentZKMetadata.getStartTime() < minSegmentZkMetadata
-            .getStartTime()) {
-          minSegmentZkMetadata = realtimeSegmentZKMetadata;
-        }
+        minStartTimeMs = Math.min(minStartTimeMs, realtimeSegmentZKMetadata.getStartTimeMs());
       }
-      Preconditions.checkState(minSegmentZkMetadata != null);
-
-      // Convert the segment minTime to millis
-      long minSegmentStartTimeMs = minSegmentZkMetadata.getTimeUnit().toMillis(minSegmentZkMetadata.getStartTime());
+      Preconditions.checkState(minStartTimeMs != Long.MAX_VALUE);
 
       // Round off according to the bucket. This ensures we align the offline segments to proper time boundaries
       // For example, if start time millis is 20200813T12:34:59, we want to create the first segment for window [20200813, 20200814)
-      watermarkMs = (minSegmentStartTimeMs / bucketMs) * bucketMs;
+      watermarkMs = (minStartTimeMs / bucketMs) * bucketMs;
 
       // Create RealtimeToOfflineSegmentsTaskMetadata ZNode using watermark calculated above
       realtimeToOfflineSegmentsTaskMetadata = new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, watermarkMs);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
index 2690d67..ad6d66f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
@@ -39,19 +39,12 @@ public class TimeRetentionStrategy implements RetentionStrategy {
 
   @Override
   public boolean isPurgeable(String tableNameWithType, SegmentZKMetadata segmentZKMetadata) {
-    TimeUnit timeUnit = segmentZKMetadata.getTimeUnit();
-    if (timeUnit == null) {
-      LOGGER.warn("Time unit is not set for {} segment: {} of table: {}", segmentZKMetadata.getSegmentType(),
-          segmentZKMetadata.getSegmentName(), tableNameWithType);
-      return false;
-    }
-    long endTime = segmentZKMetadata.getEndTime();
-    long endTimeMs = timeUnit.toMillis(endTime);
+    long endTimeMs = segmentZKMetadata.getEndTimeMs();
 
     // Check that the end time is between 1971 and 2071
     if (!TimeUtils.timeValueInValidRange(endTimeMs)) {
-      LOGGER.warn("{} segment: {} of table: {} has invalid end time: {} {}", segmentZKMetadata.getSegmentType(),
-          segmentZKMetadata.getSegmentName(), tableNameWithType, endTime, timeUnit);
+      LOGGER.warn("{} segment: {} of table: {} has invalid end time in millis: {}", segmentZKMetadata.getSegmentType(),
+          segmentZKMetadata.getSegmentName(), tableNameWithType, endTimeMs);
       return false;
     }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableRetentionValidator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableRetentionValidator.java
index 455066f..c3d3df3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableRetentionValidator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableRetentionValidator.java
@@ -162,18 +162,13 @@ public class TableRetentionValidator {
       List<String> errorMessages = new ArrayList<>();
       for (String segmentName : segmentNames) {
         OfflineSegmentZKMetadata offlineSegmentMetadata = getOfflineSegmentMetadata(tableName, segmentName);
-        TimeUnit segmentTimeUnit = offlineSegmentMetadata.getTimeUnit();
-        if (segmentTimeUnit == null) {
-          errorMessages.add("Segment: " + segmentName + " has null time unit");
-          continue;
+        long startTimeMs = offlineSegmentMetadata.getStartTimeMs();
+        if (!TimeUtils.timeValueInValidRange(startTimeMs)) {
+          errorMessages.add("Segment: " + segmentName + " has invalid start time in millis: " + startTimeMs);
         }
-        long startTimeInMillis = segmentTimeUnit.toMillis(offlineSegmentMetadata.getStartTime());
-        if (!TimeUtils.timeValueInValidRange(startTimeInMillis)) {
-          errorMessages.add("Segment: " + segmentName + " has invalid start time in millis: " + startTimeInMillis);
-        }
-        long endTimeInMillis = segmentTimeUnit.toMillis(offlineSegmentMetadata.getEndTime());
-        if (!TimeUtils.timeValueInValidRange(endTimeInMillis)) {
-          errorMessages.add("Segment: " + segmentName + " has invalid end time in millis: " + endTimeInMillis);
+        long endTimeMs = offlineSegmentMetadata.getEndTimeMs();
+        if (!TimeUtils.timeValueInValidRange(endTimeMs)) {
+          errorMessages.add("Segment: " + segmentName + " has invalid end time in millis: " + endTimeMs);
         }
       }
 
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
index dc98600..7e593b3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
@@ -88,9 +88,10 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask<Void>
       List<Interval> segmentIntervals = new ArrayList<>(numSegments);
       int numSegmentsWithInvalidIntervals = 0;
       for (OfflineSegmentZKMetadata offlineSegmentZKMetadata : offlineSegmentZKMetadataList) {
-        Interval timeInterval = offlineSegmentZKMetadata.getTimeInterval();
-        if (timeInterval != null && TimeUtils.isValidTimeInterval(timeInterval)) {
-          segmentIntervals.add(timeInterval);
+        long startTimeMs = offlineSegmentZKMetadata.getStartTimeMs();
+        long endTimeMs = offlineSegmentZKMetadata.getEndTimeMs();
+        if (TimeUtils.timeValueInValidRange(startTimeMs) && TimeUtils.timeValueInValidRange(endTimeMs)) {
+          segmentIntervals.add(new Interval(startTimeMs, endTimeMs));
         } else {
           numSegmentsWithInvalidIntervals++;
         }
@@ -110,10 +111,9 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask<Void>
     long maxSegmentPushTime = Long.MIN_VALUE;
 
     for (OfflineSegmentZKMetadata offlineSegmentZKMetadata : offlineSegmentZKMetadataList) {
-      Interval segmentInterval = offlineSegmentZKMetadata.getTimeInterval();
-
-      if (segmentInterval != null && maxSegmentEndTime < segmentInterval.getEndMillis()) {
-        maxSegmentEndTime = segmentInterval.getEndMillis();
+      long endTimeMs = offlineSegmentZKMetadata.getEndTimeMs();
+      if (TimeUtils.timeValueInValidRange(endTimeMs) && maxSegmentEndTime < endTimeMs) {
+        maxSegmentEndTime = endTimeMs;
       }
 
       long segmentPushTime = offlineSegmentZKMetadata.getPushTime();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index e15d28c..4db0266 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -222,7 +222,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
     assertEquals(committedSegmentZKMetadata.getEndOffset(),
         new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString());
     assertEquals(committedSegmentZKMetadata.getCreationTime(), CURRENT_TIME_MS);
-    assertEquals(committedSegmentZKMetadata.getTimeInterval(), INTERVAL);
     assertEquals(committedSegmentZKMetadata.getCrc(), Long.parseLong(CRC));
     assertEquals(committedSegmentZKMetadata.getIndexVersion(), SEGMENT_VERSION);
     assertEquals(committedSegmentZKMetadata.getTotalDocs(), NUM_DOCS);
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java
index 82cb2fe..b25bd51 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java
@@ -255,18 +255,12 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsC
       filterFunction = getFilterFunctionLong(windowStartMs, windowEndMs, timeColumn);
     } else {
       // Convert windowStart and windowEnd to time format of the data
-      if (dateTimeFormatSpec.getTimeFormat().equals(DateTimeFieldSpec.TimeFormat.EPOCH)) {
-        long windowStart = dateTimeFormatSpec.fromMillisToFormat(windowStartMs, Long.class);
-        long windowEnd = dateTimeFormatSpec.fromMillisToFormat(windowEndMs, Long.class);
-        filterFunction = getFilterFunctionLong(windowStart, windowEnd, timeColumn);
+      String windowStart = dateTimeFormatSpec.fromMillisToFormat(windowStartMs);
+      String windowEnd = dateTimeFormatSpec.fromMillisToFormat(windowEndMs);
+      if (dateTimeFieldSpec.getDataType().isNumeric()) {
+        filterFunction = getFilterFunctionLong(Long.parseLong(windowStart), Long.parseLong(windowEnd), timeColumn);
       } else {
-        String windowStart = dateTimeFormatSpec.fromMillisToFormat(windowStartMs, String.class);
-        String windowEnd = dateTimeFormatSpec.fromMillisToFormat(windowEndMs, String.class);
-        if (dateTimeFieldSpec.getDataType().isNumeric()) {
-          filterFunction = getFilterFunctionLong(Long.parseLong(windowStart), Long.parseLong(windowEnd), timeColumn);
-        } else {
-          filterFunction = getFilterFunctionString(windowStart, windowEnd, timeColumn);
-        }
+        filterFunction = getFilterFunctionString(windowStart, windowEnd, timeColumn);
       }
     }
     return new RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeFormatSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeFormatSpec.java
index f760778..6918da2 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeFormatSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeFormatSpec.java
@@ -121,61 +121,35 @@ public class DateTimeFormatSpec {
   }
 
   /**
+   * Converts the time in millis to the date time format.
    * <ul>
-   * <li>Given a timestamp in millis, convert it to the given format
-   * This method should not do validation of outputGranularity.
-   * The validation should be handled by caller using {@link #validateFormat}</li>
-   * <ul>
-   * <li>1) given dateTimeColumnValueMS = 1498892400000 and format=1:HOURS:EPOCH,
-   * dateTimeSpec.fromMillis(1498892400000) = 416359 (i.e. dateTimeColumnValueMS/(1000*60*60))</li>
-   * <li>2) given dateTimeColumnValueMS = 1498892400000 and format=5:MINUTES:EPOCH,
-   * dateTimeSpec.fromMillis(1498892400000) = 4996308 (i.e. timeColumnValueMS/(1000*60*5))</li>
-   * <li>3) given dateTimeColumnValueMS = 1498892400000 and
-   * format=1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd, dateTimeSpec.fromMillis(1498892400000) = 20170701</li>
-   * </ul>
+   *   <li>Given timeMs=1498892400000 and format='1:HOURS:EPOCH', returns 1498892400000/(1000*60*60)='416359'</li>
+   *   <li>Given timeMs=1498892400000 and format='5:MINUTES:EPOCH', returns 1498892400000/(1000*60*5)='4996308'</li>
+   *   <li>Given timeMs=1498892400000 and format='1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd', returns '20170701'</li>
    * </ul>
-   * @param type - type of return value (can be int/long or string depending on time format)
-   * @return dateTime column value in dateTimeFieldSpec
    */
-  public <T extends Object> T fromMillisToFormat(Long dateTimeColumnValueMS, Class<T> type) {
-    Preconditions.checkNotNull(dateTimeColumnValueMS);
-
-    Object dateTimeColumnValue;
-    if (_patternSpec.getTimeFormat().equals(TimeFormat.EPOCH)) {
-      dateTimeColumnValue = _unitSpec.getTimeUnit().convert(dateTimeColumnValueMS, TimeUnit.MILLISECONDS) / _size;
+  public String fromMillisToFormat(long timeMs) {
+    if (_patternSpec.getTimeFormat() == TimeFormat.EPOCH) {
+      return Long.toString(_unitSpec.getTimeUnit().convert(timeMs, TimeUnit.MILLISECONDS) / _size);
     } else {
-      dateTimeColumnValue = _patternSpec.getDateTimeFormatter().print(dateTimeColumnValueMS);
+      return _patternSpec.getDateTimeFormatter().print(timeMs);
     }
-    return type.cast(dateTimeColumnValue);
   }
 
   /**
+   * Converts the date time value to the time in millis.
    * <ul>
-   * <li>Convert a time value in a format, to millis.
-   * This method should not do validation of outputGranularity.
-   * The validation should be handled by caller using {@link #validateFormat}</li>
-   * <ul>
-   * <li>1) given dateTimeColumnValue = 416359 and format=1:HOURS:EPOCH
-   * dateTimeSpec.toMillis(416359) = 1498892400000 (i.e. timeColumnValue*60*60*1000)</li>
-   * <li>2) given dateTimeColumnValue = 4996308 and format=5:MINUTES:EPOCH
-   * dateTimeSpec.toMillis(4996308) = 1498892400000 (i.e. timeColumnValue*5*60*1000)</li>
-   * <li>3) given dateTimeColumnValue = 20170701 and format=1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd
-   * dateTimeSpec.toMillis(20170701) = 1498892400000</li>
+   *   <li>Given dateTimeValue='416359' and format='1:HOURS:EPOCH', returns 416359*(1000*60*60)=1498892400000</li>
+   *   <li>Given dateTimeValue='4996308' and format='5:MINUTES:EPOCH', returns 4996308*(1000*60*5)=1498892400000</li>
+   *   <li>Given dateTimeValue='20170701' and format='1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd', returns 1498892400000</li>
    * </ul>
-   * <ul>
-   * @param dateTimeColumnValue - datetime Column value to convert to millis
-   * @return datetime value in millis
    */
-  public Long fromFormatToMillis(Object dateTimeColumnValue) {
-    Preconditions.checkNotNull(dateTimeColumnValue);
-
-    long timeColumnValueMS;
-    if (_patternSpec.getTimeFormat().equals(TimeFormat.EPOCH)) {
-      timeColumnValueMS = TimeUnit.MILLISECONDS.convert((Long) dateTimeColumnValue * _size, _unitSpec.getTimeUnit());
+  public long fromFormatToMillis(String dateTimeValue) {
+    if (_patternSpec.getTimeFormat() == TimeFormat.EPOCH) {
+      return TimeUnit.MILLISECONDS.convert(Long.parseLong(dateTimeValue) * _size, _unitSpec.getTimeUnit());
     } else {
-      timeColumnValueMS = _patternSpec.getDateTimeFormatter().parseMillis(String.valueOf(dateTimeColumnValue));
+      return _patternSpec.getDateTimeFormatter().parseMillis(dateTimeValue);
     }
-    return timeColumnValueMS;
   }
 
   /**
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OfflineSegmentIntervalCheckerCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OfflineSegmentIntervalCheckerCommand.java
index 754fdc7..af34969 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OfflineSegmentIntervalCheckerCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OfflineSegmentIntervalCheckerCommand.java
@@ -35,7 +35,6 @@ import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.utils.TimeUtils;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.apache.pinot.tools.Command;
-import org.joda.time.Interval;
 import org.kohsuke.args4j.Option;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -136,12 +135,13 @@ public class OfflineSegmentIntervalCheckerCommand extends AbstractBaseAdminComma
     List<OfflineSegmentZKMetadata> offlineSegmentZKMetadataList =
         ZKMetadataProvider.getOfflineSegmentZKMetadataListForTable(_propertyStore, offlineTableName);
 
-    // collect segments with invalid time intervals
+    // Collect segments with invalid start/end time
     List<String> segmentsWithInvalidIntervals = new ArrayList<>();
     if (SegmentIntervalUtils.eligibleForSegmentIntervalCheck(tableConfig.getValidationConfig())) {
       for (OfflineSegmentZKMetadata offlineSegmentZKMetadata : offlineSegmentZKMetadataList) {
-        Interval timeInterval = offlineSegmentZKMetadata.getTimeInterval();
-        if (timeInterval == null || !TimeUtils.isValidTimeInterval(timeInterval)) {
+        long startTimeMs = offlineSegmentZKMetadata.getStartTimeMs();
+        long endTimeMs = offlineSegmentZKMetadata.getEndTimeMs();
+        if (!TimeUtils.timeValueInValidRange(startTimeMs) || !TimeUtils.timeValueInValidRange(endTimeMs)) {
           segmentsWithInvalidIntervals.add(offlineSegmentZKMetadata.getSegmentName());
         }
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org