You are viewing a plain-text version of this content; the hyperlink to the canonical version is not included in this copy.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/04/05 21:26:42 UTC
[incubator-pinot] branch master updated: Support legacy time type in SegmentsValidationAndRetentionConfig (#4081)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new a2c8d37 Support legacy time type in SegmentsValidationAndRetentionConfig (#4081)
a2c8d37 is described below
commit a2c8d37e5aa2ca0aea3564ea4c3ee6fde04939a9
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Fri Apr 5 14:26:36 2019 -0700
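Support legacy time type in SegmentsValidationAndRetentionConfig (#4081)
Support the legacy time type values 'daysSinceEpoch', 'hoursSinceEpoch', 'minutesSinceEpoch' and 'secondsSinceEpoch' (matched case-insensitively), which map to DAYS, HOURS, MINUTES and SECONDS respectively.
Change SegmentsValidationAndRetentionConfig.getTimeType() to return a TimeUnit directly instead of a String.
TimeUtils.timeUnitFromString() no longer returns null: it throws IllegalArgumentException for unsupported values and no longer accepts a null argument.
NormalizedDateSegmentNameGenerator now takes excludeSequenceId as a boolean and timeType as a TimeUnit.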
---
.../HelixExternalViewBasedTimeBoundaryService.java | 42 +--------------
.../SegmentsValidationAndRetentionConfig.java | 16 +++---
.../apache/pinot/common/utils/time/TimeUtils.java | 54 ++++++++++++--------
.../org/apache/pinot/common/utils/UtilsTest.java | 59 ++++++++++++----------
.../api/resources/PinotTableRestletResource.java | 7 +--
.../name/NormalizedDateSegmentNameGenerator.java | 18 ++++---
.../NormalizedDateSegmentNameGeneratorTest.java | 33 ++++++------
.../hadoop/job/mapper/SegmentCreationMapper.java | 5 +-
.../segment/converter/SegmentMergeCommand.java | 11 ++--
9 files changed, 116 insertions(+), 129 deletions(-)
diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedTimeBoundaryService.java b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedTimeBoundaryService.java
index 759ce91..d6e1adf 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedTimeBoundaryService.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/routing/HelixExternalViewBasedTimeBoundaryService.java
@@ -32,7 +32,6 @@ import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.common.metadata.segment.OfflineSegmentZKMetadata;
import org.apache.pinot.common.utils.CommonConstants.Helix.TableType;
-import org.apache.pinot.common.utils.time.TimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,11 +40,6 @@ import org.slf4j.LoggerFactory;
public class HelixExternalViewBasedTimeBoundaryService implements TimeBoundaryService {
private static final Logger LOGGER = LoggerFactory.getLogger(HelixExternalViewBasedTimeBoundaryService.class);
- private static final String DAYS_SINCE_EPOCH = "daysSinceEpoch";
- private static final String HOURS_SINCE_EPOCH = "hoursSinceEpoch";
- private static final String MINUTES_SINCE_EPOCH = "minutesSinceEpoch";
- private static final String SECONDS_SINCE_EPOCH = "secondsSinceEpoch";
-
private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
private final Map<String, TimeBoundaryInfo> _timeBoundaryInfoMap = new ConcurrentHashMap<>();
@@ -71,11 +65,9 @@ public class HelixExternalViewBasedTimeBoundaryService implements TimeBoundarySe
TableConfig offlineTableConfig = ZKMetadataProvider.getOfflineTableConfig(_propertyStore, tableName);
assert offlineTableConfig != null;
- String timeType = offlineTableConfig.getValidationConfig().getTimeType();
- TimeUnit tableTimeUnit = getTimeUnitFromString(timeType);
+ TimeUnit tableTimeUnit = offlineTableConfig.getValidationConfig().getTimeType();
if (tableTimeUnit == null) {
- LOGGER.info("Skipping updating time boundary service for table '{}' with null timeUnit, config time type: {}.",
- tableName, timeType);
+ LOGGER.info("Skipping updating time boundary service for table '{}' because time unit is not set", tableName);
return;
}
@@ -120,36 +112,6 @@ public class HelixExternalViewBasedTimeBoundaryService implements TimeBoundarySe
return maxTimeValue;
}
- private TimeUnit getTimeUnitFromString(String timeTypeString) {
- // If input data does not have a time column, no need to fire an exception.
- if ((timeTypeString == null) || timeTypeString.isEmpty()) {
- return null;
- }
-
- TimeUnit timeUnit = TimeUtils.timeUnitFromString(timeTypeString);
-
- // Check legacy time formats
- if (timeUnit == null) {
- if (timeTypeString.equalsIgnoreCase(DAYS_SINCE_EPOCH)) {
- timeUnit = TimeUnit.DAYS;
- }
- if (timeTypeString.equalsIgnoreCase(HOURS_SINCE_EPOCH)) {
- timeUnit = TimeUnit.HOURS;
- }
- if (timeTypeString.equalsIgnoreCase(MINUTES_SINCE_EPOCH)) {
- timeUnit = TimeUnit.MINUTES;
- }
- if (timeTypeString.equalsIgnoreCase(SECONDS_SINCE_EPOCH)) {
- timeUnit = TimeUnit.SECONDS;
- }
- }
-
- if (timeUnit == null) {
- throw new RuntimeException("Not supported time type for: " + timeTypeString);
- }
- return timeUnit;
- }
-
@Override
public void remove(String tableName) {
_timeBoundaryInfoMap.remove(tableName);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/config/SegmentsValidationAndRetentionConfig.java b/pinot-common/src/main/java/org/apache/pinot/common/config/SegmentsValidationAndRetentionConfig.java
index 27366c4..9d244c6 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/config/SegmentsValidationAndRetentionConfig.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/config/SegmentsValidationAndRetentionConfig.java
@@ -21,7 +21,9 @@ package org.apache.pinot.common.config;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import java.lang.reflect.Field;
+import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.utils.EqualityUtils;
+import org.apache.pinot.common.utils.time.TimeUtils;
import org.apache.pinot.startree.hll.HllConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,7 +57,7 @@ public class SegmentsValidationAndRetentionConfig {
private String timeColumnName;
@ConfigKey(value = "timeType")
- private String timeType;
+ private TimeUnit _timeType;
@ConfigKey(value = "segmentAssignmentStrategy")
private String segmentAssignmentStrategy;
@@ -86,12 +88,12 @@ public class SegmentsValidationAndRetentionConfig {
this.timeColumnName = timeColumnName;
}
- public String getTimeType() {
- return timeType;
+ public TimeUnit getTimeType() {
+ return _timeType;
}
public void setTimeType(String timeType) {
- this.timeType = timeType;
+ _timeType = timeType != null ? TimeUtils.timeUnitFromString(timeType) : null;
}
public String getRetentionTimeUnit() {
@@ -225,8 +227,8 @@ public class SegmentsValidationAndRetentionConfig {
.isEqual(segmentPushFrequency, that.segmentPushFrequency) && EqualityUtils
.isEqual(segmentPushType, that.segmentPushType) && EqualityUtils.isEqual(replication, that.replication)
&& EqualityUtils.isEqual(schemaName, that.schemaName) && EqualityUtils
- .isEqual(timeColumnName, that.timeColumnName) && EqualityUtils.isEqual(timeType, that.timeType) && EqualityUtils
- .isEqual(segmentAssignmentStrategy, that.segmentAssignmentStrategy) && EqualityUtils
+ .isEqual(timeColumnName, that.timeColumnName) && EqualityUtils.isEqual(_timeType, that._timeType)
+ && EqualityUtils.isEqual(segmentAssignmentStrategy, that.segmentAssignmentStrategy) && EqualityUtils
.isEqual(replicaGroupStrategyConfig, that.replicaGroupStrategyConfig) && EqualityUtils
.isEqual(hllConfig, that.hllConfig) && EqualityUtils.isEqual(replicasPerPartition, that.replicasPerPartition);
}
@@ -240,7 +242,7 @@ public class SegmentsValidationAndRetentionConfig {
result = EqualityUtils.hashCodeOf(result, replication);
result = EqualityUtils.hashCodeOf(result, schemaName);
result = EqualityUtils.hashCodeOf(result, timeColumnName);
- result = EqualityUtils.hashCodeOf(result, timeType);
+ result = EqualityUtils.hashCodeOf(result, _timeType);
result = EqualityUtils.hashCodeOf(result, segmentAssignmentStrategy);
result = EqualityUtils.hashCodeOf(result, replicaGroupStrategyConfig);
result = EqualityUtils.hashCodeOf(result, hllConfig);
diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/time/TimeUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/time/TimeUtils.java
index 76eb66b..6a89292 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/utils/time/TimeUtils.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/time/TimeUtils.java
@@ -30,10 +30,12 @@ import org.joda.time.format.PeriodFormatterBuilder;
public class TimeUtils {
- private static final Map<String, TimeUnit> TIME_UNIT_MAP = new HashMap<>();
+ private static final String UPPER_CASE_DAYS_SINCE_EPOCH = "DAYSSINCEEPOCH";
+ private static final String UPPER_CASE_HOURS_SINCE_EPOCH = "HOURSSINCEEPOCH";
+ private static final String UPPER_CASE_MINUTES_SINCE_EPOCH = "MINUTESSINCEEPOCH";
+ private static final String UPPER_CASE_SECONDS_SINCE_EPOCH = "SECONDSSINCEEPOCH";
- private static final long VALID_MIN_TIME_MILLIS = new DateTime(1971, 1, 1, 0, 0, 0, 0, DateTimeZone.UTC).getMillis();
- private static final long VALID_MAX_TIME_MILLIS = new DateTime(2071, 1, 1, 0, 0, 0, 0, DateTimeZone.UTC).getMillis();
+ private static final Map<String, TimeUnit> TIME_UNIT_MAP = new HashMap<>();
static {
for (TimeUnit timeUnit : TimeUnit.values()) {
@@ -41,29 +43,39 @@ public class TimeUtils {
}
}
- /**
- * Converts timeValue in timeUnitString to milliseconds
- * @param timeUnitString the time unit string to convert, such as DAYS or SECONDS
- * @param timeValue the time value to convert to milliseconds
- * @return corresponding value in milliseconds or LONG.MIN_VALUE if timeUnitString is invalid
- * Returning LONG.MIN_VALUE gives consistent beahvior with the java library
- */
- public static long toMillis(String timeUnitString, String timeValue) {
- TimeUnit timeUnit = timeUnitFromString(timeUnitString);
- return (timeUnit == null) ? Long.MIN_VALUE : timeUnit.toMillis(Long.parseLong(timeValue));
- }
+ private static final long VALID_MIN_TIME_MILLIS = new DateTime(1971, 1, 1, 0, 0, 0, 0, DateTimeZone.UTC).getMillis();
+ private static final long VALID_MAX_TIME_MILLIS = new DateTime(2071, 1, 1, 0, 0, 0, 0, DateTimeZone.UTC).getMillis();
/**
- * Turns a time unit string into a TimeUnit, ignoring case.
+ * Converts a time unit string into {@link TimeUnit}, ignoring case.
+ * <p>Supports the following legacy time unit strings:
+ * <ul>
+ * <li>"daysSinceEpoch" -> DAYS</li>
+ * <li>"hoursSinceEpoch" -> HOURS</li>
+ * <li>"minutesSinceEpoch" -> MINUTES</li>
+ * <li>"secondsSinceEpoch" -> SECONDS</li>
+ * </ul>
*
- * @param timeUnitString The time unit string to convert, such as DAYS or SECONDS.
- * @return The corresponding time unit or null if it doesn't exist
+ * @param timeUnitString The time unit string to convert, e.g. "DAYS" or "SECONDS"
+ * @return The corresponding {@link TimeUnit}
*/
public static TimeUnit timeUnitFromString(String timeUnitString) {
- if (timeUnitString == null) {
- return null;
- } else {
- return TIME_UNIT_MAP.get(timeUnitString.toUpperCase());
+ String upperCaseTimeUnitString = timeUnitString.toUpperCase();
+ TimeUnit timeUnit = TIME_UNIT_MAP.get(upperCaseTimeUnitString);
+ if (timeUnit != null) {
+ return timeUnit;
+ }
+ switch (upperCaseTimeUnitString) {
+ case UPPER_CASE_DAYS_SINCE_EPOCH:
+ return TimeUnit.DAYS;
+ case UPPER_CASE_HOURS_SINCE_EPOCH:
+ return TimeUnit.HOURS;
+ case UPPER_CASE_MINUTES_SINCE_EPOCH:
+ return TimeUnit.MINUTES;
+ case UPPER_CASE_SECONDS_SINCE_EPOCH:
+ return TimeUnit.SECONDS;
+ default:
+ throw new IllegalArgumentException("Unsupported time unit: " + timeUnitString);
}
}
diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/UtilsTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/UtilsTest.java
index 53091fb..8b4ce21 100644
--- a/pinot-common/src/test/java/org/apache/pinot/common/utils/UtilsTest.java
+++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/UtilsTest.java
@@ -24,48 +24,53 @@ import org.apache.pinot.common.utils.time.TimeUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
+import static org.testng.Assert.assertEquals;
+
/**
- * Tests for the Utils class.
- *
+ * Tests for the Utils classes.
*/
public class UtilsTest {
+
@Test
public void testToCamelCase() {
- Assert.assertEquals(Utils.toCamelCase("Hello world!"), "HelloWorld");
- Assert.assertEquals(Utils.toCamelCase("blah blah blah"), "blahBlahBlah");
- Assert.assertEquals(Utils.toCamelCase("the quick __--???!!! brown fox?"), "theQuickBrownFox");
+ assertEquals(Utils.toCamelCase("Hello world!"), "HelloWorld");
+ assertEquals(Utils.toCamelCase("blah blah blah"), "blahBlahBlah");
+ assertEquals(Utils.toCamelCase("the quick __--???!!! brown fox?"), "theQuickBrownFox");
}
@Test
public void testTimeUtils() {
- Assert.assertEquals(TimeUtils.timeUnitFromString("days"), TimeUnit.DAYS);
- Assert.assertEquals(TimeUtils.timeUnitFromString("MINUTES"), TimeUnit.MINUTES);
- Assert.assertNull(TimeUtils.timeUnitFromString("daysSinceEpoch"));
- Assert.assertNull(TimeUtils.timeUnitFromString(null));
+ assertEquals(TimeUtils.timeUnitFromString("days"), TimeUnit.DAYS);
+ assertEquals(TimeUtils.timeUnitFromString("HOURS"), TimeUnit.HOURS);
+ assertEquals(TimeUtils.timeUnitFromString("Minutes"), TimeUnit.MINUTES);
+ assertEquals(TimeUtils.timeUnitFromString("SeCoNdS"), TimeUnit.SECONDS);
+ assertEquals(TimeUtils.timeUnitFromString("daysSinceEpoch"), TimeUnit.DAYS);
+ assertEquals(TimeUtils.timeUnitFromString("HOURSSINCEEPOCH"), TimeUnit.HOURS);
+ assertEquals(TimeUtils.timeUnitFromString("MinutesSinceEpoch"), TimeUnit.MINUTES);
+ assertEquals(TimeUtils.timeUnitFromString("SeCoNdSsInCeEpOcH"), TimeUnit.SECONDS);
}
@Test
public void testFlushThresholdTimeConversion() {
-
Long millis = TimeUtils.convertPeriodToMillis("8d");
- Assert.assertEquals(millis.longValue(), 8 * 24 * 60 * 60 * 1000L);
+ assertEquals(millis.longValue(), 8 * 24 * 60 * 60 * 1000L);
millis = TimeUtils.convertPeriodToMillis("8d6h");
- Assert.assertEquals(millis.longValue(), 8 * 24 * 60 * 60 * 1000L + 6 * 60 * 60 * 1000L);
+ assertEquals(millis.longValue(), 8 * 24 * 60 * 60 * 1000L + 6 * 60 * 60 * 1000L);
millis = TimeUtils.convertPeriodToMillis("8d10m");
- Assert.assertEquals(millis.longValue(), 8 * 24 * 60 * 60 * 1000L + 10 * 60 * 1000L);
+ assertEquals(millis.longValue(), 8 * 24 * 60 * 60 * 1000L + 10 * 60 * 1000L);
millis = TimeUtils.convertPeriodToMillis("6h");
- Assert.assertEquals(millis.longValue(), 6 * 60 * 60 * 1000L);
+ assertEquals(millis.longValue(), 6 * 60 * 60 * 1000L);
millis = TimeUtils.convertPeriodToMillis("6h30m");
- Assert.assertEquals(millis.longValue(), 6 * 60 * 60 * 1000L + 30 * 60 * 1000);
+ assertEquals(millis.longValue(), 6 * 60 * 60 * 1000L + 30 * 60 * 1000);
millis = TimeUtils.convertPeriodToMillis("50m");
- Assert.assertEquals(millis.longValue(), 50 * 60 * 1000L);
+ assertEquals(millis.longValue(), 50 * 60 * 1000L);
millis = TimeUtils.convertPeriodToMillis("10s");
- Assert.assertEquals(millis.longValue(), 10 * 1000L);
+ assertEquals(millis.longValue(), 10 * 1000L);
millis = TimeUtils.convertPeriodToMillis(null);
- Assert.assertEquals(millis.longValue(), 0);
+ assertEquals(millis.longValue(), 0);
millis = TimeUtils.convertPeriodToMillis("-1d");
- Assert.assertEquals(millis.longValue(), -86400000L);
+ assertEquals(millis.longValue(), -86400000L);
try {
millis = TimeUtils.convertPeriodToMillis("hhh");
Assert.fail("Expected exception to be thrown while converting an invalid input string");
@@ -74,20 +79,20 @@ public class UtilsTest {
}
String periodStr = TimeUtils.convertMillisToPeriod(10 * 1000L);
- Assert.assertEquals(periodStr, "10s");
+ assertEquals(periodStr, "10s");
periodStr = TimeUtils.convertMillisToPeriod(50 * 60 * 1000L);
- Assert.assertEquals(periodStr, "50m");
+ assertEquals(periodStr, "50m");
periodStr = TimeUtils.convertMillisToPeriod(50 * 60 * 1000L + 30 * 1000);
- Assert.assertEquals(periodStr, "50m30s");
+ assertEquals(periodStr, "50m30s");
periodStr = TimeUtils.convertMillisToPeriod(6 * 60 * 60 * 1000L);
- Assert.assertEquals(periodStr, "6h");
+ assertEquals(periodStr, "6h");
periodStr = TimeUtils.convertMillisToPeriod(6 * 60 * 60 * 1000L + 20 * 60 * 1000 + 10 * 1000);
- Assert.assertEquals(periodStr, "6h20m10s");
+ assertEquals(periodStr, "6h20m10s");
periodStr = TimeUtils.convertMillisToPeriod(0L);
- Assert.assertEquals(periodStr, "0s");
+ assertEquals(periodStr, "0s");
periodStr = TimeUtils.convertMillisToPeriod(-1L);
- Assert.assertEquals(periodStr, "");
+ assertEquals(periodStr, "");
periodStr = TimeUtils.convertMillisToPeriod(null);
- Assert.assertEquals(periodStr, null);
+ assertEquals(periodStr, null);
}
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index a3b45e5..52c0107 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -28,6 +28,7 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.inject.Inject;
@@ -442,9 +443,9 @@ public class PinotTableRestletResource {
existingTimeColumnName, newTimeColumnName));
}
- String newTimeColumnType = newSegmentConfig.getTimeType();
- String existingTimeColumnType = SegmentConfigToCompare.getTimeType();
- if (!existingTimeColumnType.equalsIgnoreCase(newTimeColumnType)) {
+ TimeUnit existingTimeColumnType = SegmentConfigToCompare.getTimeType();
+ TimeUnit newTimeColumnType = newSegmentConfig.getTimeType();
+ if (existingTimeColumnType != newTimeColumnType) {
throw new PinotHelixResourceManager.InvalidTableConfigException(String
.format("Time column types are different! Existing time column type: %s. New time column type: %s",
existingTimeColumnType, newTimeColumnType));
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java
index cc80054..eadd3d9 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java
@@ -33,8 +33,8 @@ import org.apache.pinot.common.data.TimeGranularitySpec.TimeFormat;
*/
public class NormalizedDateSegmentNameGenerator implements SegmentNameGenerator {
private final String _segmentNamePrefix;
- private final boolean _appendPushType;
private final boolean _excludeSequenceId;
+ private final boolean _appendPushType;
// For APPEND tables
private final SimpleDateFormat _outputSDF;
@@ -44,11 +44,11 @@ public class NormalizedDateSegmentNameGenerator implements SegmentNameGenerator
private final SimpleDateFormat _inputSDF;
public NormalizedDateSegmentNameGenerator(String tableName, @Nullable String segmentNamePrefix,
- @Nullable String excludeSequenceId, @Nullable String pushType, @Nullable String pushFrequency,
- @Nullable String timeType, @Nullable String timeFormat) {
+ boolean excludeSequenceId, @Nullable String pushType, @Nullable String pushFrequency, @Nullable TimeUnit timeType,
+ @Nullable String timeFormat) {
_segmentNamePrefix = segmentNamePrefix != null ? segmentNamePrefix.trim() : tableName;
+ _excludeSequenceId = excludeSequenceId;
_appendPushType = "APPEND".equalsIgnoreCase(pushType);
- _excludeSequenceId = Boolean.parseBoolean(excludeSequenceId);
// Include time info for APPEND push type
if (_appendPushType) {
@@ -62,7 +62,7 @@ public class NormalizedDateSegmentNameGenerator implements SegmentNameGenerator
// Parse input time format: 'EPOCH' or 'SIMPLE_DATE_FORMAT:<pattern>'
if (Preconditions.checkNotNull(timeFormat).equals(TimeFormat.EPOCH.toString())) {
- _inputTimeUnit = TimeUnit.valueOf(timeType);
+ _inputTimeUnit = timeType;
_inputSDF = null;
} else {
Preconditions.checkArgument(timeFormat.startsWith(TimeFormat.SIMPLE_DATE_FORMAT.toString()),
@@ -82,11 +82,13 @@ public class NormalizedDateSegmentNameGenerator implements SegmentNameGenerator
@Override
public String generateSegmentName(int sequenceId, @Nullable Object minTimeValue, @Nullable Object maxTimeValue) {
Integer sequenceIdInSegmentName = !_excludeSequenceId && sequenceId >= 0 ? sequenceId : null;
- if (!_appendPushType) {
- return JOINER.join(_segmentNamePrefix, sequenceIdInSegmentName);
- } else {
+
+ // Include time value for APPEND push type
+ if (_appendPushType) {
return JOINER.join(_segmentNamePrefix, getNormalizedDate(Preconditions.checkNotNull(minTimeValue)),
getNormalizedDate(Preconditions.checkNotNull(maxTimeValue)), sequenceIdInSegmentName);
+ } else {
+ return JOINER.join(_segmentNamePrefix, sequenceIdInSegmentName);
}
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGeneratorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGeneratorTest.java
index 52bfcab..9706648 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGeneratorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGeneratorTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.core.segment.name;
+import java.util.concurrent.TimeUnit;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
@@ -28,8 +29,6 @@ public class NormalizedDateSegmentNameGeneratorTest {
private static final String SEGMENT_NAME_PREFIX = "myTable_daily";
private static final String APPEND_PUSH_TYPE = "APPEND";
private static final String REFRESH_PUSH_TYPE = "REFRESH";
- private static final String DAYS_TIME_TYPE = "DAYS";
- private static final String HOURS_TIME_TYPE = "HOURS";
private static final String EPOCH_TIME_FORMAT = "EPOCH";
private static final String LONG_SIMPLE_DATE_FORMAT = "SIMPLE_DATE_FORMAT:yyyyMMdd";
private static final String STRING_SIMPLE_DATE_FORMAT = "SIMPLE_DATE_FORMAT:yyyy-MM-dd";
@@ -41,7 +40,7 @@ public class NormalizedDateSegmentNameGeneratorTest {
@Test
public void testRefresh() {
SegmentNameGenerator segmentNameGenerator =
- new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, null, REFRESH_PUSH_TYPE, null, null, null);
+ new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, false, REFRESH_PUSH_TYPE, null, null, null);
assertEquals(segmentNameGenerator.toString(),
"NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable, appendPushType=false");
assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, null, null), "myTable");
@@ -51,7 +50,7 @@ public class NormalizedDateSegmentNameGeneratorTest {
@Test
public void testWithSegmentNamePrefix() {
SegmentNameGenerator segmentNameGenerator =
- new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEGMENT_NAME_PREFIX, null, REFRESH_PUSH_TYPE, null, null,
+ new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEGMENT_NAME_PREFIX, false, REFRESH_PUSH_TYPE, null, null,
null);
assertEquals(segmentNameGenerator.toString(),
"NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable_daily, appendPushType=false");
@@ -62,7 +61,7 @@ public class NormalizedDateSegmentNameGeneratorTest {
@Test
public void testWithUntrimmedSegmentNamePrefix() {
SegmentNameGenerator segmentNameGenerator =
- new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEGMENT_NAME_PREFIX + " ", null, REFRESH_PUSH_TYPE, null,
+ new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEGMENT_NAME_PREFIX + " ", false, REFRESH_PUSH_TYPE, null,
null, null);
assertEquals(segmentNameGenerator.toString(),
"NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable_daily, appendPushType=false");
@@ -73,7 +72,7 @@ public class NormalizedDateSegmentNameGeneratorTest {
@Test
public void testExcludeSequenceId() {
SegmentNameGenerator segmentNameGenerator =
- new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, "true", REFRESH_PUSH_TYPE, null, null, null);
+ new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, true, REFRESH_PUSH_TYPE, null, null, null);
assertEquals(segmentNameGenerator.toString(),
"NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable, appendPushType=false, excludeSequenceId=true");
assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, null, null), "myTable");
@@ -83,7 +82,7 @@ public class NormalizedDateSegmentNameGeneratorTest {
@Test
public void testWithPrefixExcludeSequenceId() {
SegmentNameGenerator segmentNameGenerator =
- new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEGMENT_NAME_PREFIX, "true", REFRESH_PUSH_TYPE, null, null,
+ new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEGMENT_NAME_PREFIX, true, REFRESH_PUSH_TYPE, null, null,
null);
assertEquals(segmentNameGenerator.toString(),
"NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable_daily, appendPushType=false, excludeSequenceId=true");
@@ -94,8 +93,8 @@ public class NormalizedDateSegmentNameGeneratorTest {
@Test
public void testAppend() {
SegmentNameGenerator segmentNameGenerator =
- new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, null, APPEND_PUSH_TYPE, DAILY_PUSH_FREQUENCY,
- DAYS_TIME_TYPE, EPOCH_TIME_FORMAT);
+ new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, false, APPEND_PUSH_TYPE, DAILY_PUSH_FREQUENCY,
+ TimeUnit.DAYS, EPOCH_TIME_FORMAT);
assertEquals(segmentNameGenerator.toString(),
"NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable, appendPushType=true, outputSDF=yyyy-MM-dd, inputTimeUnit=DAYS");
assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, 1L, 3L),
@@ -107,8 +106,8 @@ public class NormalizedDateSegmentNameGeneratorTest {
@Test
public void testHoursTimeType() {
SegmentNameGenerator segmentNameGenerator =
- new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, null, APPEND_PUSH_TYPE, DAILY_PUSH_FREQUENCY,
- HOURS_TIME_TYPE, EPOCH_TIME_FORMAT);
+ new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, false, APPEND_PUSH_TYPE, DAILY_PUSH_FREQUENCY,
+ TimeUnit.HOURS, EPOCH_TIME_FORMAT);
assertEquals(segmentNameGenerator.toString(),
"NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable, appendPushType=true, outputSDF=yyyy-MM-dd, inputTimeUnit=HOURS");
assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, 24L, 72L),
@@ -120,8 +119,8 @@ public class NormalizedDateSegmentNameGeneratorTest {
@Test
public void testLongSimpleDateFormat() {
SegmentNameGenerator segmentNameGenerator =
- new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, null, APPEND_PUSH_TYPE, DAILY_PUSH_FREQUENCY,
- DAYS_TIME_TYPE, LONG_SIMPLE_DATE_FORMAT);
+ new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, false, APPEND_PUSH_TYPE, DAILY_PUSH_FREQUENCY,
+ TimeUnit.DAYS, LONG_SIMPLE_DATE_FORMAT);
assertEquals(segmentNameGenerator.toString(),
"NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable, appendPushType=true, outputSDF=yyyy-MM-dd, inputSDF=yyyyMMdd");
assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, 19700102L, 19700104L),
@@ -133,8 +132,8 @@ public class NormalizedDateSegmentNameGeneratorTest {
@Test
public void testStringSimpleDateFormat() {
SegmentNameGenerator segmentNameGenerator =
- new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, null, APPEND_PUSH_TYPE, DAILY_PUSH_FREQUENCY,
- DAYS_TIME_TYPE, STRING_SIMPLE_DATE_FORMAT);
+ new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, false, APPEND_PUSH_TYPE, DAILY_PUSH_FREQUENCY,
+ TimeUnit.DAYS, STRING_SIMPLE_DATE_FORMAT);
assertEquals(segmentNameGenerator.toString(),
"NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable, appendPushType=true, outputSDF=yyyy-MM-dd, inputSDF=yyyy-MM-dd");
assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, "1970-01-02", "1970-01-04"),
@@ -146,8 +145,8 @@ public class NormalizedDateSegmentNameGeneratorTest {
@Test
public void testHourlyPushFrequency() {
SegmentNameGenerator segmentNameGenerator =
- new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, null, APPEND_PUSH_TYPE, HOURLY_PUSH_FREQUENCY,
- DAYS_TIME_TYPE, EPOCH_TIME_FORMAT);
+ new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, false, APPEND_PUSH_TYPE, HOURLY_PUSH_FREQUENCY,
+ TimeUnit.DAYS, EPOCH_TIME_FORMAT);
assertEquals(segmentNameGenerator.toString(),
"NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable, appendPushType=true, outputSDF=yyyy-MM-dd-HH, inputTimeUnit=DAYS");
assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, 1L, 3L),
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
index 240a766..2fe3537 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
@@ -122,8 +122,9 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
}
_segmentNameGenerator =
new NormalizedDateSegmentNameGenerator(_rawTableName, _jobConf.get(JobConfigConstants.SEGMENT_NAME_PREFIX),
- _jobConf.get(JobConfigConstants.EXCLUDE_SEQUENCE_ID), validationConfig.getSegmentPushType(),
- validationConfig.getSegmentPushFrequency(), validationConfig.getTimeType(), timeFormat);
+ _jobConf.getBoolean(JobConfigConstants.EXCLUDE_SEQUENCE_ID, false),
+ validationConfig.getSegmentPushType(), validationConfig.getSegmentPushFrequency(),
+ validationConfig.getTimeType(), timeFormat);
break;
default:
throw new UnsupportedOperationException("Unsupported segment name generator type: " + segmentNameGeneratorType);
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/SegmentMergeCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/SegmentMergeCommand.java
index 8a86a7a..d12c2b8 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/SegmentMergeCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/SegmentMergeCommand.java
@@ -26,7 +26,9 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.config.TableNameBuilder;
import org.apache.pinot.common.data.Schema;
@@ -238,14 +240,15 @@ public class SegmentMergeCommand extends AbstractBaseAdminCommand implements Com
String tableName = tableConfig.getTableName();
// Fetch time related configurations from schema and table config.
- String pushFrequency = tableConfig.getValidationConfig().getSegmentPushFrequency();
- String timeType = tableConfig.getValidationConfig().getTimeType();
- String pushType = tableConfig.getValidationConfig().getSegmentPushType();
+ SegmentsValidationAndRetentionConfig validationConfig = tableConfig.getValidationConfig();
+ String pushFrequency = validationConfig.getSegmentPushFrequency();
+ TimeUnit timeType = validationConfig.getTimeType();
+ String pushType = validationConfig.getSegmentPushType();
String timeFormat = schema.getTimeFieldSpec().getOutgoingGranularitySpec().getTimeFormat();
// Generate the final segment name using segment name generator
NormalizedDateSegmentNameGenerator segmentNameGenerator =
- new NormalizedDateSegmentNameGenerator(tableName, null, null, pushType, pushFrequency, timeType, timeFormat);
+ new NormalizedDateSegmentNameGenerator(tableName, null, false, pushType, pushFrequency, timeType, timeFormat);
return segmentNameGenerator.generateSegmentName(DEFAULT_SEQUENCE_ID, minStartTime, maxEndTime);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org