You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/02/14 02:26:57 UTC
[incubator-pinot] branch master updated: Refactor
SegmentNameGenerators and integrate them into Hadoop (#3821)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d3fad80 Refactor SegmentNameGenerators and integrate them into Hadoop (#3821)
d3fad80 is described below
commit d3fad80d02be7a4a099db2a581d57bb73beaa0ab
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Wed Feb 13 18:26:52 2019 -0800
Refactor SegmentNameGenerators and integrate them into Hadoop (#3821)
- generateSegmentName() now takes 3 arguments: sequenceId, minTimeValue, maxTimeValue
- 3 segment name generator implementations:
  - FixedSegmentNameGenerator: generates a fixed segment name
  - SimpleSegmentNameGenerator: generates the segment name without time value conversion
  - NormalizedDateSegmentNameGenerator: generates the segment name with a normalized date in human-readable format
- Integrate SimpleSegmentNameGenerator and NormalizedDateSegmentNameGenerator into Hadoop job
---
.../generator/SegmentGeneratorConfig.java | 8 +-
.../impl/SegmentIndexCreationDriverImpl.java | 10 +-
.../segment/name/DefaultSegmentNameGenerator.java | 146 ---------------
...nerator.java => FixedSegmentNameGenerator.java} | 23 ++-
.../name/NormalizedDateSegmentNameGenerator.java | 190 ++++++++------------
.../core/segment/name/SegmentNameGenerator.java | 18 +-
.../segment/name/SimpleSegmentNameGenerator.java | 59 ++++++
.../name/DefaultSegmentNameGeneratorTest.java | 163 -----------------
.../NormalizedDateSegmentNameGeneratorTest.java | 198 +++++++++++----------
.../name/SimpleSegmentNameGeneratorTest.java | 58 ++++++
.../pinot/hadoop/job/JobConfigConstants.java | 16 +-
.../hadoop/job/mapper/SegmentCreationMapper.java | 39 +++-
.../converter/ColumnarToStarTreeConverter.java | 16 +-
.../segment/converter/SegmentMergeCommand.java | 13 +-
14 files changed, 397 insertions(+), 560 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
index 61cfbac..40024dc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
@@ -51,8 +51,9 @@ import org.apache.pinot.core.data.readers.CSVRecordReaderConfig;
import org.apache.pinot.core.data.readers.FileFormat;
import org.apache.pinot.core.data.readers.RecordReaderConfig;
import org.apache.pinot.core.io.compression.ChunkCompressorFactory;
-import org.apache.pinot.core.segment.name.DefaultSegmentNameGenerator;
+import org.apache.pinot.core.segment.name.FixedSegmentNameGenerator;
import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator;
import org.apache.pinot.core.startree.v2.builder.StarTreeV2BuilderConfig;
import org.apache.pinot.core.util.AvroUtils;
import org.apache.pinot.startree.hll.HllConfig;
@@ -544,10 +545,9 @@ public class SegmentGeneratorConfig {
return _segmentNameGenerator;
}
if (_segmentName != null) {
- return new DefaultSegmentNameGenerator(_segmentName);
+ return new FixedSegmentNameGenerator(_segmentName);
}
- return new DefaultSegmentNameGenerator(getTimeColumnName(), getTableName(), getSegmentNamePostfix(),
- getSequenceId());
+ return new SimpleSegmentNameGenerator(_tableName, _segmentNamePostfix);
}
public void setSegmentNameGenerator(SegmentNameGenerator segmentNameGenerator) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index c13a686..645165d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -358,8 +358,14 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
private void handlePostCreation()
throws Exception {
- final String timeColumn = config.getTimeColumnName();
- segmentName = config.getSegmentNameGenerator().generateSegmentName(segmentStats.getColumnProfileFor(timeColumn));
+ ColumnStatistics timeColumnStatistics = segmentStats.getColumnProfileFor(config.getTimeColumnName());
+ int sequenceId = config.getSequenceId();
+ if (timeColumnStatistics != null) {
+ segmentName = config.getSegmentNameGenerator()
+ .generateSegmentName(sequenceId, timeColumnStatistics.getMinValue(), timeColumnStatistics.getMaxValue());
+ } else {
+ segmentName = config.getSegmentNameGenerator().generateSegmentName(sequenceId, null, null);
+ }
try {
// Write the index files to disk
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/DefaultSegmentNameGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/DefaultSegmentNameGenerator.java
deleted file mode 100644
index ab627a1..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/DefaultSegmentNameGenerator.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.segment.name;
-
-import org.apache.pinot.common.utils.StringUtil;
-import org.apache.pinot.core.segment.creator.ColumnStatistics;
-
-
-public class DefaultSegmentNameGenerator implements SegmentNameGenerator {
- private final String _segmentName;
- private final String _timeColumnName;
- private final String _tableName;
- private final String _segmentNamePostfix;
- private final int _sequenceId;
-
- /**
- * To be used when segment name is pre-decided externally
- * @param segmentName
- */
- public DefaultSegmentNameGenerator(final String segmentName) {
- _segmentName = segmentName;
- _tableName = null;
- _timeColumnName = null;
- _segmentNamePostfix = null;
- _sequenceId = -1;
- }
-
- /**
- * To be used to derive segmentName
- * @param timeColumnName
- * @param tableName
- * @param segmentNamePostfix
- */
- public DefaultSegmentNameGenerator(String timeColumnName, String tableName, String segmentNamePostfix,
- int sequenceId) {
- _timeColumnName = timeColumnName;
- _tableName = tableName;
- _segmentNamePostfix = segmentNamePostfix;
- _segmentName = null;
- _sequenceId = sequenceId;
- }
-
- /**
- *
- * The sequenceId is used for numbering segments while the postfix is used to append a string to the segment name.
- *
- * Some examples:
- *
- * If the time column name exists, _segmentNamePostfix = "postfix", and _sequenceId = 1, the segment name would be
- * tableName_minDate_maxDate_postfix_1
- *
- * If there is no time column, _segmentNamePostfix = "postfix" and _sequenceId = 1, the segment name would be
- * tableName_postfix_1
- *
- * If there is no time column, no postfix, and no sequence id, the segment name would be
- * tableName
- *
- * If there is no time column, a postfix, and no sequence id, the segment name would be
- * tableName_postfix
- *
- * If there is no time column, no postfix, and a sequence id, the segment name would be
- * tableName_sequenceId
- *
- * @param statsCollector
- * @return
- * @throws Exception
- */
- @Override
- public String generateSegmentName(ColumnStatistics statsCollector)
- throws Exception {
- if (_segmentName != null) {
- return _segmentName;
- }
-
- String segmentName;
-
- if (_timeColumnName != null && _timeColumnName.length() > 0) {
- final Object minTimeValue = statsCollector.getMinValue();
- final Object maxTimeValue = statsCollector.getMaxValue();
- if (_segmentNamePostfix == null) {
- segmentName = buildBasic(_tableName, minTimeValue, maxTimeValue, _sequenceId);
- } else {
- segmentName = buildBasic(_tableName, minTimeValue, maxTimeValue, _sequenceId, _segmentNamePostfix);
- }
- } else {
- if (_segmentNamePostfix == null) {
- segmentName = buildBasic(_tableName, _sequenceId);
- } else {
- segmentName = buildBasic(_tableName, _sequenceId, _segmentNamePostfix);
- }
- }
-
- return segmentName;
- }
-
- protected static String buildBasic(String tableName, Object minTimeValue, Object maxTimeValue, int sequenceId,
- String postfix) {
- if (sequenceId == -1) {
- return StringUtil.join("_", tableName, minTimeValue.toString(), maxTimeValue.toString(), postfix);
- } else {
- return StringUtil.join("_", tableName, minTimeValue.toString(), maxTimeValue.toString(), postfix,
- Integer.toString(sequenceId));
- }
- }
-
- protected static String buildBasic(String tableName, Object minTimeValue, Object maxTimeValue, int sequenceId) {
- if (sequenceId == -1) {
- return StringUtil.join("_", tableName, minTimeValue.toString(), maxTimeValue.toString());
- } else {
- return StringUtil
- .join("_", tableName, minTimeValue.toString(), maxTimeValue.toString(), Integer.toString(sequenceId));
- }
- }
-
- protected static String buildBasic(String tableName, int sequenceId) {
- if (sequenceId == -1) {
- return tableName;
- } else {
- return StringUtil.join("_", tableName, Integer.toString(sequenceId));
- }
- }
-
- protected static String buildBasic(String tableName, int sequenceId, String postfix) {
- if (sequenceId == -1) {
- return StringUtil.join("_", tableName, postfix);
- } else {
- return StringUtil.join("_", tableName, postfix, Integer.toString(sequenceId));
- }
- }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SegmentNameGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/FixedSegmentNameGenerator.java
similarity index 58%
copy from pinot-core/src/main/java/org/apache/pinot/core/segment/name/SegmentNameGenerator.java
copy to pinot-core/src/main/java/org/apache/pinot/core/segment/name/FixedSegmentNameGenerator.java
index 00965e3..900a79e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SegmentNameGenerator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/FixedSegmentNameGenerator.java
@@ -18,13 +18,26 @@
*/
package org.apache.pinot.core.segment.name;
-import org.apache.pinot.core.segment.creator.ColumnStatistics;
+import javax.annotation.Nullable;
/**
- * An interface that allows generates names for segments depending on the naming scheme.
+ * Fixed segment name generator which always returns the fixed segment name.
*/
-public interface SegmentNameGenerator {
- String generateSegmentName(ColumnStatistics timeColStatsCollector)
- throws Exception;
+public class FixedSegmentNameGenerator implements SegmentNameGenerator {
+ private final String _segmentName;
+
+ public FixedSegmentNameGenerator(String segmentName) {
+ _segmentName = segmentName;
+ }
+
+ @Override
+ public String generateSegmentName(int sequenceId, @Nullable Object minTimeValue, @Nullable Object maxTimeValue) {
+ return _segmentName;
+ }
+
+ @Override
+ public String toString() {
+ return String.format("FixedSegmentNameGenerator: segmentName=%s", _segmentName);
+ }
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java
index 55842f5..cc80054 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java
@@ -18,147 +18,115 @@
*/
package org.apache.pinot.core.segment.name;
-import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import java.text.ParseException;
import java.text.SimpleDateFormat;
-import java.util.Arrays;
import java.util.Date;
import java.util.TimeZone;
import java.util.concurrent.TimeUnit;
-import org.apache.pinot.common.data.TimeGranularitySpec;
-import org.apache.pinot.core.segment.creator.ColumnStatistics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.data.TimeGranularitySpec.TimeFormat;
/**
* Segment name generator that normalizes the date to human readable format.
*/
public class NormalizedDateSegmentNameGenerator implements SegmentNameGenerator {
- private static final Logger LOGGER = LoggerFactory.getLogger(NormalizedDateSegmentNameGenerator.class);
-
- private final String _tableName;
- private final int _sequenceId;
- private final String _timeColumnType;
- private final String _tablePushFrequency;
- private final String _tablePushType;
private final String _segmentNamePrefix;
- private final String _segmentExcludeSequenceId;
- private final String _schemaTimeFormat;
+ private final boolean _appendPushType;
+ private final boolean _excludeSequenceId;
- public NormalizedDateSegmentNameGenerator(String tableName, int sequenceId, String timeColumnType,
- String tablePushFrequency, String tablePushType, String segmentNamePrefix, String segmentExcludeSequenceId,
- String schemaTimeFormat) {
- _tableName = tableName;
- _sequenceId = sequenceId;
- _timeColumnType = timeColumnType;
- _tablePushFrequency = tablePushFrequency;
- _tablePushType = tablePushType;
- _segmentNamePrefix = segmentNamePrefix;
- _segmentExcludeSequenceId = segmentExcludeSequenceId;
- _schemaTimeFormat = schemaTimeFormat;
- }
+ // For APPEND tables
+ private final SimpleDateFormat _outputSDF;
+ // For EPOCH time format
+ private final TimeUnit _inputTimeUnit;
+ // For SIMPLE_DATE_FORMAT time format
+ private final SimpleDateFormat _inputSDF;
- @Override
- public String generateSegmentName(ColumnStatistics statsCollector)
- throws Exception {
- long minTimeValue = Long.parseLong(statsCollector.getMinValue().toString());
- long maxTimeValue = Long.parseLong(statsCollector.getMaxValue().toString());
- String segmentName = generateSegmentName(minTimeValue, maxTimeValue);
- LOGGER.info("Segment name is: " + segmentName + " for table name: " + _tableName);
- return segmentName;
- }
+ public NormalizedDateSegmentNameGenerator(String tableName, @Nullable String segmentNamePrefix,
+ @Nullable String excludeSequenceId, @Nullable String pushType, @Nullable String pushFrequency,
+ @Nullable String timeType, @Nullable String timeFormat) {
+ _segmentNamePrefix = segmentNamePrefix != null ? segmentNamePrefix.trim() : tableName;
+ _appendPushType = "APPEND".equalsIgnoreCase(pushType);
+ _excludeSequenceId = Boolean.parseBoolean(excludeSequenceId);
- /**
- * Generate the segment name given raw min, max time value
- *
- * @param minTimeValue min time value from a segment
- * @param maxTimeValue max time value from a segment
- * @return segment name with normalized time
- */
- public String generateSegmentName(long minTimeValue, long maxTimeValue) {
- // Use table name for prefix by default unless explicitly set
- String segmentPrefix = _tableName;
- if (_segmentNamePrefix != null) {
- LOGGER.info("Using prefix for table " + _tableName + ", prefix " + _segmentNamePrefix);
- segmentPrefix = _segmentNamePrefix;
- } else {
- LOGGER.info("Using default naming scheme for " + _tableName);
- }
+ // Include time info for APPEND push type
+ if (_appendPushType) {
+ // For HOURLY push frequency, include hours into output format
+ if ("HOURLY".equalsIgnoreCase(pushFrequency)) {
+ _outputSDF = new SimpleDateFormat("yyyy-MM-dd-HH");
+ } else {
+ _outputSDF = new SimpleDateFormat("yyyy-MM-dd");
+ }
+ _outputSDF.setTimeZone(TimeZone.getTimeZone("UTC"));
- String sequenceId = Integer.toString(_sequenceId);
- Boolean excludeSequenceId = Boolean.valueOf(_segmentExcludeSequenceId);
- if (excludeSequenceId) {
- sequenceId = null;
+ // Parse input time format: 'EPOCH' or 'SIMPLE_DATE_FORMAT:<pattern>'
+ if (Preconditions.checkNotNull(timeFormat).equals(TimeFormat.EPOCH.toString())) {
+ _inputTimeUnit = TimeUnit.valueOf(timeType);
+ _inputSDF = null;
+ } else {
+ Preconditions.checkArgument(timeFormat.startsWith(TimeFormat.SIMPLE_DATE_FORMAT.toString()),
+ "Invalid time format: %s, must be one of '%s' or '%s:<pattern>'", timeFormat, TimeFormat.EPOCH,
+ TimeFormat.SIMPLE_DATE_FORMAT);
+ _inputTimeUnit = null;
+ _inputSDF = new SimpleDateFormat(timeFormat.substring(timeFormat.indexOf(':') + 1));
+ _inputSDF.setTimeZone(TimeZone.getTimeZone("UTC"));
+ }
+ } else {
+ _outputSDF = null;
+ _inputTimeUnit = null;
+ _inputSDF = null;
}
+ }
- String minTimeValueNormalized = null;
- String maxTimeValueNormalized = null;
- // For append use cases, we add dates; otherwise, we don't
- if (_tablePushType.equalsIgnoreCase("APPEND")) {
- minTimeValueNormalized = getNormalizedDate(minTimeValue);
- maxTimeValueNormalized = getNormalizedDate(maxTimeValue);
- LOGGER.info("Table push type is append. Min time value = " + minTimeValueNormalized + " and max time value = "
- + maxTimeValueNormalized + " table name: " + _tableName);
+ @Override
+ public String generateSegmentName(int sequenceId, @Nullable Object minTimeValue, @Nullable Object maxTimeValue) {
+ Integer sequenceIdInSegmentName = !_excludeSequenceId && sequenceId >= 0 ? sequenceId : null;
+ if (!_appendPushType) {
+ return JOINER.join(_segmentNamePrefix, sequenceIdInSegmentName);
} else {
- LOGGER.info("Table push type is refresh for table: " + _tableName);
+ return JOINER.join(_segmentNamePrefix, getNormalizedDate(Preconditions.checkNotNull(minTimeValue)),
+ getNormalizedDate(Preconditions.checkNotNull(maxTimeValue)), sequenceIdInSegmentName);
}
- return Joiner.on("_").skipNulls()
- .join(Arrays.asList(segmentPrefix.trim(), minTimeValueNormalized, maxTimeValueNormalized, sequenceId));
}
/**
- * Convert the raw time value into human readable date format
+ * Converts the time value into human readable date format.
*
- * @param timeValue time column value
- * @return normalized date string
+ * @param timeValue Time value
+ * @return Normalized date string
*/
- private String getNormalizedDate(long timeValue) {
- Date sinceEpoch = null;
- if (_schemaTimeFormat.equals(TimeGranularitySpec.TimeFormat.EPOCH.toString())) {
- TimeUnit timeUnit = TimeUnit.valueOf(_timeColumnType);
-
- switch (timeUnit) {
- case MILLISECONDS:
- sinceEpoch = new Date(timeValue);
- break;
- case SECONDS:
- sinceEpoch = new Date(TimeUnit.SECONDS.toMillis(timeValue));
- break;
- case MINUTES:
- sinceEpoch = new Date(TimeUnit.MINUTES.toMillis(timeValue));
- break;
- case HOURS:
- sinceEpoch = new Date(TimeUnit.HOURS.toMillis(timeValue));
- break;
- case DAYS:
- sinceEpoch = new Date(TimeUnit.DAYS.toMillis(timeValue));
- break;
- }
+ private String getNormalizedDate(Object timeValue) {
+ if (_inputTimeUnit != null) {
+ return _outputSDF.format(new Date(_inputTimeUnit.toMillis(Long.parseLong(timeValue.toString()))));
} else {
try {
- SimpleDateFormat dateFormatPassedIn = new SimpleDateFormat(_schemaTimeFormat);
- dateFormatPassedIn.setTimeZone(TimeZone.getTimeZone("UTC"));
- sinceEpoch = dateFormatPassedIn.parse(Long.toString(timeValue));
- } catch (Exception e) {
- throw new RuntimeException("Could not parse simple date format: '" + _schemaTimeFormat);
+ return _outputSDF.format(_inputSDF.parse(timeValue.toString()));
+ } catch (ParseException e) {
+ throw new RuntimeException(String
+ .format("Caught exception while parsing simple date format: %s with value: %s", _inputSDF.toPattern(),
+ timeValue), e);
}
}
+ }
- // Pick the current time when having a parsing error
- if (sinceEpoch == null) {
- LOGGER.warn("Could not translate timeType '" + _timeColumnType);
- sinceEpoch = new Date();
+ @Override
+ public String toString() {
+ StringBuilder stringBuilder =
+ new StringBuilder("NormalizedDateSegmentNameGenerator: segmentNamePrefix=").append(_segmentNamePrefix)
+ .append(", appendPushType=").append(_appendPushType);
+ if (_excludeSequenceId) {
+ stringBuilder.append(", excludeSequenceId=true");
}
-
- // Get date format
- SimpleDateFormat dateFormat;
- if (_tablePushFrequency.equalsIgnoreCase("HOURLY")) {
- dateFormat = new SimpleDateFormat("yyyy-MM-dd-HH");
- } else {
- dateFormat = new SimpleDateFormat("yyyy-MM-dd");
+ if (_outputSDF != null) {
+ stringBuilder.append(", outputSDF=").append(_outputSDF.toPattern());
}
- dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
-
- return dateFormat.format(sinceEpoch);
+ if (_inputTimeUnit != null) {
+ stringBuilder.append(", inputTimeUnit=").append(_inputTimeUnit);
+ }
+ if (_inputSDF != null) {
+ stringBuilder.append(", inputSDF=").append(_inputSDF.toPattern());
+ }
+ return stringBuilder.toString();
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SegmentNameGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SegmentNameGenerator.java
index 00965e3..eed203c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SegmentNameGenerator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SegmentNameGenerator.java
@@ -18,13 +18,23 @@
*/
package org.apache.pinot.core.segment.name;
-import org.apache.pinot.core.segment.creator.ColumnStatistics;
+import com.google.common.base.Joiner;
+import javax.annotation.Nullable;
/**
- * An interface that allows generates names for segments depending on the naming scheme.
+ * Interface for segment name generator based on the segment sequence id and time range.
*/
public interface SegmentNameGenerator {
- String generateSegmentName(ColumnStatistics timeColStatsCollector)
- throws Exception;
+ Joiner JOINER = Joiner.on('_').skipNulls();
+
+ /**
+ * Generates the segment name.
+ *
+ * @param sequenceId Segment sequence id (negative value means INVALID)
+ * @param minTimeValue Minimum time value
+ * @param maxTimeValue Maximum time value
+ * @return Segment name generated
+ */
+ String generateSegmentName(int sequenceId, @Nullable Object minTimeValue, @Nullable Object maxTimeValue);
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SimpleSegmentNameGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SimpleSegmentNameGenerator.java
new file mode 100644
index 0000000..9660f8e
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SimpleSegmentNameGenerator.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.name;
+
+import javax.annotation.Nullable;
+
+
+/**
+ * Simple segment name generator which does not perform time conversion.
+ * <p>
+ * The segment name is simply joining the following fields with '_' but ignoring all the {@code null}s.
+ * <ul>
+ * <li>Table name</li>
+ * <li>Minimum time value</li>
+ * <li>Maximum time value</li>
+ * <li>Segment name postfix</li>
+ * <li>Sequence id</li>
+ * </ul>
+ */
+public class SimpleSegmentNameGenerator implements SegmentNameGenerator {
+ private final String _tableName;
+ private final String _segmentNamePostfix;
+
+ public SimpleSegmentNameGenerator(String tableName, String segmentNamePostfix) {
+ _tableName = tableName;
+ _segmentNamePostfix = segmentNamePostfix;
+ }
+
+ @Override
+ public String generateSegmentName(int sequenceId, @Nullable Object minTimeValue, @Nullable Object maxTimeValue) {
+ return JOINER
+ .join(_tableName, minTimeValue, maxTimeValue, _segmentNamePostfix, sequenceId >= 0 ? sequenceId : null);
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder stringBuilder = new StringBuilder("SimpleSegmentNameGenerator: tableName=").append(_tableName);
+ if (_segmentNamePostfix != null) {
+ stringBuilder.append(", segmentNamePostfix=").append(_segmentNamePostfix);
+ }
+ return stringBuilder.toString();
+ }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/name/DefaultSegmentNameGeneratorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/name/DefaultSegmentNameGeneratorTest.java
deleted file mode 100644
index f94088e..0000000
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/name/DefaultSegmentNameGeneratorTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.segment.name;
-
-import java.io.File;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
-import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
-import org.apache.pinot.core.segment.creator.impl.SegmentCreationDriverFactory;
-import org.apache.pinot.core.segment.index.ColumnMetadataTest;
-import org.apache.pinot.segments.v1.creator.SegmentTestUtils;
-import org.apache.pinot.util.TestUtils;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-
-public class DefaultSegmentNameGeneratorTest {
-
- private static final String AVRO_DATA = "data/test_data-mv.avro";
- private static final File INDEX_DIR = new File(ColumnMetadataTest.class.toString());
-
- @BeforeMethod
- public void setUp()
- throws Exception {
- FileUtils.deleteQuietly(INDEX_DIR);
- }
-
- @AfterMethod
- public void tearDown() {
- FileUtils.deleteQuietly(INDEX_DIR);
- }
-
- public SegmentGeneratorConfig CreateSegmentConfigWithoutCreator()
- throws Exception {
- final String filePath =
- TestUtils.getFileFromResourceUrl(DefaultSegmentNameGeneratorTest.class.getClassLoader().getResource(AVRO_DATA));
- // Intentionally changed this to TimeUnit.Hours to make it non-default for testing.
- SegmentGeneratorConfig config = SegmentTestUtils
- .getSegmentGenSpecWithSchemAndProjectedColumns(new File(filePath), INDEX_DIR, "daysSinceEpoch", TimeUnit.HOURS,
- "testTable");
- config.setSegmentNamePostfix("1");
- config.setTimeColumnName("daysSinceEpoch");
- return config;
- }
-
- @Test
- public void testPostfix()
- throws Exception {
- ColumnMetadataTest columnMetadataTest = new ColumnMetadataTest();
- // Build the Segment metadata.
- SegmentGeneratorConfig config = columnMetadataTest.CreateSegmentConfigWithoutCreator();
- SegmentNameGenerator segmentNameGenerator = new DefaultSegmentNameGenerator("daysSinceEpoch", "mytable", "1", -1);
- config.setSegmentNameGenerator(segmentNameGenerator);
- SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
- driver.init(config);
- driver.build();
- Assert.assertEquals(driver.getSegmentName(), "mytable_1756015683_1756015683_1");
- }
-
- @Test
- public void testAlreadyNamedSegment()
- throws Exception {
- ColumnMetadataTest columnMetadataTest = new ColumnMetadataTest();
- // Build the Segment metadata.
- SegmentGeneratorConfig config = columnMetadataTest.CreateSegmentConfigWithoutCreator();
- SegmentNameGenerator segmentNameGenerator = new DefaultSegmentNameGenerator("mytable_1");
- config.setSegmentNameGenerator(segmentNameGenerator);
- SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
- driver.init(config);
- driver.build();
- Assert.assertEquals(driver.getSegmentName(), "mytable_1");
- }
-
- @Test
- public void testNullTimeColumn()
- throws Exception {
- ColumnMetadataTest columnMetadataTest = new ColumnMetadataTest();
- // Build the Segment metadata.
- SegmentGeneratorConfig config = columnMetadataTest.CreateSegmentConfigWithoutCreator();
- config.setTableName("mytable");
- config.setSegmentNamePostfix("postfix");
- config.setTimeColumnName(null);
- SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
- driver.init(config);
- driver.build();
- Assert.assertEquals(driver.getSegmentName(), "mytable_postfix");
- }
-
- @Test
- public void testNullTimeColumnThroughDefaultSegment()
- throws Exception {
- ColumnMetadataTest columnMetadataTest = new ColumnMetadataTest();
- // Build the Segment metadata.
- SegmentGeneratorConfig config = columnMetadataTest.CreateSegmentConfigWithoutCreator();
- SegmentNameGenerator segmentNameGenerator = new DefaultSegmentNameGenerator(null, "mytable", "1", 2);
- config.setSegmentNameGenerator(segmentNameGenerator);
- SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
- driver.init(config);
- driver.build();
- Assert.assertEquals(driver.getSegmentName(), "mytable_1_2");
- }
-
- @Test
- public void testNullPostfix()
- throws Exception {
- ColumnMetadataTest columnMetadataTest = new ColumnMetadataTest();
- // Build the Segment metadata.
- SegmentGeneratorConfig config = columnMetadataTest.CreateSegmentConfigWithoutCreator();
- SegmentNameGenerator segmentNameGenerator = new DefaultSegmentNameGenerator("daysSinceEpoch", "mytable", null, -1);
- config.setSegmentNameGenerator(segmentNameGenerator);
- SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
- driver.init(config);
- driver.build();
- Assert.assertEquals(driver.getSegmentName(), "mytable_1756015683_1756015683");
- }
-
- @Test
- public void testNullPostfixWithNonNegSequenceId()
- throws Exception {
- ColumnMetadataTest columnMetadataTest = new ColumnMetadataTest();
- // Build the Segment metadata.
- SegmentGeneratorConfig config = columnMetadataTest.CreateSegmentConfigWithoutCreator();
- SegmentNameGenerator segmentNameGenerator = new DefaultSegmentNameGenerator("daysSinceEpoch", "mytable", null, 2);
- config.setSegmentNameGenerator(segmentNameGenerator);
- SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
- driver.init(config);
- driver.build();
- Assert.assertEquals(driver.getSegmentName(), "mytable_1756015683_1756015683_2");
- }
-
- @Test
- public void testOnlyTableName()
- throws Exception {
- ColumnMetadataTest columnMetadataTest = new ColumnMetadataTest();
- // Build the Segment metadata.
- SegmentGeneratorConfig config = columnMetadataTest.CreateSegmentConfigWithoutCreator();
- SegmentNameGenerator segmentNameGenerator = new DefaultSegmentNameGenerator(null, "mytable", null, -1);
- config.setSegmentNameGenerator(segmentNameGenerator);
- SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
- driver.init(config);
- driver.build();
- Assert.assertEquals(driver.getSegmentName(), "mytable");
- }
-}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGeneratorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGeneratorTest.java
index bd16532..52bfcab 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGeneratorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGeneratorTest.java
@@ -18,135 +18,141 @@
*/
package org.apache.pinot.core.segment.name;
-import org.apache.pinot.common.data.DateTimeFieldSpec;
-import org.apache.pinot.core.segment.creator.ColumnStatistics;
-import org.mockito.Mockito;
-import org.testng.Assert;
import org.testng.annotations.Test;
-import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
public class NormalizedDateSegmentNameGeneratorTest {
private static final String TABLE_NAME = "myTable";
- private static final int SEQUENCE_ID = 1;
- private static final String TIME_COLUMN_TYPE = "DAYS";
- private static final String TABLE_PUSH_FREQUENCY = "daily";
- private static final String PREFIX = "myTable_daily";
+ private static final String SEGMENT_NAME_PREFIX = "myTable_daily";
private static final String APPEND_PUSH_TYPE = "APPEND";
private static final String REFRESH_PUSH_TYPE = "REFRESH";
+ private static final String DAYS_TIME_TYPE = "DAYS";
+ private static final String HOURS_TIME_TYPE = "HOURS";
+ private static final String EPOCH_TIME_FORMAT = "EPOCH";
+ private static final String LONG_SIMPLE_DATE_FORMAT = "SIMPLE_DATE_FORMAT:yyyyMMdd";
+ private static final String STRING_SIMPLE_DATE_FORMAT = "SIMPLE_DATE_FORMAT:yyyy-MM-dd";
+ private static final String DAILY_PUSH_FREQUENCY = "daily";
+ private static final String HOURLY_PUSH_FREQUENCY = "hourly";
+ private static final int INVALID_SEQUENCE_ID = -1;
+ private static final int VALID_SEQUENCE_ID = 1;
@Test
- public void testAppend()
- throws Exception {
- ColumnStatistics columnStatisticsClass = Mockito.mock(ColumnStatistics.class);
- when(columnStatisticsClass.getMaxValue()).thenReturn(3L);
- when(columnStatisticsClass.getMinValue()).thenReturn(1L);
- NormalizedDateSegmentNameGenerator normalizedDataSegmentNameGenerator =
- new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEQUENCE_ID, TIME_COLUMN_TYPE, TABLE_PUSH_FREQUENCY,
- APPEND_PUSH_TYPE, null, null, DateTimeFieldSpec.TimeFormat.EPOCH.toString());
- Assert.assertEquals(normalizedDataSegmentNameGenerator.generateSegmentName(columnStatisticsClass),
- "myTable_1970-01-02_1970-01-04_1");
+ public void testRefresh() {
+ SegmentNameGenerator segmentNameGenerator =
+ new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, null, REFRESH_PUSH_TYPE, null, null, null);
+ assertEquals(segmentNameGenerator.toString(),
+ "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable, appendPushType=false");
+ assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, null, null), "myTable");
+ assertEquals(segmentNameGenerator.generateSegmentName(VALID_SEQUENCE_ID, null, null), "myTable_1");
+ }
+
+ @Test
+ public void testWithSegmentNamePrefix() {
+ SegmentNameGenerator segmentNameGenerator =
+ new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEGMENT_NAME_PREFIX, null, REFRESH_PUSH_TYPE, null, null,
+ null);
+ assertEquals(segmentNameGenerator.toString(),
+ "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable_daily, appendPushType=false");
+ assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, null, null), "myTable_daily");
+ assertEquals(segmentNameGenerator.generateSegmentName(VALID_SEQUENCE_ID, null, null), "myTable_daily_1");
}
@Test
- public void testAppendWithPrefix()
- throws Exception {
- ColumnStatistics columnStatisticsClass = Mockito.mock(ColumnStatistics.class);
- when(columnStatisticsClass.getMaxValue()).thenReturn(3L);
- when(columnStatisticsClass.getMinValue()).thenReturn(1L);
- NormalizedDateSegmentNameGenerator normalizedDataSegmentNameGenerator =
- new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEQUENCE_ID, TIME_COLUMN_TYPE, TABLE_PUSH_FREQUENCY,
- APPEND_PUSH_TYPE, PREFIX, null, DateTimeFieldSpec.TimeFormat.EPOCH.toString());
- Assert.assertEquals(normalizedDataSegmentNameGenerator.generateSegmentName(columnStatisticsClass),
- "myTable_daily_1970-01-02_1970-01-04_1");
+ public void testWithUntrimmedSegmentNamePrefix() {
+ SegmentNameGenerator segmentNameGenerator =
+ new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEGMENT_NAME_PREFIX + " ", null, REFRESH_PUSH_TYPE, null,
+ null, null);
+ assertEquals(segmentNameGenerator.toString(),
+ "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable_daily, appendPushType=false");
+ assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, null, null), "myTable_daily");
+ assertEquals(segmentNameGenerator.generateSegmentName(VALID_SEQUENCE_ID, null, null), "myTable_daily_1");
}
@Test
- public void testRefresh()
- throws Exception {
- ColumnStatistics columnStatisticsClass = Mockito.mock(ColumnStatistics.class);
- when(columnStatisticsClass.getMaxValue()).thenReturn(3L);
- when(columnStatisticsClass.getMinValue()).thenReturn(1L);
- NormalizedDateSegmentNameGenerator normalizedDataSegmentNameGenerator =
- new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEQUENCE_ID, null, TABLE_PUSH_FREQUENCY, REFRESH_PUSH_TYPE,
- null, null, null);
- Assert.assertEquals(normalizedDataSegmentNameGenerator.generateSegmentName(columnStatisticsClass), "myTable_1");
+ public void testExcludeSequenceId() {
+ SegmentNameGenerator segmentNameGenerator =
+ new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, "true", REFRESH_PUSH_TYPE, null, null, null);
+ assertEquals(segmentNameGenerator.toString(),
+ "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable, appendPushType=false, excludeSequenceId=true");
+ assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, null, null), "myTable");
+ assertEquals(segmentNameGenerator.generateSegmentName(VALID_SEQUENCE_ID, null, null), "myTable");
}
@Test
- public void testNoSequenceIdWithParameter()
- throws Exception {
- ColumnStatistics columnStatisticsClass = Mockito.mock(ColumnStatistics.class);
- when(columnStatisticsClass.getMaxValue()).thenReturn(3L);
- when(columnStatisticsClass.getMinValue()).thenReturn(1L);
- NormalizedDateSegmentNameGenerator normalizedDataSegmentNameGenerator =
- new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEQUENCE_ID, null, TABLE_PUSH_FREQUENCY, REFRESH_PUSH_TYPE,
- null, "true", null);
- Assert.assertEquals(normalizedDataSegmentNameGenerator.generateSegmentName(columnStatisticsClass), "myTable");
+ public void testWithPrefixExcludeSequenceId() {
+ SegmentNameGenerator segmentNameGenerator =
+ new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEGMENT_NAME_PREFIX, "true", REFRESH_PUSH_TYPE, null, null,
+ null);
+ assertEquals(segmentNameGenerator.toString(),
+ "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable_daily, appendPushType=false, excludeSequenceId=true");
+ assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, null, null), "myTable_daily");
+ assertEquals(segmentNameGenerator.generateSegmentName(VALID_SEQUENCE_ID, null, null), "myTable_daily");
}
@Test
- public void testRefreshWithPrefixNoSequenceId()
- throws Exception {
- ColumnStatistics columnStatisticsClass = Mockito.mock(ColumnStatistics.class);
- when(columnStatisticsClass.getMaxValue()).thenReturn(3L);
- when(columnStatisticsClass.getMinValue()).thenReturn(1L);
- NormalizedDateSegmentNameGenerator normalizedDataSegmentNameGenerator =
- new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEQUENCE_ID, null, TABLE_PUSH_FREQUENCY, REFRESH_PUSH_TYPE,
- "prefix", "true", null);
- Assert.assertEquals(normalizedDataSegmentNameGenerator.generateSegmentName(columnStatisticsClass), "prefix");
+ public void testAppend() {
+ SegmentNameGenerator segmentNameGenerator =
+ new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, null, APPEND_PUSH_TYPE, DAILY_PUSH_FREQUENCY,
+ DAYS_TIME_TYPE, EPOCH_TIME_FORMAT);
+ assertEquals(segmentNameGenerator.toString(),
+ "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable, appendPushType=true, outputSDF=yyyy-MM-dd, inputTimeUnit=DAYS");
+ assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, 1L, 3L),
+ "myTable_1970-01-02_1970-01-04");
+ assertEquals(segmentNameGenerator.generateSegmentName(VALID_SEQUENCE_ID, 1L, 3L),
+ "myTable_1970-01-02_1970-01-04_1");
}
@Test
- public void testAppendWithPrefixNoSequenceId()
- throws Exception {
- ColumnStatistics columnStatisticsClass = Mockito.mock(ColumnStatistics.class);
- when(columnStatisticsClass.getMaxValue()).thenReturn(3L);
- when(columnStatisticsClass.getMinValue()).thenReturn(1L);
- NormalizedDateSegmentNameGenerator normalizedDataSegmentNameGenerator =
- new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEQUENCE_ID, TIME_COLUMN_TYPE, TABLE_PUSH_FREQUENCY,
- APPEND_PUSH_TYPE, "prefix", "true", DateTimeFieldSpec.TimeFormat.EPOCH.toString());
- Assert.assertEquals(normalizedDataSegmentNameGenerator.generateSegmentName(columnStatisticsClass),
- "prefix_1970-01-02_1970-01-04");
+ public void testHoursTimeType() {
+ SegmentNameGenerator segmentNameGenerator =
+ new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, null, APPEND_PUSH_TYPE, DAILY_PUSH_FREQUENCY,
+ HOURS_TIME_TYPE, EPOCH_TIME_FORMAT);
+ assertEquals(segmentNameGenerator.toString(),
+ "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable, appendPushType=true, outputSDF=yyyy-MM-dd, inputTimeUnit=HOURS");
+ assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, 24L, 72L),
+ "myTable_1970-01-02_1970-01-04");
+ assertEquals(segmentNameGenerator.generateSegmentName(VALID_SEQUENCE_ID, 24L, 72L),
+ "myTable_1970-01-02_1970-01-04_1");
}
@Test
- public void testMirrorShare()
- throws Exception {
- ColumnStatistics columnStatisticsClass = Mockito.mock(ColumnStatistics.class);
- when(columnStatisticsClass.getMaxValue()).thenReturn(3L);
- when(columnStatisticsClass.getMinValue()).thenReturn(1L);
- NormalizedDateSegmentNameGenerator normalizedDataSegmentNameGenerator =
- new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEQUENCE_ID, TIME_COLUMN_TYPE, TABLE_PUSH_FREQUENCY,
- APPEND_PUSH_TYPE, "mirrorShareEvents_daily", null, DateTimeFieldSpec.TimeFormat.EPOCH.toString());
- Assert.assertEquals(normalizedDataSegmentNameGenerator.generateSegmentName(columnStatisticsClass),
- "mirrorShareEvents_daily_1970-01-02_1970-01-04_1");
+ public void testLongSimpleDateFormat() {
+ SegmentNameGenerator segmentNameGenerator =
+ new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, null, APPEND_PUSH_TYPE, DAILY_PUSH_FREQUENCY,
+ DAYS_TIME_TYPE, LONG_SIMPLE_DATE_FORMAT);
+ assertEquals(segmentNameGenerator.toString(),
+ "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable, appendPushType=true, outputSDF=yyyy-MM-dd, inputSDF=yyyyMMdd");
+ assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, 19700102L, 19700104L),
+ "myTable_1970-01-02_1970-01-04");
+ assertEquals(segmentNameGenerator.generateSegmentName(VALID_SEQUENCE_ID, 19700102L, 19700104L),
+ "myTable_1970-01-02_1970-01-04_1");
}
@Test
- public void testUntrimmedPrefix()
- throws Exception {
- ColumnStatistics columnStatisticsClass = Mockito.mock(ColumnStatistics.class);
- when(columnStatisticsClass.getMaxValue()).thenReturn(3L);
- when(columnStatisticsClass.getMinValue()).thenReturn(1L);
- NormalizedDateSegmentNameGenerator normalizedDataSegmentNameGenerator =
- new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEQUENCE_ID, TIME_COLUMN_TYPE, TABLE_PUSH_FREQUENCY,
- APPEND_PUSH_TYPE, "mirrorShareEvents_daily ", null, DateTimeFieldSpec.TimeFormat.EPOCH.toString());
- Assert.assertEquals(normalizedDataSegmentNameGenerator.generateSegmentName(columnStatisticsClass),
- "mirrorShareEvents_daily_1970-01-02_1970-01-04_1");
+ public void testStringSimpleDateFormat() {
+ SegmentNameGenerator segmentNameGenerator =
+ new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, null, APPEND_PUSH_TYPE, DAILY_PUSH_FREQUENCY,
+ DAYS_TIME_TYPE, STRING_SIMPLE_DATE_FORMAT);
+ assertEquals(segmentNameGenerator.toString(),
+ "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable, appendPushType=true, outputSDF=yyyy-MM-dd, inputSDF=yyyy-MM-dd");
+ assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, "1970-01-02", "1970-01-04"),
+ "myTable_1970-01-02_1970-01-04");
+ assertEquals(segmentNameGenerator.generateSegmentName(VALID_SEQUENCE_ID, "1970-01-02", "1970-01-04"),
+ "myTable_1970-01-02_1970-01-04_1");
}
@Test
- public void testSimpleDateFormat()
- throws Exception {
- ColumnStatistics columnStatisticsClass = Mockito.mock(ColumnStatistics.class);
- when(columnStatisticsClass.getMaxValue()).thenReturn(19700104);
- when(columnStatisticsClass.getMinValue()).thenReturn(19700102);
- NormalizedDateSegmentNameGenerator normalizedDataSegmentNameGenerator =
- new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEQUENCE_ID, TIME_COLUMN_TYPE, TABLE_PUSH_FREQUENCY,
- APPEND_PUSH_TYPE, "mirrorShareEvents_daily ", null, "yyyyMMdd");
- Assert.assertEquals(normalizedDataSegmentNameGenerator.generateSegmentName(columnStatisticsClass),
- "mirrorShareEvents_daily_1970-01-02_1970-01-04_1");
+ public void testHourlyPushFrequency() {
+ SegmentNameGenerator segmentNameGenerator =
+ new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, null, APPEND_PUSH_TYPE, HOURLY_PUSH_FREQUENCY,
+ DAYS_TIME_TYPE, EPOCH_TIME_FORMAT);
+ assertEquals(segmentNameGenerator.toString(),
+ "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable, appendPushType=true, outputSDF=yyyy-MM-dd-HH, inputTimeUnit=DAYS");
+ assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, 1L, 3L),
+ "myTable_1970-01-02-00_1970-01-04-00");
+ assertEquals(segmentNameGenerator.generateSegmentName(VALID_SEQUENCE_ID, 1L, 3L),
+ "myTable_1970-01-02-00_1970-01-04-00_1");
}
}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/name/SimpleSegmentNameGeneratorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/name/SimpleSegmentNameGeneratorTest.java
new file mode 100644
index 0000000..61b0f6a
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/name/SimpleSegmentNameGeneratorTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.name;
+
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class SimpleSegmentNameGeneratorTest {
+ private static final String TABLE_NAME = "testTable";
+ private static final String SEGMENT_NAME_POSTFIX = "postfix";
+ private static final int INVALID_SEQUENCE_ID = -1;
+ private static final int VALID_SEQUENCE_ID = 0;
+ private static final long MIN_TIME_VALUE = 1234L;
+ private static final long MAX_TIME_VALUE = 5678L;
+
+ @Test
+ public void testWithoutSegmentNamePostfix() {
+ SegmentNameGenerator segmentNameGenerator = new SimpleSegmentNameGenerator(TABLE_NAME, null);
+ assertEquals(segmentNameGenerator.toString(), "SimpleSegmentNameGenerator: tableName=testTable");
+ assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, null, null), "testTable");
+ assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, MIN_TIME_VALUE, MAX_TIME_VALUE),
+ "testTable_1234_5678");
+ assertEquals(segmentNameGenerator.generateSegmentName(VALID_SEQUENCE_ID, null, null), "testTable_0");
+ assertEquals(segmentNameGenerator.generateSegmentName(VALID_SEQUENCE_ID, MIN_TIME_VALUE, MAX_TIME_VALUE),
+ "testTable_1234_5678_0");
+ }
+
+ @Test
+ public void testWithSegmentNamePostfix() {
+ SegmentNameGenerator segmentNameGenerator = new SimpleSegmentNameGenerator(TABLE_NAME, SEGMENT_NAME_POSTFIX);
+ assertEquals(segmentNameGenerator.toString(),
+ "SimpleSegmentNameGenerator: tableName=testTable, segmentNamePostfix=postfix");
+ assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, null, null), "testTable_postfix");
+ assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, MIN_TIME_VALUE, MAX_TIME_VALUE),
+ "testTable_1234_5678_postfix");
+ assertEquals(segmentNameGenerator.generateSegmentName(VALID_SEQUENCE_ID, null, null), "testTable_postfix_0");
+ assertEquals(segmentNameGenerator.generateSegmentName(VALID_SEQUENCE_ID, MIN_TIME_VALUE, MAX_TIME_VALUE),
+ "testTable_1234_5678_postfix_0");
+ }
+}
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
index c87378a..1a50c5c 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
@@ -29,14 +29,24 @@ public class JobConfigConstants {
public static final String SEGMENT_TAR_DIR = "segmentTar";
public static final String TAR_GZ_FILE_EXT = ".tar.gz";
- public static final String SCHEMA = "data.schema";
public static final String SEGMENT_TABLE_NAME = "segment.table.name";
+ public static final String TABLE_CONFIG = "table.config";
+ public static final String SCHEMA = "data.schema";
+
+ public static final String SEGMENT_NAME_GENERATOR_TYPE = "segment.name.generator.type";
+ public static final String SIMPLE_SEGMENT_NAME_GENERATOR = "simple";
+ public static final String NORMALIZED_DATE_SEGMENT_NAME_GENERATOR = "normalizedDate";
+ public static final String DEFAULT_SEGMENT_NAME_GENERATOR = SIMPLE_SEGMENT_NAME_GENERATOR;
+
+ // For SimpleSegmentNameGenerator
public static final String SEGMENT_NAME_POSTFIX = "segment.name.postfix";
+ // For NormalizedDateSegmentNameGenerator
+ public static final String SEGMENT_NAME_PREFIX = "segment.name.prefix";
+ public static final String EXCLUDE_SEQUENCE_ID = "exclude.sequence.id";
+
public static final String PUSH_TO_HOSTS = "push.to.hosts";
public static final String PUSH_TO_PORT = "push.to.port";
- public static final String TABLE_CONFIG = "table.config";
-
public static final String DEFAULT_PERMISSIONS_MASK = "fs.permissions.umask-mode";
}
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
index a07ecf5..dcfab1a 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
@@ -32,8 +32,10 @@ import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
import org.apache.pinot.common.config.TableConfig;
import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.data.TimeFieldSpec;
import org.apache.pinot.common.utils.DataSize;
import org.apache.pinot.common.utils.JsonUtils;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
@@ -44,6 +46,9 @@ import org.apache.pinot.core.data.readers.ThriftRecordReaderConfig;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.core.segment.name.NormalizedDateSegmentNameGenerator;
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator;
import org.apache.pinot.hadoop.job.JobConfigConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,10 +62,10 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
protected Configuration _jobConf;
protected String _rawTableName;
protected Schema _schema;
+ protected SegmentNameGenerator _segmentNameGenerator;
// Optional
protected TableConfig _tableConfig;
- protected String _segmentNamePostfix;
protected Path _readerConfigFile;
// HDFS segment tar directory
@@ -90,12 +95,36 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
if (tableConfigString != null) {
_tableConfig = TableConfig.fromJsonString(tableConfigString);
}
- _segmentNamePostfix = _jobConf.get(JobConfigConstants.SEGMENT_NAME_POSTFIX);
String readerConfigFile = _jobConf.get(JobConfigConstants.PATH_TO_READER_CONFIG);
if (readerConfigFile != null) {
_readerConfigFile = new Path(readerConfigFile);
}
+ // Set up segment name generator. The type is read from the job config and defaults to the simple generator.
+ String segmentNameGeneratorType =
+ _jobConf.get(JobConfigConstants.SEGMENT_NAME_GENERATOR_TYPE, JobConfigConstants.DEFAULT_SEGMENT_NAME_GENERATOR);
+ switch (segmentNameGeneratorType) {
+ case JobConfigConstants.SIMPLE_SEGMENT_NAME_GENERATOR:
+ _segmentNameGenerator =
+ new SimpleSegmentNameGenerator(_rawTableName, _jobConf.get(JobConfigConstants.SEGMENT_NAME_POSTFIX));
+ break;
+ case JobConfigConstants.NORMALIZED_DATE_SEGMENT_NAME_GENERATOR:
+ // Push type, push frequency and time type all come from the table config, so it is mandatory here
+ Preconditions.checkState(_tableConfig != null,
+ "In order to use NormalizedDateSegmentNameGenerator, table config must be provided");
+ SegmentsValidationAndRetentionConfig validationConfig = _tableConfig.getValidationConfig();
+ // Time format stays null when the schema has no time field spec
+ String timeFormat = null;
+ TimeFieldSpec timeFieldSpec = _schema.getTimeFieldSpec();
+ if (timeFieldSpec != null) {
+ timeFormat = timeFieldSpec.getOutgoingGranularitySpec().getTimeFormat();
+ }
+ _segmentNameGenerator =
+ new NormalizedDateSegmentNameGenerator(_rawTableName, _jobConf.get(JobConfigConstants.SEGMENT_NAME_PREFIX),
+ _jobConf.get(JobConfigConstants.EXCLUDE_SEQUENCE_ID), validationConfig.getSegmentPushType(),
+ validationConfig.getSegmentPushFrequency(), validationConfig.getTimeType(), timeFormat);
+ // Must not fall through: the default branch below rejects the generator type
+ break;
+ default:
+ throw new UnsupportedOperationException("Unsupported segment name generator type: " + segmentNameGeneratorType);
+ }
+
// Working directories
_hdfsSegmentTarDir = new Path(FileOutputFormat.getWorkOutputPath(context), JobConfigConstants.SEGMENT_TAR_DIR);
_localStagingDir = new File(LOCAL_TEMP_DIR);
@@ -119,8 +148,8 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
_logger.info("*********************************************************************");
_logger.info("Raw Table Name: {}", _rawTableName);
_logger.info("Schema: {}", _schema);
+ _logger.info("Segment Name Generator: {}", _segmentNameGenerator);
_logger.info("Table Config: {}", _tableConfig);
- _logger.info("Segment Name Postfix: {}", _segmentNamePostfix);
_logger.info("Reader Config File: {}", _readerConfigFile);
_logger.info("*********************************************************************");
_logger.info("HDFS Segment Tar Directory: {}", _hdfsSegmentTarDir);
@@ -149,10 +178,8 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
segmentGeneratorConfig.setTableName(_rawTableName);
segmentGeneratorConfig.setInputFilePath(localInputFile.getPath());
segmentGeneratorConfig.setOutDir(_localSegmentDir.getPath());
+ segmentGeneratorConfig.setSegmentNameGenerator(_segmentNameGenerator);
segmentGeneratorConfig.setSequenceId(sequenceId);
- if (_segmentNamePostfix != null) {
- segmentGeneratorConfig.setSegmentNamePostfix(_segmentNamePostfix);
- }
FileFormat fileFormat = getFileFormat(inputFileName);
segmentGeneratorConfig.setFormat(fileFormat);
segmentGeneratorConfig.setReaderConfig(getReaderConfig(fileFormat));
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/ColumnarToStarTreeConverter.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/ColumnarToStarTreeConverter.java
index 0f81130..34094b9 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/ColumnarToStarTreeConverter.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/ColumnarToStarTreeConverter.java
@@ -25,13 +25,10 @@ import org.apache.pinot.common.data.StarTreeIndexSpec;
import org.apache.pinot.common.segment.SegmentMetadata;
import org.apache.pinot.common.utils.TarGzCompressionUtils;
import org.apache.pinot.core.data.readers.FileFormat;
-import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
-import org.apache.pinot.core.segment.name.DefaultSegmentNameGenerator;
-import org.apache.pinot.core.segment.name.SegmentNameGenerator;
import org.kohsuke.args4j.CmdLineParser;
import org.kohsuke.args4j.Option;
@@ -111,14 +108,15 @@ public class ColumnarToStarTreeConverter {
*/
private void convertSegment(File columnarSegment)
throws Exception {
- PinotSegmentRecordReader pinotSegmentRecordReader = new PinotSegmentRecordReader(columnarSegment);
- SegmentGeneratorConfig config = new SegmentGeneratorConfig(pinotSegmentRecordReader.getSchema());
-
+ SegmentMetadata segmentMetadata = new SegmentMetadataImpl(columnarSegment);
+ SegmentGeneratorConfig config = new SegmentGeneratorConfig(segmentMetadata.getSchema());
config.setDataDir(_inputDirName);
config.setInputFilePath(columnarSegment.getAbsolutePath());
config.setFormat(FileFormat.PINOT);
config.setOutDir(_outputDirName);
config.setOverwrite(_overwrite);
+ config.setTableName(segmentMetadata.getTableName());
+ config.setSegmentName(segmentMetadata.getName());
StarTreeIndexSpec starTreeIndexSpec = null;
if (_starTreeConfigFileName != null) {
@@ -126,12 +124,6 @@ public class ColumnarToStarTreeConverter {
}
config.enableStarTreeIndex(starTreeIndexSpec);
- // Read the segment and table name from the segment's metadata.
- SegmentMetadata metadata = new SegmentMetadataImpl(columnarSegment);
- SegmentNameGenerator nameGenerator = new DefaultSegmentNameGenerator(metadata.getName());
- config.setSegmentNameGenerator(nameGenerator);
- config.setTableName(metadata.getTableName());
-
SegmentIndexCreationDriver indexCreator = new SegmentIndexCreationDriverImpl();
indexCreator.init(config);
indexCreator.build();
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/SegmentMergeCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/SegmentMergeCommand.java
index 8c9f3f5..8a86a7a 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/SegmentMergeCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/SegmentMergeCommand.java
@@ -179,7 +179,7 @@ public class SegmentMergeCommand extends AbstractBaseAdminCommand implements Com
// Compute segment name if it is not specified
if (_outputSegmentName == null) {
- _outputSegmentName = getDefaultSegmentName(tableConfig, schema, inputIndexDirs, minStartTime, maxEndTime);
+ _outputSegmentName = getDefaultSegmentName(tableConfig, schema, minStartTime, maxEndTime);
}
LOGGER.info("Output segment name: {}", _outputSegmentName);
@@ -234,23 +234,20 @@ public class SegmentMergeCommand extends AbstractBaseAdminCommand implements Com
return "Create the merged segment using concatenation";
}
- private String getDefaultSegmentName(TableConfig tableConfig, Schema schema, List<File> inputIndexDirs,
- long minStartTime, long maxEndTime)
- throws Exception {
+ private String getDefaultSegmentName(TableConfig tableConfig, Schema schema, long minStartTime, long maxEndTime) {
String tableName = tableConfig.getTableName();
// Fetch time related configurations from schema and table config.
String pushFrequency = tableConfig.getValidationConfig().getSegmentPushFrequency();
- String timeColumnType = tableConfig.getValidationConfig().getTimeType();
+ String timeType = tableConfig.getValidationConfig().getTimeType();
String pushType = tableConfig.getValidationConfig().getSegmentPushType();
String timeFormat = schema.getTimeFieldSpec().getOutgoingGranularitySpec().getTimeFormat();
// Generate the final segment name using segment name generator
NormalizedDateSegmentNameGenerator segmentNameGenerator =
- new NormalizedDateSegmentNameGenerator(tableName, DEFAULT_SEQUENCE_ID, timeColumnType, pushFrequency, pushType,
- null, null, timeFormat);
+ new NormalizedDateSegmentNameGenerator(tableName, null, null, pushType, pushFrequency, timeType, timeFormat);
- return segmentNameGenerator.generateSegmentName(minStartTime, maxEndTime);
+ return segmentNameGenerator.generateSegmentName(DEFAULT_SEQUENCE_ID, minStartTime, maxEndTime);
}
private boolean isPinotSegment(File path) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org