You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2020/11/10 22:06:42 UTC
[incubator-pinot] branch master updated: Always read start/end time in millis from the segment ZK metadata (#6239)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 750af31 Always read start/end time in millis from the segment ZK metadata (#6239)
750af31 is described below
commit 750af31133f758f3c13cda7d9b22126eb7d52512
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Tue Nov 10 14:05:12 2020 -0800
Always read start/end time in millis from the segment ZK metadata (#6239)
- Adds APIs to read the start/end time in milliseconds from SegmentZKMetadata and deprecates the old getters.
- Modifies the logic in TimeBoundaryManager to support SIMPLE_DATE_FORMAT (SDF) time columns for hybrid tables.
---
.../routing/timeboundary/TimeBoundaryManager.java | 96 +++----
.../common/metadata/segment/SegmentZKMetadata.java | 291 ++++++++++-----------
.../common/tier/TimeBasedTierSegmentSelector.java | 7 +-
.../apache/pinot/common/utils/CommonConstants.java | 12 +-
.../pinot/common/data/DateTimeFormatSpecTest.java | 70 +++--
.../RealtimeToOfflineSegmentsTaskGenerator.java | 21 +-
.../retention/strategy/TimeRetentionStrategy.java | 13 +-
.../controller/util/TableRetentionValidator.java | 17 +-
.../validation/OfflineSegmentIntervalChecker.java | 14 +-
.../PinotLLCRealtimeSegmentManagerTest.java | 1 -
.../RealtimeToOfflineSegmentsTaskExecutor.java | 16 +-
.../apache/pinot/spi/data/DateTimeFormatSpec.java | 58 ++--
.../OfflineSegmentIntervalCheckerCommand.java | 8 +-
13 files changed, 268 insertions(+), 356 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
index c7a64bb..4a8c357 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/timeboundary/TimeBoundaryManager.java
@@ -48,15 +48,15 @@ import org.slf4j.LoggerFactory;
*/
public class TimeBoundaryManager {
private static final Logger LOGGER = LoggerFactory.getLogger(TimeBoundaryManager.class);
- private static final long INVALID_END_TIME = -1;
+ private static final long INVALID_END_TIME_MS = -1;
private final String _offlineTableName;
private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
private final String _segmentZKMetadataPathPrefix;
private final String _timeColumn;
- private final TimeUnit _timeUnit;
- private final boolean _isHourlyTable;
- private final Map<String, Long> _endTimeMap = new HashMap<>();
+ private final DateTimeFormatSpec _timeFormatSpec;
+ private final long _timeOffsetMs;
+ private final Map<String, Long> _endTimeMsMap = new HashMap<>();
private volatile TimeBoundaryInfo _timeBoundaryInfo;
@@ -75,19 +75,20 @@ public class TimeBoundaryManager {
DateTimeFieldSpec dateTimeSpec = schema.getSpecForTimeColumn(_timeColumn);
Preconditions.checkNotNull(dateTimeSpec, "Field spec must be specified in schema for time column: %s of table: %s",
_timeColumn, _offlineTableName);
- DateTimeFormatSpec formatSpec = new DateTimeFormatSpec(dateTimeSpec.getFormat());
- _timeUnit = formatSpec.getColumnUnit();
- Preconditions
- .checkNotNull(_timeUnit, "Time unit must be configured in the field spec for time column: %s of table: %s",
- _timeColumn, _offlineTableName);
+ _timeFormatSpec = new DateTimeFormatSpec(dateTimeSpec.getFormat());
+ Preconditions.checkNotNull(_timeFormatSpec.getColumnUnit(),
+ "Time unit must be configured in the field spec for time column: %s of table: %s", _timeColumn,
+ _offlineTableName);
// For HOURLY table with time unit other than DAYS, use (maxEndTime - 1 HOUR) as the time boundary; otherwise, use
// (maxEndTime - 1 DAY)
- _isHourlyTable = CommonConstants.Table.PUSH_FREQUENCY_HOURLY
- .equalsIgnoreCase(tableConfig.getValidationConfig().getSegmentPushFrequency()) && _timeUnit != TimeUnit.DAYS;
+ boolean isHourlyTable = CommonConstants.Table.PUSH_FREQUENCY_HOURLY
+ .equalsIgnoreCase(tableConfig.getValidationConfig().getSegmentPushFrequency())
+ && _timeFormatSpec.getColumnUnit() != TimeUnit.DAYS;
+ _timeOffsetMs = isHourlyTable ? TimeUnit.HOURS.toMillis(1) : TimeUnit.DAYS.toMillis(1);
- LOGGER.info("Constructed TimeBoundaryManager with timeColumn: {}, timeUnit: {}, isHourlyTable: {} for table: {}",
- _timeColumn, _timeUnit, _isHourlyTable, _offlineTableName);
+ LOGGER.info("Constructed TimeBoundaryManager with timeColumn: {}, timeFormat: {}, isHourlyTable: {} for table: {}",
+ _timeColumn, _timeFormatSpec.getFormat(), isHourlyTable, _offlineTableName);
}
/**
@@ -108,38 +109,38 @@ public class TimeBoundaryManager {
segmentZKMetadataPaths.add(_segmentZKMetadataPathPrefix + segment);
}
List<ZNRecord> znRecords = _propertyStore.get(segmentZKMetadataPaths, null, AccessOption.PERSISTENT);
- long maxEndTime = INVALID_END_TIME;
+ long maxEndTimeMs = INVALID_END_TIME_MS;
for (int i = 0; i < numSegments; i++) {
String segment = segments.get(i);
- long endTime = extractEndTimeFromSegmentZKMetadataZNRecord(segment, znRecords.get(i));
- _endTimeMap.put(segment, endTime);
- maxEndTime = Math.max(maxEndTime, endTime);
+ long endTimeMs = extractEndTimeMsFromSegmentZKMetadataZNRecord(segment, znRecords.get(i));
+ _endTimeMsMap.put(segment, endTimeMs);
+ maxEndTimeMs = Math.max(maxEndTimeMs, endTimeMs);
}
- updateTimeBoundaryInfo(maxEndTime);
+ updateTimeBoundaryInfo(maxEndTimeMs);
}
- private long extractEndTimeFromSegmentZKMetadataZNRecord(String segment, @Nullable ZNRecord znRecord) {
+ private long extractEndTimeMsFromSegmentZKMetadataZNRecord(String segment, @Nullable ZNRecord znRecord) {
if (znRecord == null) {
LOGGER.warn("Failed to find segment ZK metadata for segment: {}, table: {}", segment, _offlineTableName);
- return INVALID_END_TIME;
+ return INVALID_END_TIME_MS;
}
- long endTime = znRecord.getLongField(CommonConstants.Segment.END_TIME, INVALID_END_TIME);
- if (endTime <= 0) {
+ long endTime = znRecord.getLongField(CommonConstants.Segment.END_TIME, -1);
+ if (endTime > 0) {
+ TimeUnit timeUnit = znRecord.getEnumField(CommonConstants.Segment.TIME_UNIT, TimeUnit.class, TimeUnit.DAYS);
+ return timeUnit.toMillis(endTime);
+ } else {
LOGGER.warn("Failed to find valid end time for segment: {}, table: {}", segment, _offlineTableName);
- return INVALID_END_TIME;
+ return INVALID_END_TIME_MS;
}
-
- TimeUnit timeUnit = znRecord.getEnumField(CommonConstants.Segment.TIME_UNIT, TimeUnit.class, TimeUnit.DAYS);
- return _timeUnit.convert(endTime, timeUnit);
}
- private void updateTimeBoundaryInfo(long maxEndTime) {
- if (maxEndTime > 0) {
- long timeBoundary = getTimeBoundary(maxEndTime);
+ private void updateTimeBoundaryInfo(long maxEndTimeMs) {
+ if (maxEndTimeMs > 0) {
+ String timeBoundary = _timeFormatSpec.fromMillisToFormat(maxEndTimeMs - _timeOffsetMs);
TimeBoundaryInfo currentTimeBoundaryInfo = _timeBoundaryInfo;
- if (currentTimeBoundaryInfo == null || Long.parseLong(currentTimeBoundaryInfo.getTimeValue()) != timeBoundary) {
- _timeBoundaryInfo = new TimeBoundaryInfo(_timeColumn, Long.toString(timeBoundary));
+ if (currentTimeBoundaryInfo == null || !currentTimeBoundaryInfo.getTimeValue().equals(timeBoundary)) {
+ _timeBoundaryInfo = new TimeBoundaryInfo(_timeColumn, timeBoundary);
LOGGER.info("Updated time boundary to: {} for table: {}", timeBoundary, _offlineTableName);
}
} else {
@@ -150,19 +151,6 @@ public class TimeBoundaryManager {
}
/**
- * Returns the time boundary based on the given maximum end time.
- * <p>NOTE: For HOURLY table with time unit other than DAYS, use (maxEndTime - 1 HOUR) as the time boundary;
- * otherwise, use (maxEndTime - 1 DAY).
- */
- private long getTimeBoundary(long maxEndTime) {
- if (_isHourlyTable) {
- return maxEndTime - _timeUnit.convert(1L, TimeUnit.HOURS);
- } else {
- return maxEndTime - _timeUnit.convert(1L, TimeUnit.DAYS);
- }
- }
-
- /**
* Processes the external view change based on the given ideal state and online segments (segments with
* ONLINE/CONSUMING instances in the ideal state and selected by the pre-selector).
* <p>NOTE: We don't update all the segment ZK metadata for every external view change, but only the new added/removed
@@ -174,28 +162,28 @@ public class TimeBoundaryManager {
public synchronized void onExternalViewChange(ExternalView externalView, IdealState idealState,
Set<String> onlineSegments) {
for (String segment : onlineSegments) {
- _endTimeMap.computeIfAbsent(segment, k -> extractEndTimeFromSegmentZKMetadataZNRecord(segment,
+ _endTimeMsMap.computeIfAbsent(segment, k -> extractEndTimeMsFromSegmentZKMetadataZNRecord(segment,
_propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, AccessOption.PERSISTENT)));
}
- _endTimeMap.keySet().retainAll(onlineSegments);
- updateTimeBoundaryInfo(getMaxEndTime());
+ _endTimeMsMap.keySet().retainAll(onlineSegments);
+ updateTimeBoundaryInfo(getMaxEndTimeMs());
}
- private long getMaxEndTime() {
- long maxEndTime = INVALID_END_TIME;
- for (long endTime : _endTimeMap.values()) {
- maxEndTime = Math.max(maxEndTime, endTime);
+ private long getMaxEndTimeMs() {
+ long maxEndTimeMs = INVALID_END_TIME_MS;
+ for (long endTimeMs : _endTimeMsMap.values()) {
+ maxEndTimeMs = Math.max(maxEndTimeMs, endTimeMs);
}
- return maxEndTime;
+ return maxEndTimeMs;
}
/**
* Refreshes the metadata for the given segment (called when segment is getting refreshed).
*/
public synchronized void refreshSegment(String segment) {
- _endTimeMap.put(segment, extractEndTimeFromSegmentZKMetadataZNRecord(segment,
+ _endTimeMsMap.put(segment, extractEndTimeMsFromSegmentZKMetadataZNRecord(segment,
_propertyStore.get(_segmentZKMetadataPathPrefix + segment, null, AccessOption.PERSISTENT)));
- updateTimeBoundaryInfo(getMaxEndTime());
+ updateTimeBoundaryInfo(getMaxEndTimeMs());
}
@Nullable
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
index f0f00ad..5ee5288 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/metadata/segment/SegmentZKMetadata.java
@@ -22,11 +22,11 @@ import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import java.util.Objects;
import java.util.concurrent.TimeUnit;
-import javax.annotation.Nonnull;
import org.apache.helix.ZNRecord;
import org.apache.pinot.common.metadata.ZKMetadata;
-import org.apache.pinot.common.utils.CommonConstants;
+import org.apache.pinot.common.utils.CommonConstants.Segment;
import org.apache.pinot.common.utils.CommonConstants.Segment.SegmentType;
import org.apache.pinot.spi.utils.JsonUtils;
import org.joda.time.Duration;
@@ -34,11 +34,6 @@ import org.joda.time.Interval;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.pinot.spi.utils.EqualityUtils.hashCodeOf;
-import static org.apache.pinot.spi.utils.EqualityUtils.isEqual;
-import static org.apache.pinot.spi.utils.EqualityUtils.isNullOrNotSameClass;
-import static org.apache.pinot.spi.utils.EqualityUtils.isSameReference;
-
public abstract class SegmentZKMetadata implements ZKMetadata {
private static final Logger LOGGER = LoggerFactory.getLogger(SegmentZKMetadata.class);
@@ -50,16 +45,14 @@ public abstract class SegmentZKMetadata implements ZKMetadata {
private long _startTime = -1;
private long _endTime = -1;
private TimeUnit _timeUnit;
- private Duration _timeGranularity;
- private Interval _timeInterval;
private String _indexVersion;
private long _totalDocs = -1;
private long _crc = -1;
private long _creationTime = -1;
private SegmentPartitionMetadata _partitionMetadata;
private long _segmentUploadStartTime = -1;
- private Map<String, String> _customMap;
private String _crypterName;
+ private Map<String, String> _customMap;
@Deprecated
private String _tableName;
@@ -68,23 +61,20 @@ public abstract class SegmentZKMetadata implements ZKMetadata {
}
public SegmentZKMetadata(ZNRecord znRecord) {
- _segmentName = znRecord.getSimpleField(CommonConstants.Segment.SEGMENT_NAME);
- _tableName = znRecord.getSimpleField(CommonConstants.Segment.TABLE_NAME);
- _crypterName = znRecord.getSimpleField(CommonConstants.Segment.CRYPTER_NAME);
- _segmentType = znRecord.getEnumField(CommonConstants.Segment.SEGMENT_TYPE, SegmentType.class, SegmentType.OFFLINE);
- _startTime = znRecord.getLongField(CommonConstants.Segment.START_TIME, -1);
- _endTime = znRecord.getLongField(CommonConstants.Segment.END_TIME, -1);
- if (znRecord.getSimpleFields().containsKey(CommonConstants.Segment.TIME_UNIT) && !znRecord
- .getSimpleField(CommonConstants.Segment.TIME_UNIT).equals(NULL)) {
- setTimeUnit(znRecord.getEnumField(CommonConstants.Segment.TIME_UNIT, TimeUnit.class, TimeUnit.DAYS));
+ _segmentName = znRecord.getSimpleField(Segment.SEGMENT_NAME);
+ _segmentType = znRecord.getEnumField(Segment.SEGMENT_TYPE, SegmentType.class, SegmentType.OFFLINE);
+ _startTime = znRecord.getLongField(Segment.START_TIME, -1);
+ _endTime = znRecord.getLongField(Segment.END_TIME, -1);
+ String timeUnitString = znRecord.getSimpleField(Segment.TIME_UNIT);
+ if (timeUnitString != null && !timeUnitString.equals(NULL)) {
+ _timeUnit = znRecord.getEnumField(Segment.TIME_UNIT, TimeUnit.class, TimeUnit.DAYS);
}
- _indexVersion = znRecord.getSimpleField(CommonConstants.Segment.INDEX_VERSION);
- _totalDocs = znRecord.getLongField(CommonConstants.Segment.TOTAL_DOCS, -1);
- _crc = znRecord.getLongField(CommonConstants.Segment.CRC, -1);
- _creationTime = znRecord.getLongField(CommonConstants.Segment.CREATION_TIME, -1);
-
+ _indexVersion = znRecord.getSimpleField(Segment.INDEX_VERSION);
+ _totalDocs = znRecord.getLongField(Segment.TOTAL_DOCS, -1);
+ _crc = znRecord.getLongField(Segment.CRC, -1);
+ _creationTime = znRecord.getLongField(Segment.CREATION_TIME, -1);
try {
- String partitionMetadataJson = znRecord.getSimpleField(CommonConstants.Segment.PARTITION_METADATA);
+ String partitionMetadataJson = znRecord.getSimpleField(Segment.PARTITION_METADATA);
if (partitionMetadataJson != null) {
_partitionMetadata = SegmentPartitionMetadata.fromJsonString(partitionMetadataJson);
}
@@ -93,8 +83,12 @@ public abstract class SegmentZKMetadata implements ZKMetadata {
"Exception caught while reading partition info from zk metadata for segment '{}', partition info dropped.",
_segmentName, e);
}
- _segmentUploadStartTime = znRecord.getLongField(CommonConstants.Segment.SEGMENT_UPLOAD_START_TIME, -1);
- _customMap = znRecord.getMapField(CommonConstants.Segment.CUSTOM_MAP);
+ _segmentUploadStartTime = znRecord.getLongField(Segment.SEGMENT_UPLOAD_START_TIME, -1);
+ _crypterName = znRecord.getSimpleField(Segment.CRYPTER_NAME);
+ _customMap = znRecord.getMapField(Segment.CUSTOM_MAP);
+
+ // For backward-compatibility
+ _tableName = znRecord.getSimpleField(Segment.TABLE_NAME);
}
public String getSegmentName() {
@@ -105,54 +99,40 @@ public abstract class SegmentZKMetadata implements ZKMetadata {
_segmentName = segmentName;
}
- @Deprecated
- public String getTableName() {
- return _tableName;
+ public SegmentType getSegmentType() {
+ return _segmentType;
}
- @Deprecated
- public void setTableName(String tableName) {
- _tableName = tableName;
+ public void setSegmentType(SegmentType segmentType) {
+ _segmentType = segmentType;
}
- public long getStartTime() {
- return _startTime;
+ public long getStartTimeMs() {
+ if (_startTime > 0 && _timeUnit != null) {
+ return _timeUnit.toMillis(_startTime);
+ } else {
+ return -1;
+ }
}
public void setStartTime(long startTime) {
_startTime = startTime;
}
- public long getEndTime() {
- return _endTime;
+ public long getEndTimeMs() {
+ if (_endTime > 0 && _timeUnit != null) {
+ return _timeUnit.toMillis(_endTime);
+ } else {
+ return -1;
+ }
}
public void setEndTime(long endTime) {
_endTime = endTime;
}
- public TimeUnit getTimeUnit() {
- return _timeUnit;
- }
-
- /**
- * NOTE: should be called after setting start and end time.
- */
- public void setTimeUnit(@Nonnull TimeUnit timeUnit) {
+ public void setTimeUnit(TimeUnit timeUnit) {
_timeUnit = timeUnit;
- _timeGranularity = new Duration(_timeUnit.toMillis(1));
- // For consuming segment, end time might not be set
- if (_startTime >= 0 && _startTime <= _endTime) {
- _timeInterval = new Interval(_timeUnit.toMillis(_startTime), _timeUnit.toMillis(_endTime));
- }
- }
-
- public Duration getTimeGranularity() {
- return _timeGranularity;
- }
-
- public Interval getTimeInterval() {
- return _timeInterval;
}
public String getIndexVersion() {
@@ -163,22 +143,6 @@ public abstract class SegmentZKMetadata implements ZKMetadata {
_indexVersion = indexVersion;
}
- public SegmentType getSegmentType() {
- return _segmentType;
- }
-
- public void setSegmentType(SegmentType segmentType) {
- _segmentType = segmentType;
- }
-
- public String getCrypterName() {
- return _crypterName;
- }
-
- public void setCrypterName(String crypterName) {
- _crypterName = crypterName;
- }
-
public long getTotalDocs() {
return _totalDocs;
}
@@ -219,6 +183,14 @@ public abstract class SegmentZKMetadata implements ZKMetadata {
_segmentUploadStartTime = segmentUploadStartTime;
}
+ public String getCrypterName() {
+ return _crypterName;
+ }
+
+ public void setCrypterName(String crypterName) {
+ _crypterName = crypterName;
+ }
+
public Map<String, String> getCustomMap() {
return _customMap;
}
@@ -227,76 +199,87 @@ public abstract class SegmentZKMetadata implements ZKMetadata {
_customMap = customMap;
}
+ @Deprecated
+ public String getTableName() {
+ return _tableName;
+ }
+
+ @Deprecated
+ public void setTableName(String tableName) {
+ _tableName = tableName;
+ }
+
+ @Deprecated
+ public long getStartTime() {
+ return _startTime;
+ }
+
+ @Deprecated
+ public long getEndTime() {
+ return _endTime;
+ }
+
+ @Deprecated
+ public TimeUnit getTimeUnit() {
+ return _timeUnit;
+ }
+
+ @Deprecated
+ public Duration getTimeGranularity() {
+ return _timeUnit != null ? new Duration(_timeUnit.toMillis(1)) : null;
+ }
+
+ @Deprecated
+ public Interval getTimeInterval() {
+ if (_startTime > 0 && _startTime <= _endTime && _timeUnit != null) {
+ return new Interval(_timeUnit.toMillis(_startTime), _timeUnit.toMillis(_endTime));
+ } else {
+ return null;
+ }
+ }
+
@Override
- public boolean equals(Object segmentMetadata) {
- if (isSameReference(this, segmentMetadata)) {
+ public boolean equals(Object o) {
+ if (this == o) {
return true;
}
-
- if (isNullOrNotSameClass(this, segmentMetadata)) {
+ if (o == null || getClass() != o.getClass()) {
return false;
}
-
- SegmentZKMetadata metadata = (SegmentZKMetadata) segmentMetadata;
- return isEqual(_segmentName, metadata._segmentName) && isEqual(_crypterName, metadata._crypterName) && isEqual(
- _tableName, metadata._tableName) && isEqual(_indexVersion, metadata._indexVersion) && isEqual(_timeUnit,
- metadata._timeUnit) && isEqual(_startTime, metadata._startTime) && isEqual(_endTime, metadata._endTime)
- && isEqual(_segmentType, metadata._segmentType) && isEqual(_totalDocs, metadata._totalDocs) && isEqual(_crc,
- metadata._crc) && isEqual(_creationTime, metadata._creationTime) && isEqual(_partitionMetadata,
- metadata._partitionMetadata) && isEqual(_segmentUploadStartTime, metadata._segmentUploadStartTime) && isEqual(
- _customMap, metadata._customMap);
+ SegmentZKMetadata that = (SegmentZKMetadata) o;
+ return _startTime == that._startTime && _endTime == that._endTime && _totalDocs == that._totalDocs
+ && _crc == that._crc && _creationTime == that._creationTime
+ && _segmentUploadStartTime == that._segmentUploadStartTime && Objects.equals(_segmentName, that._segmentName)
+ && _segmentType == that._segmentType && _timeUnit == that._timeUnit && Objects
+ .equals(_indexVersion, that._indexVersion) && Objects.equals(_partitionMetadata, that._partitionMetadata)
+ && Objects.equals(_crypterName, that._crypterName) && Objects.equals(_customMap, that._customMap) && Objects
+ .equals(_tableName, that._tableName);
}
@Override
public int hashCode() {
- int result = hashCodeOf(_segmentName);
- result = hashCodeOf(result, _tableName);
- result = hashCodeOf(result, _crypterName);
- result = hashCodeOf(result, _segmentType);
- result = hashCodeOf(result, _startTime);
- result = hashCodeOf(result, _endTime);
- result = hashCodeOf(result, _timeUnit);
- result = hashCodeOf(result, _indexVersion);
- result = hashCodeOf(result, _totalDocs);
- result = hashCodeOf(result, _crc);
- result = hashCodeOf(result, _creationTime);
- result = hashCodeOf(result, _partitionMetadata);
- result = hashCodeOf(result, _segmentUploadStartTime);
- result = hashCodeOf(result, _customMap);
- return result;
+ return Objects.hash(_segmentName, _segmentType, _startTime, _endTime, _timeUnit, _indexVersion, _totalDocs, _crc,
+ _creationTime, _partitionMetadata, _segmentUploadStartTime, _crypterName, _customMap, _tableName);
}
@Override
public ZNRecord toZNRecord() {
ZNRecord znRecord = new ZNRecord(_segmentName);
- znRecord.setSimpleField(CommonConstants.Segment.SEGMENT_NAME, _segmentName);
-
- if (_tableName != null) {
- znRecord.setSimpleField(CommonConstants.Segment.TABLE_NAME, _tableName);
- }
- if (_crypterName != null) {
- znRecord.setSimpleField(CommonConstants.Segment.CRYPTER_NAME, _crypterName);
- }
-
- znRecord.setEnumField(CommonConstants.Segment.SEGMENT_TYPE, _segmentType);
- if (_timeUnit == null) {
- znRecord.setSimpleField(CommonConstants.Segment.TIME_UNIT, NULL);
- } else {
- znRecord.setEnumField(CommonConstants.Segment.TIME_UNIT, _timeUnit);
- }
- znRecord.setLongField(CommonConstants.Segment.START_TIME, _startTime);
- znRecord.setLongField(CommonConstants.Segment.END_TIME, _endTime);
-
- znRecord.setSimpleField(CommonConstants.Segment.INDEX_VERSION, _indexVersion);
- znRecord.setLongField(CommonConstants.Segment.TOTAL_DOCS, _totalDocs);
- znRecord.setLongField(CommonConstants.Segment.CRC, _crc);
- znRecord.setLongField(CommonConstants.Segment.CREATION_TIME, _creationTime);
+ znRecord.setSimpleField(Segment.SEGMENT_NAME, _segmentName);
+ znRecord.setEnumField(Segment.SEGMENT_TYPE, _segmentType);
+ znRecord.setLongField(Segment.START_TIME, _startTime);
+ znRecord.setLongField(Segment.END_TIME, _endTime);
+ znRecord.setSimpleField(Segment.TIME_UNIT, _timeUnit != null ? _timeUnit.name() : NULL);
+ znRecord.setSimpleField(Segment.INDEX_VERSION, _indexVersion);
+ znRecord.setLongField(Segment.TOTAL_DOCS, _totalDocs);
+ znRecord.setLongField(Segment.CRC, _crc);
+ znRecord.setLongField(Segment.CREATION_TIME, _creationTime);
if (_partitionMetadata != null) {
try {
String partitionMetadataJson = _partitionMetadata.toJsonString();
- znRecord.setSimpleField(CommonConstants.Segment.PARTITION_METADATA, partitionMetadataJson);
+ znRecord.setSimpleField(Segment.PARTITION_METADATA, partitionMetadataJson);
} catch (IOException e) {
LOGGER
.error("Exception caught while writing partition metadata into ZNRecord for segment '{}', will be dropped",
@@ -304,59 +287,65 @@ public abstract class SegmentZKMetadata implements ZKMetadata {
}
}
if (_segmentUploadStartTime > 0) {
- znRecord.setLongField(CommonConstants.Segment.SEGMENT_UPLOAD_START_TIME, _segmentUploadStartTime);
+ znRecord.setLongField(Segment.SEGMENT_UPLOAD_START_TIME, _segmentUploadStartTime);
+ }
+ if (_crypterName != null) {
+ znRecord.setSimpleField(Segment.CRYPTER_NAME, _crypterName);
}
if (_customMap != null) {
- znRecord.setMapField(CommonConstants.Segment.CUSTOM_MAP, _customMap);
+ znRecord.setMapField(Segment.CUSTOM_MAP, _customMap);
+ }
+
+ // For backward-compatibility
+ if (_tableName != null) {
+ znRecord.setSimpleField(Segment.TABLE_NAME, _tableName);
}
+
return znRecord;
}
public Map<String, String> toMap() {
Map<String, String> configMap = new HashMap<>();
- configMap.put(CommonConstants.Segment.SEGMENT_NAME, _segmentName);
- if (_tableName != null) {
- configMap.put(CommonConstants.Segment.TABLE_NAME, _tableName);
- }
- configMap.put(CommonConstants.Segment.SEGMENT_TYPE, _segmentType.toString());
- if (_timeUnit == null) {
- configMap.put(CommonConstants.Segment.TIME_UNIT, null);
- } else {
- configMap.put(CommonConstants.Segment.TIME_UNIT, _timeUnit.toString());
- }
- configMap.put(CommonConstants.Segment.START_TIME, Long.toString(_startTime));
- configMap.put(CommonConstants.Segment.END_TIME, Long.toString(_endTime));
- configMap.put(CommonConstants.Segment.INDEX_VERSION, _indexVersion);
- configMap.put(CommonConstants.Segment.TOTAL_DOCS, Long.toString(_totalDocs));
- configMap.put(CommonConstants.Segment.CRC, Long.toString(_crc));
- configMap.put(CommonConstants.Segment.CREATION_TIME, Long.toString(_creationTime));
+ configMap.put(Segment.SEGMENT_NAME, _segmentName);
+ configMap.put(Segment.SEGMENT_TYPE, _segmentType.toString());
+ configMap.put(Segment.START_TIME, Long.toString(_startTime));
+ configMap.put(Segment.END_TIME, Long.toString(_endTime));
+ configMap.put(Segment.TIME_UNIT, _timeUnit != null ? _timeUnit.name() : null);
+ configMap.put(Segment.INDEX_VERSION, _indexVersion);
+ configMap.put(Segment.TOTAL_DOCS, Long.toString(_totalDocs));
+ configMap.put(Segment.CRC, Long.toString(_crc));
+ configMap.put(Segment.CREATION_TIME, Long.toString(_creationTime));
if (_partitionMetadata != null) {
try {
String partitionMetadataJson = _partitionMetadata.toJsonString();
- configMap.put(CommonConstants.Segment.PARTITION_METADATA, partitionMetadataJson);
+ configMap.put(Segment.PARTITION_METADATA, partitionMetadataJson);
} catch (IOException e) {
LOGGER.error(
"Exception caught while converting partition metadata into JSON string for segment '{}', will be dropped",
_segmentName, e);
}
}
-
if (_segmentUploadStartTime > 0) {
- configMap.put(CommonConstants.Segment.SEGMENT_UPLOAD_START_TIME, Long.toString(_segmentUploadStartTime));
+ configMap.put(Segment.SEGMENT_UPLOAD_START_TIME, Long.toString(_segmentUploadStartTime));
}
-
- if (_customMap == null) {
- configMap.put(CommonConstants.Segment.CUSTOM_MAP, null);
- } else {
+ if (_crypterName != null) {
+ configMap.put(Segment.CRYPTER_NAME, _crypterName);
+ }
+ if (_customMap != null) {
try {
- configMap.put(CommonConstants.Segment.CUSTOM_MAP, JsonUtils.objectToString(_customMap));
+ configMap.put(Segment.CUSTOM_MAP, JsonUtils.objectToString(_customMap));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
+ // For backward-compatibility
+ if (_tableName != null) {
+ configMap.put(Segment.TABLE_NAME, _tableName);
+ }
+
return configMap;
}
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/tier/TimeBasedTierSegmentSelector.java b/pinot-common/src/main/java/org/apache/pinot/common/tier/TimeBasedTierSegmentSelector.java
index f4b201c..3023bbf 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/tier/TimeBasedTierSegmentSelector.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/tier/TimeBasedTierSegmentSelector.java
@@ -19,7 +19,6 @@
package org.apache.pinot.common.tier;
import com.google.common.base.Preconditions;
-import java.util.concurrent.TimeUnit;
import org.apache.helix.HelixManager;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
@@ -66,10 +65,10 @@ public class TimeBasedTierSegmentSelector implements TierSegmentSelector {
tableNameWithType);
// get segment end time to decide if segment gets selected
- TimeUnit timeUnit = segmentZKMetadata.getTimeUnit();
+ long endTimeMs = segmentZKMetadata.getEndTimeMs();
Preconditions
- .checkNotNull(timeUnit, "Time unit is not set for segment: %s of table: %s", segmentName, tableNameWithType);
- long endTimeMs = timeUnit.toMillis(segmentZKMetadata.getEndTime());
+ .checkState(endTimeMs > 0, "Invalid endTimeMs: %s for segment: %s of table: %s", endTimeMs, segmentName,
+ tableNameWithType);
long now = System.currentTimeMillis();
return (now - endTimeMs) > _segmentAgeMillis;
}
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
index e709b15..afb1852 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/CommonConstants.java
@@ -364,18 +364,16 @@ public class CommonConstants {
public static final String SEGMENT_NAME = "segment.name";
public static final String SEGMENT_TYPE = "segment.type";
- public static final String CRYPTER_NAME = "segment.crypter";
- public static final String INDEX_VERSION = "segment.index.version";
public static final String START_TIME = "segment.start.time";
public static final String END_TIME = "segment.end.time";
public static final String TIME_UNIT = "segment.time.unit";
+ public static final String INDEX_VERSION = "segment.index.version";
public static final String TOTAL_DOCS = "segment.total.docs";
public static final String CRC = "segment.crc";
public static final String CREATION_TIME = "segment.creation.time";
public static final String FLUSH_THRESHOLD_SIZE = "segment.flush.threshold.size";
public static final String FLUSH_THRESHOLD_TIME = "segment.flush.threshold.time";
public static final String PARTITION_METADATA = "segment.partition.metadata";
- public static final String PEER_SEGMENT_DOWNLOAD_SCHEME = "peer://";
/**
* This field is used for parallel push protection to lock the segment globally.
* We put the segment upload start timestamp so that if the previous push failed without unlock the segment, the
@@ -383,12 +381,17 @@ public class CommonConstants {
*/
public static final String SEGMENT_UPLOAD_START_TIME = "segment.upload.start.time";
+ public static final String CRYPTER_NAME = "segment.crypter";
public static final String CUSTOM_MAP = "custom.map";
+ @Deprecated
+ public static final String TABLE_NAME = "segment.table.name";
+
public static final String SEGMENT_BACKUP_DIR_SUFFIX = ".segment.bak";
public static final String SEGMENT_TEMP_DIR_SUFFIX = ".segment.tmp";
public static final String LOCAL_SEGMENT_SCHEME = "file";
+ public static final String PEER_SEGMENT_DOWNLOAD_SCHEME = "peer://";
public static final String METADATA_URI_FOR_PEER_DOWNLOAD = "";
public enum SegmentType {
@@ -405,9 +408,6 @@ public class CommonConstants {
public static final String HOSTNAME = "$hostName";
public static final String SEGMENTNAME = "$segmentName";
}
-
- @Deprecated
- public static final String TABLE_NAME = "segment.table.name";
}
public static class Query {
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/data/DateTimeFormatSpecTest.java b/pinot-common/src/test/java/org/apache/pinot/common/data/DateTimeFormatSpecTest.java
index e253c49..fa9725f 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/data/DateTimeFormatSpecTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/data/DateTimeFormatSpecTest.java
@@ -40,21 +40,18 @@ public class DateTimeFormatSpecTest {
// Test conversion of a dateTimeColumn value from a format to millis
@Test(dataProvider = "testFromFormatToMillisDataProvider")
- public void testFromFormatToMillis(String format, Object timeColumnValue, long millisExpected) {
-
- DateTimeFormatSpec dateTimeFormatSpec = new DateTimeFormatSpec(format);
- long millisActual = dateTimeFormatSpec.fromFormatToMillis(timeColumnValue);
- Assert.assertEquals(millisActual, millisExpected);
+ public void testFromFormatToMillis(String format, String formattedValue, long expectedTimeMs) {
+ Assert.assertEquals(new DateTimeFormatSpec(format).fromFormatToMillis(formattedValue), expectedTimeMs);
}
@DataProvider(name = "testFromFormatToMillisDataProvider")
public Object[][] provideTestFromFormatToMillisData() {
List<Object[]> entries = new ArrayList<>();
- entries.add(new Object[]{"1:HOURS:EPOCH", 416359L, 1498892400000L});
- entries.add(new Object[]{"1:MILLISECONDS:EPOCH", 1498892400000L, 1498892400000L});
- entries.add(new Object[]{"1:HOURS:EPOCH", 0L, 0L});
- entries.add(new Object[]{"5:MINUTES:EPOCH", 4996308L, 1498892400000L});
+ entries.add(new Object[]{"1:HOURS:EPOCH", "416359", 1498892400000L});
+ entries.add(new Object[]{"1:MILLISECONDS:EPOCH", "1498892400000", 1498892400000L});
+ entries.add(new Object[]{"1:HOURS:EPOCH", "0", 0L});
+ entries.add(new Object[]{"5:MINUTES:EPOCH", "4996308", 1498892400000L});
entries.add(new Object[]{"1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd", "20170701", DateTimeFormat.forPattern("yyyyMMdd")
.withZoneUTC().parseMillis("20170701")});
entries.add(new Object[]{"1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd tz(America/Chicago)", "20170701", DateTimeFormat
@@ -73,44 +70,37 @@ public class DateTimeFormatSpecTest {
// Test the conversion of a millis value to date time column value in a format
@Test(dataProvider = "testFromMillisToFormatDataProvider")
- public void testFromMillisToFormat(String format, long timeColumnValueMS, Class<?> type,
- Object timeColumnValueExpected) {
-
- DateTimeFormatSpec dateTimeFormatSpec = new DateTimeFormatSpec(format);
- Object timeColumnValueActual = dateTimeFormatSpec.fromMillisToFormat(timeColumnValueMS, type);
- Assert.assertEquals(timeColumnValueActual, timeColumnValueExpected);
+ public void testFromMillisToFormat(String format, long timeMs, String expectedFormattedValue) {
+ Assert.assertEquals(new DateTimeFormatSpec(format).fromMillisToFormat(timeMs), expectedFormattedValue);
}
@DataProvider(name = "testFromMillisToFormatDataProvider")
public Object[][] provideTestFromMillisToFormatData() {
List<Object[]> entries = new ArrayList<>();
- entries.add(new Object[]{"1:HOURS:EPOCH", 1498892400000L, Long.class, 416359L});
- entries.add(new Object[]{"1:MILLISECONDS:EPOCH", 1498892400000L, Long.class, 1498892400000L});
- entries.add(new Object[]{"1:HOURS:EPOCH", 0L, Long.class, 0L});
- entries.add(new Object[]{"5:MINUTES:EPOCH", 1498892400000L, Long.class, 4996308L});
- entries.add(new Object[]{"1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd", 1498892400000L, String.class, DateTimeFormat
- .forPattern("yyyyMMdd").withZoneUTC().print(1498892400000L)});
- entries.add(
- new Object[]{"1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd tz(America/New_York)", 1498892400000L, String.class, DateTimeFormat
- .forPattern("yyyyMMdd").withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("America/New_York"))).print(
- 1498892400000L)});
- entries.add(new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMdd HH", 1498892400000L, String.class, DateTimeFormat
- .forPattern("yyyyMMdd HH").withZoneUTC().print(1498892400000L)});
- entries.add(
- new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMdd HH tz(IST)", 1498892400000L, String.class, DateTimeFormat
- .forPattern("yyyyMMdd HH").withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("IST"))).print(
- 1498892400000L)});
- entries.add(new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMdd HH Z", 1498892400000L, String.class, DateTimeFormat
+ entries.add(new Object[]{"1:HOURS:EPOCH", 1498892400000L, "416359"});
+ entries.add(new Object[]{"1:MILLISECONDS:EPOCH", 1498892400000L, "1498892400000"});
+ entries.add(new Object[]{"1:HOURS:EPOCH", 0L, "0"});
+ entries.add(new Object[]{"5:MINUTES:EPOCH", 1498892400000L, "4996308"});
+ entries.add(new Object[]{"1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd", 1498892400000L, DateTimeFormat.forPattern("yyyyMMdd")
+ .withZoneUTC().print(1498892400000L)});
+ entries.add(new Object[]{"1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd tz(America/New_York)", 1498892400000L, DateTimeFormat
+ .forPattern("yyyyMMdd").withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("America/New_York"))).print(
+ 1498892400000L)});
+ entries.add(
+ new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMdd HH", 1498892400000L, DateTimeFormat.forPattern("yyyyMMdd HH")
+ .withZoneUTC().print(1498892400000L)});
+ entries.add(new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMdd HH tz(IST)", 1498892400000L, DateTimeFormat
+ .forPattern("yyyyMMdd HH").withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("IST"))).print(
+ 1498892400000L)});
+ entries.add(new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMdd HH Z", 1498892400000L, DateTimeFormat
.forPattern("yyyyMMdd HH Z").withZoneUTC().print(1498892400000L)});
- entries.add(
- new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMdd HH Z tz(GMT+0500)", 1498892400000L, String.class, DateTimeFormat
- .forPattern("yyyyMMdd HH Z").withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("GMT+0500"))).print(
- 1498892400000L)});
- entries.add(
- new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:M/d/yyyy h:mm:ss a", 1498892400000L, String.class, DateTimeFormat
- .forPattern("M/d/yyyy h:mm:ss a").withZoneUTC().withLocale(Locale.ENGLISH).print(1498892400000L)});
- entries.add(new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:M/d/yyyy h a", 1502066750000L, String.class, DateTimeFormat
+ entries.add(new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:yyyyMMdd HH Z tz(GMT+0500)", 1498892400000L, DateTimeFormat
+ .forPattern("yyyyMMdd HH Z").withZone(DateTimeZone.forTimeZone(TimeZone.getTimeZone("GMT+0500"))).print(
+ 1498892400000L)});
+ entries.add(new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:M/d/yyyy h:mm:ss a", 1498892400000L, DateTimeFormat
+ .forPattern("M/d/yyyy h:mm:ss a").withZoneUTC().withLocale(Locale.ENGLISH).print(1498892400000L)});
+ entries.add(new Object[]{"1:HOURS:SIMPLE_DATE_FORMAT:M/d/yyyy h a", 1502066750000L, DateTimeFormat
.forPattern("M/d/yyyy h a").withZoneUTC().withLocale(Locale.ENGLISH).print(1502066750000L)});
return entries.toArray(new Object[entries.size()][]);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
index b505d28..e27c045 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/minion/generator/RealtimeToOfflineSegmentsTaskGenerator.java
@@ -25,11 +25,9 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.task.TaskState;
import org.apache.pinot.common.metadata.segment.LLCRealtimeSegmentZKMetadata;
-import org.apache.pinot.common.metadata.segment.RealtimeSegmentZKMetadata;
import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
import org.apache.pinot.common.utils.CommonConstants.Segment;
import org.apache.pinot.common.utils.LLCSegmentName;
@@ -171,9 +169,8 @@ public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerato
boolean skipGenerate = false;
for (LLCRealtimeSegmentZKMetadata realtimeSegmentZKMetadata : completedSegmentsMetadata) {
String segmentName = realtimeSegmentZKMetadata.getSegmentName();
- TimeUnit timeUnit = realtimeSegmentZKMetadata.getTimeUnit();
- long segmentStartTimeMs = timeUnit.toMillis(realtimeSegmentZKMetadata.getStartTime());
- long segmentEndTimeMs = timeUnit.toMillis(realtimeSegmentZKMetadata.getEndTime());
+ long segmentStartTimeMs = realtimeSegmentZKMetadata.getStartTimeMs();
+ long segmentEndTimeMs = realtimeSegmentZKMetadata.getEndTimeMs();
// Check overlap with window
if (windowStartMs <= segmentEndTimeMs && segmentStartTimeMs < windowEndMs) {
@@ -287,21 +284,15 @@ public class RealtimeToOfflineSegmentsTaskGenerator implements PinotTaskGenerato
long watermarkMs;
// Find the smallest time from all segments
- RealtimeSegmentZKMetadata minSegmentZkMetadata = null;
+ long minStartTimeMs = Long.MAX_VALUE;
for (LLCRealtimeSegmentZKMetadata realtimeSegmentZKMetadata : completedSegmentsMetadata) {
- if (minSegmentZkMetadata == null || realtimeSegmentZKMetadata.getStartTime() < minSegmentZkMetadata
- .getStartTime()) {
- minSegmentZkMetadata = realtimeSegmentZKMetadata;
- }
+ minStartTimeMs = Math.min(minStartTimeMs, realtimeSegmentZKMetadata.getStartTimeMs());
}
- Preconditions.checkState(minSegmentZkMetadata != null);
-
- // Convert the segment minTime to millis
- long minSegmentStartTimeMs = minSegmentZkMetadata.getTimeUnit().toMillis(minSegmentZkMetadata.getStartTime());
+ Preconditions.checkState(minStartTimeMs != Long.MAX_VALUE);
// Round off according to the bucket. This ensures we align the offline segments to proper time boundaries
// For example, if start time millis is 20200813T12:34:59, we want to create the first segment for window [20200813, 20200814)
- watermarkMs = (minSegmentStartTimeMs / bucketMs) * bucketMs;
+ watermarkMs = (minStartTimeMs / bucketMs) * bucketMs;
// Create RealtimeToOfflineSegmentsTaskMetadata ZNode using watermark calculated above
realtimeToOfflineSegmentsTaskMetadata = new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, watermarkMs);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
index 2690d67..ad6d66f 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/retention/strategy/TimeRetentionStrategy.java
@@ -39,19 +39,12 @@ public class TimeRetentionStrategy implements RetentionStrategy {
@Override
public boolean isPurgeable(String tableNameWithType, SegmentZKMetadata segmentZKMetadata) {
- TimeUnit timeUnit = segmentZKMetadata.getTimeUnit();
- if (timeUnit == null) {
- LOGGER.warn("Time unit is not set for {} segment: {} of table: {}", segmentZKMetadata.getSegmentType(),
- segmentZKMetadata.getSegmentName(), tableNameWithType);
- return false;
- }
- long endTime = segmentZKMetadata.getEndTime();
- long endTimeMs = timeUnit.toMillis(endTime);
+ long endTimeMs = segmentZKMetadata.getEndTimeMs();
// Check that the end time is between 1971 and 2071
if (!TimeUtils.timeValueInValidRange(endTimeMs)) {
- LOGGER.warn("{} segment: {} of table: {} has invalid end time: {} {}", segmentZKMetadata.getSegmentType(),
- segmentZKMetadata.getSegmentName(), tableNameWithType, endTime, timeUnit);
+ LOGGER.warn("{} segment: {} of table: {} has invalid end time in millis: {}", segmentZKMetadata.getSegmentType(),
+ segmentZKMetadata.getSegmentName(), tableNameWithType, endTimeMs);
return false;
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableRetentionValidator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableRetentionValidator.java
index 455066f..c3d3df3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableRetentionValidator.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableRetentionValidator.java
@@ -162,18 +162,13 @@ public class TableRetentionValidator {
List<String> errorMessages = new ArrayList<>();
for (String segmentName : segmentNames) {
OfflineSegmentZKMetadata offlineSegmentMetadata = getOfflineSegmentMetadata(tableName, segmentName);
- TimeUnit segmentTimeUnit = offlineSegmentMetadata.getTimeUnit();
- if (segmentTimeUnit == null) {
- errorMessages.add("Segment: " + segmentName + " has null time unit");
- continue;
+ long startTimeMs = offlineSegmentMetadata.getStartTimeMs();
+ if (!TimeUtils.timeValueInValidRange(startTimeMs)) {
+ errorMessages.add("Segment: " + segmentName + " has invalid start time in millis: " + startTimeMs);
}
- long startTimeInMillis = segmentTimeUnit.toMillis(offlineSegmentMetadata.getStartTime());
- if (!TimeUtils.timeValueInValidRange(startTimeInMillis)) {
- errorMessages.add("Segment: " + segmentName + " has invalid start time in millis: " + startTimeInMillis);
- }
- long endTimeInMillis = segmentTimeUnit.toMillis(offlineSegmentMetadata.getEndTime());
- if (!TimeUtils.timeValueInValidRange(endTimeInMillis)) {
- errorMessages.add("Segment: " + segmentName + " has invalid end time in millis: " + endTimeInMillis);
+ long endTimeMs = offlineSegmentMetadata.getEndTimeMs();
+ if (!TimeUtils.timeValueInValidRange(endTimeMs)) {
+ errorMessages.add("Segment: " + segmentName + " has invalid end time in millis: " + endTimeMs);
}
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
index dc98600..7e593b3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/validation/OfflineSegmentIntervalChecker.java
@@ -88,9 +88,10 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask<Void>
List<Interval> segmentIntervals = new ArrayList<>(numSegments);
int numSegmentsWithInvalidIntervals = 0;
for (OfflineSegmentZKMetadata offlineSegmentZKMetadata : offlineSegmentZKMetadataList) {
- Interval timeInterval = offlineSegmentZKMetadata.getTimeInterval();
- if (timeInterval != null && TimeUtils.isValidTimeInterval(timeInterval)) {
- segmentIntervals.add(timeInterval);
+ long startTimeMs = offlineSegmentZKMetadata.getStartTimeMs();
+ long endTimeMs = offlineSegmentZKMetadata.getEndTimeMs();
+ if (TimeUtils.timeValueInValidRange(startTimeMs) && TimeUtils.timeValueInValidRange(endTimeMs)) {
+ segmentIntervals.add(new Interval(startTimeMs, endTimeMs));
} else {
numSegmentsWithInvalidIntervals++;
}
@@ -110,10 +111,9 @@ public class OfflineSegmentIntervalChecker extends ControllerPeriodicTask<Void>
long maxSegmentPushTime = Long.MIN_VALUE;
for (OfflineSegmentZKMetadata offlineSegmentZKMetadata : offlineSegmentZKMetadataList) {
- Interval segmentInterval = offlineSegmentZKMetadata.getTimeInterval();
-
- if (segmentInterval != null && maxSegmentEndTime < segmentInterval.getEndMillis()) {
- maxSegmentEndTime = segmentInterval.getEndMillis();
+ long endTimeMs = offlineSegmentZKMetadata.getEndTimeMs();
+ if (TimeUtils.timeValueInValidRange(endTimeMs) && maxSegmentEndTime < endTimeMs) {
+ maxSegmentEndTime = endTimeMs;
}
long segmentPushTime = offlineSegmentZKMetadata.getPushTime();
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index e15d28c..4db0266 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -222,7 +222,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
assertEquals(committedSegmentZKMetadata.getEndOffset(),
new LongMsgOffset(PARTITION_OFFSET.getOffset() + NUM_DOCS).toString());
assertEquals(committedSegmentZKMetadata.getCreationTime(), CURRENT_TIME_MS);
- assertEquals(committedSegmentZKMetadata.getTimeInterval(), INTERVAL);
assertEquals(committedSegmentZKMetadata.getCrc(), Long.parseLong(CRC));
assertEquals(committedSegmentZKMetadata.getIndexVersion(), SEGMENT_VERSION);
assertEquals(committedSegmentZKMetadata.getTotalDocs(), NUM_DOCS);
diff --git a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java
index 82cb2fe..b25bd51 100644
--- a/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java
+++ b/pinot-minion/src/main/java/org/apache/pinot/minion/executor/RealtimeToOfflineSegmentsTaskExecutor.java
@@ -255,18 +255,12 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsC
filterFunction = getFilterFunctionLong(windowStartMs, windowEndMs, timeColumn);
} else {
// Convert windowStart and windowEnd to time format of the data
- if (dateTimeFormatSpec.getTimeFormat().equals(DateTimeFieldSpec.TimeFormat.EPOCH)) {
- long windowStart = dateTimeFormatSpec.fromMillisToFormat(windowStartMs, Long.class);
- long windowEnd = dateTimeFormatSpec.fromMillisToFormat(windowEndMs, Long.class);
- filterFunction = getFilterFunctionLong(windowStart, windowEnd, timeColumn);
+ String windowStart = dateTimeFormatSpec.fromMillisToFormat(windowStartMs);
+ String windowEnd = dateTimeFormatSpec.fromMillisToFormat(windowEndMs);
+ if (dateTimeFieldSpec.getDataType().isNumeric()) {
+ filterFunction = getFilterFunctionLong(Long.parseLong(windowStart), Long.parseLong(windowEnd), timeColumn);
} else {
- String windowStart = dateTimeFormatSpec.fromMillisToFormat(windowStartMs, String.class);
- String windowEnd = dateTimeFormatSpec.fromMillisToFormat(windowEndMs, String.class);
- if (dateTimeFieldSpec.getDataType().isNumeric()) {
- filterFunction = getFilterFunctionLong(Long.parseLong(windowStart), Long.parseLong(windowEnd), timeColumn);
- } else {
- filterFunction = getFilterFunctionString(windowStart, windowEnd, timeColumn);
- }
+ filterFunction = getFilterFunctionString(windowStart, windowEnd, timeColumn);
}
}
return new RecordFilterConfig.Builder().setRecordFilterType(RecordFilterFactory.RecordFilterType.FILTER_FUNCTION)
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeFormatSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeFormatSpec.java
index f760778..6918da2 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeFormatSpec.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/DateTimeFormatSpec.java
@@ -121,61 +121,35 @@ public class DateTimeFormatSpec {
}
/**
+ * Converts the time in millis to the date time format.
* <ul>
- * <li>Given a timestamp in millis, convert it to the given format
- * This method should not do validation of outputGranularity.
- * The validation should be handled by caller using {@link #validateFormat}</li>
- * <ul>
- * <li>1) given dateTimeColumnValueMS = 1498892400000 and format=1:HOURS:EPOCH,
- * dateTimeSpec.fromMillis(1498892400000) = 416359 (i.e. dateTimeColumnValueMS/(1000*60*60))</li>
- * <li>2) given dateTimeColumnValueMS = 1498892400000 and format=5:MINUTES:EPOCH,
- * dateTimeSpec.fromMillis(1498892400000) = 4996308 (i.e. timeColumnValueMS/(1000*60*5))</li>
- * <li>3) given dateTimeColumnValueMS = 1498892400000 and
- * format=1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd, dateTimeSpec.fromMillis(1498892400000) = 20170701</li>
- * </ul>
+ * <li>Given timeMs=1498892400000 and format='1:HOURS:EPOCH', returns 1498892400000/(1000*60*60)='416359'</li>
+ * <li>Given timeMs=1498892400000 and format='5:MINUTES:EPOCH', returns 1498892400000/(1000*60*5)='4996308'</li>
+ * <li>Given timeMs=1498892400000 and format='1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd', returns '20170701'</li>
* </ul>
- * @param type - type of return value (can be int/long or string depending on time format)
- * @return dateTime column value in dateTimeFieldSpec
*/
- public <T extends Object> T fromMillisToFormat(Long dateTimeColumnValueMS, Class<T> type) {
- Preconditions.checkNotNull(dateTimeColumnValueMS);
-
- Object dateTimeColumnValue;
- if (_patternSpec.getTimeFormat().equals(TimeFormat.EPOCH)) {
- dateTimeColumnValue = _unitSpec.getTimeUnit().convert(dateTimeColumnValueMS, TimeUnit.MILLISECONDS) / _size;
+ public String fromMillisToFormat(long timeMs) {
+ if (_patternSpec.getTimeFormat() == TimeFormat.EPOCH) {
+ return Long.toString(_unitSpec.getTimeUnit().convert(timeMs, TimeUnit.MILLISECONDS) / _size);
} else {
- dateTimeColumnValue = _patternSpec.getDateTimeFormatter().print(dateTimeColumnValueMS);
+ return _patternSpec.getDateTimeFormatter().print(timeMs);
}
- return type.cast(dateTimeColumnValue);
}
/**
+ * Converts the date time value to the time in millis.
* <ul>
- * <li>Convert a time value in a format, to millis.
- * This method should not do validation of outputGranularity.
- * The validation should be handled by caller using {@link #validateFormat}</li>
- * <ul>
- * <li>1) given dateTimeColumnValue = 416359 and format=1:HOURS:EPOCH
- * dateTimeSpec.toMillis(416359) = 1498892400000 (i.e. timeColumnValue*60*60*1000)</li>
- * <li>2) given dateTimeColumnValue = 4996308 and format=5:MINUTES:EPOCH
- * dateTimeSpec.toMillis(4996308) = 1498892400000 (i.e. timeColumnValue*5*60*1000)</li>
- * <li>3) given dateTimeColumnValue = 20170701 and format=1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd
- * dateTimeSpec.toMillis(20170701) = 1498892400000</li>
+ * <li>Given dateTimeValue='416359' and format='1:HOURS:EPOCH', returns 416359*(1000*60*60)=1498892400000</li>
+ * <li>Given dateTimeValue='4996308' and format='5:MINUTES:EPOCH', returns 4996308*(1000*60*5)=1498892400000</li>
+ * <li>Given dateTimeValue='20170701' and format='1:DAYS:SIMPLE_DATE_FORMAT:yyyyMMdd', returns 1498892400000</li>
* </ul>
- * <ul>
- * @param dateTimeColumnValue - datetime Column value to convert to millis
- * @return datetime value in millis
*/
- public Long fromFormatToMillis(Object dateTimeColumnValue) {
- Preconditions.checkNotNull(dateTimeColumnValue);
-
- long timeColumnValueMS;
- if (_patternSpec.getTimeFormat().equals(TimeFormat.EPOCH)) {
- timeColumnValueMS = TimeUnit.MILLISECONDS.convert((Long) dateTimeColumnValue * _size, _unitSpec.getTimeUnit());
+ public long fromFormatToMillis(String dateTimeValue) {
+ if (_patternSpec.getTimeFormat() == TimeFormat.EPOCH) {
+ return TimeUnit.MILLISECONDS.convert(Long.parseLong(dateTimeValue) * _size, _unitSpec.getTimeUnit());
} else {
- timeColumnValueMS = _patternSpec.getDateTimeFormatter().parseMillis(String.valueOf(dateTimeColumnValue));
+ return _patternSpec.getDateTimeFormatter().parseMillis(dateTimeValue);
}
- return timeColumnValueMS;
}
/**
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OfflineSegmentIntervalCheckerCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OfflineSegmentIntervalCheckerCommand.java
index 754fdc7..af34969 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OfflineSegmentIntervalCheckerCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/OfflineSegmentIntervalCheckerCommand.java
@@ -35,7 +35,6 @@ import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.utils.TimeUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.tools.Command;
-import org.joda.time.Interval;
import org.kohsuke.args4j.Option;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -136,12 +135,13 @@ public class OfflineSegmentIntervalCheckerCommand extends AbstractBaseAdminComma
List<OfflineSegmentZKMetadata> offlineSegmentZKMetadataList =
ZKMetadataProvider.getOfflineSegmentZKMetadataListForTable(_propertyStore, offlineTableName);
- // collect segments with invalid time intervals
+ // Collect segments with invalid start/end time
List<String> segmentsWithInvalidIntervals = new ArrayList<>();
if (SegmentIntervalUtils.eligibleForSegmentIntervalCheck(tableConfig.getValidationConfig())) {
for (OfflineSegmentZKMetadata offlineSegmentZKMetadata : offlineSegmentZKMetadataList) {
- Interval timeInterval = offlineSegmentZKMetadata.getTimeInterval();
- if (timeInterval == null || !TimeUtils.isValidTimeInterval(timeInterval)) {
+ long startTimeMs = offlineSegmentZKMetadata.getStartTimeMs();
+ long endTimeMs = offlineSegmentZKMetadata.getEndTimeMs();
+ if (!TimeUtils.timeValueInValidRange(startTimeMs) || !TimeUtils.timeValueInValidRange(endTimeMs)) {
segmentsWithInvalidIntervals.add(offlineSegmentZKMetadata.getSegmentName());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org