You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pinot.apache.org by ja...@apache.org on 2019/02/14 02:26:57 UTC

[incubator-pinot] branch master updated: Refactor SegmentNameGenerators and integrate them into Hadoop (#3821)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d3fad80  Refactor SegmentNameGenerators and integrate them into Hadoop (#3821)
d3fad80 is described below

commit d3fad80d02be7a4a099db2a581d57bb73beaa0ab
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Wed Feb 13 18:26:52 2019 -0800

    Refactor SegmentNameGenerators and integrate them into Hadoop (#3821)
    
    - generateSegmentName() now take 3 arguments: sequenceId, minTimeValue, maxTimeValue
    - 3 segment name generator implementations:
      - FixedSegmentNameGenerator: generate fixed segment name
      - SimpleSegmentNameGenerator: generate segment name without time value conversion
      - NormalizedDateSegmentNameGenerator: generate segment name with normalized date in human readable format
    - Integrate SimpleSegmentNameGenerator and NormalizedDateSegmentNameGenerator into Hadoop job
---
 .../generator/SegmentGeneratorConfig.java          |   8 +-
 .../impl/SegmentIndexCreationDriverImpl.java       |  10 +-
 .../segment/name/DefaultSegmentNameGenerator.java  | 146 ---------------
 ...nerator.java => FixedSegmentNameGenerator.java} |  23 ++-
 .../name/NormalizedDateSegmentNameGenerator.java   | 190 ++++++++------------
 .../core/segment/name/SegmentNameGenerator.java    |  18 +-
 .../segment/name/SimpleSegmentNameGenerator.java   |  59 ++++++
 .../name/DefaultSegmentNameGeneratorTest.java      | 163 -----------------
 .../NormalizedDateSegmentNameGeneratorTest.java    | 198 +++++++++++----------
 .../name/SimpleSegmentNameGeneratorTest.java       |  58 ++++++
 .../pinot/hadoop/job/JobConfigConstants.java       |  16 +-
 .../hadoop/job/mapper/SegmentCreationMapper.java   |  39 +++-
 .../converter/ColumnarToStarTreeConverter.java     |  16 +-
 .../segment/converter/SegmentMergeCommand.java     |  13 +-
 14 files changed, 397 insertions(+), 560 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
index 61cfbac..40024dc 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/indexsegment/generator/SegmentGeneratorConfig.java
@@ -51,8 +51,9 @@ import org.apache.pinot.core.data.readers.CSVRecordReaderConfig;
 import org.apache.pinot.core.data.readers.FileFormat;
 import org.apache.pinot.core.data.readers.RecordReaderConfig;
 import org.apache.pinot.core.io.compression.ChunkCompressorFactory;
-import org.apache.pinot.core.segment.name.DefaultSegmentNameGenerator;
+import org.apache.pinot.core.segment.name.FixedSegmentNameGenerator;
 import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator;
 import org.apache.pinot.core.startree.v2.builder.StarTreeV2BuilderConfig;
 import org.apache.pinot.core.util.AvroUtils;
 import org.apache.pinot.startree.hll.HllConfig;
@@ -544,10 +545,9 @@ public class SegmentGeneratorConfig {
       return _segmentNameGenerator;
     }
     if (_segmentName != null) {
-      return new DefaultSegmentNameGenerator(_segmentName);
+      return new FixedSegmentNameGenerator(_segmentName);
     }
-    return new DefaultSegmentNameGenerator(getTimeColumnName(), getTableName(), getSegmentNamePostfix(),
-        getSequenceId());
+    return new SimpleSegmentNameGenerator(_tableName, _segmentNamePostfix);
   }
 
   public void setSegmentNameGenerator(SegmentNameGenerator segmentNameGenerator) {
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
index c13a686..645165d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/creator/impl/SegmentIndexCreationDriverImpl.java
@@ -358,8 +358,14 @@ public class SegmentIndexCreationDriverImpl implements SegmentIndexCreationDrive
 
   private void handlePostCreation()
       throws Exception {
-    final String timeColumn = config.getTimeColumnName();
-    segmentName = config.getSegmentNameGenerator().generateSegmentName(segmentStats.getColumnProfileFor(timeColumn));
+    ColumnStatistics timeColumnStatistics = segmentStats.getColumnProfileFor(config.getTimeColumnName());
+    int sequenceId = config.getSequenceId();
+    if (timeColumnStatistics != null) {
+      segmentName = config.getSegmentNameGenerator()
+          .generateSegmentName(sequenceId, timeColumnStatistics.getMinValue(), timeColumnStatistics.getMaxValue());
+    } else {
+      segmentName = config.getSegmentNameGenerator().generateSegmentName(sequenceId, null, null);
+    }
 
     try {
       // Write the index files to disk
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/DefaultSegmentNameGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/DefaultSegmentNameGenerator.java
deleted file mode 100644
index ab627a1..0000000
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/DefaultSegmentNameGenerator.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.segment.name;
-
-import org.apache.pinot.common.utils.StringUtil;
-import org.apache.pinot.core.segment.creator.ColumnStatistics;
-
-
-public class DefaultSegmentNameGenerator implements SegmentNameGenerator {
-  private final String _segmentName;
-  private final String _timeColumnName;
-  private final String _tableName;
-  private final String _segmentNamePostfix;
-  private final int _sequenceId;
-
-  /**
-   * To be used when segment name is pre-decided externally
-   * @param segmentName
-   */
-  public DefaultSegmentNameGenerator(final String segmentName) {
-    _segmentName = segmentName;
-    _tableName = null;
-    _timeColumnName = null;
-    _segmentNamePostfix = null;
-    _sequenceId = -1;
-  }
-
-  /**
-   * To be used to derive segmentName
-   * @param timeColumnName
-   * @param tableName
-   * @param segmentNamePostfix
-   */
-  public DefaultSegmentNameGenerator(String timeColumnName, String tableName, String segmentNamePostfix,
-      int sequenceId) {
-    _timeColumnName = timeColumnName;
-    _tableName = tableName;
-    _segmentNamePostfix = segmentNamePostfix;
-    _segmentName = null;
-    _sequenceId = sequenceId;
-  }
-
-  /**
-   *
-   * The sequenceId is used for numbering segments while the postfix is used to append a string to the segment name.
-   *
-   * Some examples:
-   *
-   * If the time column name exists, _segmentNamePostfix = "postfix", and _sequenceId = 1, the segment name would be
-   * tableName_minDate_maxDate_postfix_1
-   *
-   * If there is no time column, _segmentNamePostfix = "postfix" and _sequenceId = 1, the segment name would be
-   * tableName_postfix_1
-   *
-   * If there is no time column, no postfix, and no sequence id, the segment name would be
-   * tableName
-   *
-   * If there is no time column, a postfix, and no sequence id, the segment name would be
-   * tableName_postfix
-   *
-   * If there is no time column, no postfix, and a sequence id, the segment name would be
-   * tableName_sequenceId
-   *
-   * @param statsCollector
-   * @return
-   * @throws Exception
-   */
-  @Override
-  public String generateSegmentName(ColumnStatistics statsCollector)
-      throws Exception {
-    if (_segmentName != null) {
-      return _segmentName;
-    }
-
-    String segmentName;
-
-    if (_timeColumnName != null && _timeColumnName.length() > 0) {
-      final Object minTimeValue = statsCollector.getMinValue();
-      final Object maxTimeValue = statsCollector.getMaxValue();
-      if (_segmentNamePostfix == null) {
-        segmentName = buildBasic(_tableName, minTimeValue, maxTimeValue, _sequenceId);
-      } else {
-        segmentName = buildBasic(_tableName, minTimeValue, maxTimeValue, _sequenceId, _segmentNamePostfix);
-      }
-    } else {
-      if (_segmentNamePostfix == null) {
-        segmentName = buildBasic(_tableName, _sequenceId);
-      } else {
-        segmentName = buildBasic(_tableName, _sequenceId, _segmentNamePostfix);
-      }
-    }
-
-    return segmentName;
-  }
-
-  protected static String buildBasic(String tableName, Object minTimeValue, Object maxTimeValue, int sequenceId,
-      String postfix) {
-    if (sequenceId == -1) {
-      return StringUtil.join("_", tableName, minTimeValue.toString(), maxTimeValue.toString(), postfix);
-    } else {
-      return StringUtil.join("_", tableName, minTimeValue.toString(), maxTimeValue.toString(), postfix,
-          Integer.toString(sequenceId));
-    }
-  }
-
-  protected static String buildBasic(String tableName, Object minTimeValue, Object maxTimeValue, int sequenceId) {
-    if (sequenceId == -1) {
-      return StringUtil.join("_", tableName, minTimeValue.toString(), maxTimeValue.toString());
-    } else {
-      return StringUtil
-          .join("_", tableName, minTimeValue.toString(), maxTimeValue.toString(), Integer.toString(sequenceId));
-    }
-  }
-
-  protected static String buildBasic(String tableName, int sequenceId) {
-    if (sequenceId == -1) {
-      return tableName;
-    } else {
-      return StringUtil.join("_", tableName, Integer.toString(sequenceId));
-    }
-  }
-
-  protected static String buildBasic(String tableName, int sequenceId, String postfix) {
-    if (sequenceId == -1) {
-      return StringUtil.join("_", tableName, postfix);
-    } else {
-      return StringUtil.join("_", tableName, postfix, Integer.toString(sequenceId));
-    }
-  }
-}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SegmentNameGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/FixedSegmentNameGenerator.java
similarity index 58%
copy from pinot-core/src/main/java/org/apache/pinot/core/segment/name/SegmentNameGenerator.java
copy to pinot-core/src/main/java/org/apache/pinot/core/segment/name/FixedSegmentNameGenerator.java
index 00965e3..900a79e 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SegmentNameGenerator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/FixedSegmentNameGenerator.java
@@ -18,13 +18,26 @@
  */
 package org.apache.pinot.core.segment.name;
 
-import org.apache.pinot.core.segment.creator.ColumnStatistics;
+import javax.annotation.Nullable;
 
 
 /**
- * An interface that allows generates names for segments depending on the naming scheme.
+ * Fixed segment name generator which always returns the fixed segment name.
  */
-public interface SegmentNameGenerator {
-  String generateSegmentName(ColumnStatistics timeColStatsCollector)
-      throws Exception;
+public class FixedSegmentNameGenerator implements SegmentNameGenerator {
+  private final String _segmentName;
+
+  public FixedSegmentNameGenerator(String segmentName) {
+    _segmentName = segmentName;
+  }
+
+  @Override
+  public String generateSegmentName(int sequenceId, @Nullable Object minTimeValue, @Nullable Object maxTimeValue) {
+    return _segmentName;
+  }
+
+  @Override
+  public String toString() {
+    return String.format("FixedSegmentNameGenerator: segmentName=%s", _segmentName);
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java
index 55842f5..cc80054 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGenerator.java
@@ -18,147 +18,115 @@
  */
 package org.apache.pinot.core.segment.name;
 
-import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import java.text.ParseException;
 import java.text.SimpleDateFormat;
-import java.util.Arrays;
 import java.util.Date;
 import java.util.TimeZone;
 import java.util.concurrent.TimeUnit;
-import org.apache.pinot.common.data.TimeGranularitySpec;
-import org.apache.pinot.core.segment.creator.ColumnStatistics;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.data.TimeGranularitySpec.TimeFormat;
 
 
 /**
  * Segment name generator that normalizes the date to human readable format.
  */
 public class NormalizedDateSegmentNameGenerator implements SegmentNameGenerator {
-  private static final Logger LOGGER = LoggerFactory.getLogger(NormalizedDateSegmentNameGenerator.class);
-
-  private final String _tableName;
-  private final int _sequenceId;
-  private final String _timeColumnType;
-  private final String _tablePushFrequency;
-  private final String _tablePushType;
   private final String _segmentNamePrefix;
-  private final String _segmentExcludeSequenceId;
-  private final String _schemaTimeFormat;
+  private final boolean _appendPushType;
+  private final boolean _excludeSequenceId;
 
-  public NormalizedDateSegmentNameGenerator(String tableName, int sequenceId, String timeColumnType,
-      String tablePushFrequency, String tablePushType, String segmentNamePrefix, String segmentExcludeSequenceId,
-      String schemaTimeFormat) {
-    _tableName = tableName;
-    _sequenceId = sequenceId;
-    _timeColumnType = timeColumnType;
-    _tablePushFrequency = tablePushFrequency;
-    _tablePushType = tablePushType;
-    _segmentNamePrefix = segmentNamePrefix;
-    _segmentExcludeSequenceId = segmentExcludeSequenceId;
-    _schemaTimeFormat = schemaTimeFormat;
-  }
+  // For APPEND tables
+  private final SimpleDateFormat _outputSDF;
+  // For EPOCH time format
+  private final TimeUnit _inputTimeUnit;
+  // For SIMPLE_DATE_FORMAT time format
+  private final SimpleDateFormat _inputSDF;
 
-  @Override
-  public String generateSegmentName(ColumnStatistics statsCollector)
-      throws Exception {
-    long minTimeValue = Long.parseLong(statsCollector.getMinValue().toString());
-    long maxTimeValue = Long.parseLong(statsCollector.getMaxValue().toString());
-    String segmentName = generateSegmentName(minTimeValue, maxTimeValue);
-    LOGGER.info("Segment name is: " + segmentName + " for table name: " + _tableName);
-    return segmentName;
-  }
+  public NormalizedDateSegmentNameGenerator(String tableName, @Nullable String segmentNamePrefix,
+      @Nullable String excludeSequenceId, @Nullable String pushType, @Nullable String pushFrequency,
+      @Nullable String timeType, @Nullable String timeFormat) {
+    _segmentNamePrefix = segmentNamePrefix != null ? segmentNamePrefix.trim() : tableName;
+    _appendPushType = "APPEND".equalsIgnoreCase(pushType);
+    _excludeSequenceId = Boolean.parseBoolean(excludeSequenceId);
 
-  /**
-   * Generate the segment name given raw min, max time value
-   *
-   * @param minTimeValue min time value from a segment
-   * @param maxTimeValue max time value from a segment
-   * @return segment name with normalized time
-   */
-  public String generateSegmentName(long minTimeValue, long maxTimeValue) {
-    // Use table name for prefix by default unless explicitly set
-    String segmentPrefix = _tableName;
-    if (_segmentNamePrefix != null) {
-      LOGGER.info("Using prefix for table " + _tableName + ", prefix " + _segmentNamePrefix);
-      segmentPrefix = _segmentNamePrefix;
-    } else {
-      LOGGER.info("Using default naming scheme for " + _tableName);
-    }
+    // Include time info for APPEND push type
+    if (_appendPushType) {
+      // For HOURLY push frequency, include hours into output format
+      if ("HOURLY".equalsIgnoreCase(pushFrequency)) {
+        _outputSDF = new SimpleDateFormat("yyyy-MM-dd-HH");
+      } else {
+        _outputSDF = new SimpleDateFormat("yyyy-MM-dd");
+      }
+      _outputSDF.setTimeZone(TimeZone.getTimeZone("UTC"));
 
-    String sequenceId = Integer.toString(_sequenceId);
-    Boolean excludeSequenceId = Boolean.valueOf(_segmentExcludeSequenceId);
-    if (excludeSequenceId) {
-      sequenceId = null;
+      // Parse input time format: 'EPOCH' or 'SIMPLE_DATE_FORMAT:<pattern>'
+      if (Preconditions.checkNotNull(timeFormat).equals(TimeFormat.EPOCH.toString())) {
+        _inputTimeUnit = TimeUnit.valueOf(timeType);
+        _inputSDF = null;
+      } else {
+        Preconditions.checkArgument(timeFormat.startsWith(TimeFormat.SIMPLE_DATE_FORMAT.toString()),
+            "Invalid time format: %s, must be one of '%s' or '%s:<pattern>'", timeFormat, TimeFormat.EPOCH,
+            TimeFormat.SIMPLE_DATE_FORMAT);
+        _inputTimeUnit = null;
+        _inputSDF = new SimpleDateFormat(timeFormat.substring(timeFormat.indexOf(':') + 1));
+        _inputSDF.setTimeZone(TimeZone.getTimeZone("UTC"));
+      }
+    } else {
+      _outputSDF = null;
+      _inputTimeUnit = null;
+      _inputSDF = null;
     }
+  }
 
-    String minTimeValueNormalized = null;
-    String maxTimeValueNormalized = null;
-    // For append use cases, we add dates; otherwise, we don't
-    if (_tablePushType.equalsIgnoreCase("APPEND")) {
-      minTimeValueNormalized = getNormalizedDate(minTimeValue);
-      maxTimeValueNormalized = getNormalizedDate(maxTimeValue);
-      LOGGER.info("Table push type is append. Min time value = " + minTimeValueNormalized + " and max time value = "
-          + maxTimeValueNormalized + " table name: " + _tableName);
+  @Override
+  public String generateSegmentName(int sequenceId, @Nullable Object minTimeValue, @Nullable Object maxTimeValue) {
+    Integer sequenceIdInSegmentName = !_excludeSequenceId && sequenceId >= 0 ? sequenceId : null;
+    if (!_appendPushType) {
+      return JOINER.join(_segmentNamePrefix, sequenceIdInSegmentName);
     } else {
-      LOGGER.info("Table push type is refresh for table: " + _tableName);
+      return JOINER.join(_segmentNamePrefix, getNormalizedDate(Preconditions.checkNotNull(minTimeValue)),
+          getNormalizedDate(Preconditions.checkNotNull(maxTimeValue)), sequenceIdInSegmentName);
     }
-    return Joiner.on("_").skipNulls()
-        .join(Arrays.asList(segmentPrefix.trim(), minTimeValueNormalized, maxTimeValueNormalized, sequenceId));
   }
 
   /**
-   * Convert the raw time value into human readable date format
+   * Converts the time value into human readable date format.
    *
-   * @param timeValue time column value
-   * @return normalized date string
+   * @param timeValue Time value
+   * @return Normalized date string
    */
-  private String getNormalizedDate(long timeValue) {
-    Date sinceEpoch = null;
-    if (_schemaTimeFormat.equals(TimeGranularitySpec.TimeFormat.EPOCH.toString())) {
-      TimeUnit timeUnit = TimeUnit.valueOf(_timeColumnType);
-
-      switch (timeUnit) {
-        case MILLISECONDS:
-          sinceEpoch = new Date(timeValue);
-          break;
-        case SECONDS:
-          sinceEpoch = new Date(TimeUnit.SECONDS.toMillis(timeValue));
-          break;
-        case MINUTES:
-          sinceEpoch = new Date(TimeUnit.MINUTES.toMillis(timeValue));
-          break;
-        case HOURS:
-          sinceEpoch = new Date(TimeUnit.HOURS.toMillis(timeValue));
-          break;
-        case DAYS:
-          sinceEpoch = new Date(TimeUnit.DAYS.toMillis(timeValue));
-          break;
-      }
+  private String getNormalizedDate(Object timeValue) {
+    if (_inputTimeUnit != null) {
+      return _outputSDF.format(new Date(_inputTimeUnit.toMillis(Long.parseLong(timeValue.toString()))));
     } else {
       try {
-        SimpleDateFormat dateFormatPassedIn = new SimpleDateFormat(_schemaTimeFormat);
-        dateFormatPassedIn.setTimeZone(TimeZone.getTimeZone("UTC"));
-        sinceEpoch = dateFormatPassedIn.parse(Long.toString(timeValue));
-      } catch (Exception e) {
-        throw new RuntimeException("Could not parse simple date format: '" + _schemaTimeFormat);
+        return _outputSDF.format(_inputSDF.parse(timeValue.toString()));
+      } catch (ParseException e) {
+        throw new RuntimeException(String
+            .format("Caught exception while parsing simple date format: %s with value: %s", _inputSDF.toPattern(),
+                timeValue), e);
       }
     }
+  }
 
-    // Pick the current time when having a parsing error
-    if (sinceEpoch == null) {
-      LOGGER.warn("Could not translate timeType '" + _timeColumnType);
-      sinceEpoch = new Date();
+  @Override
+  public String toString() {
+    StringBuilder stringBuilder =
+        new StringBuilder("NormalizedDateSegmentNameGenerator: segmentNamePrefix=").append(_segmentNamePrefix)
+            .append(", appendPushType=").append(_appendPushType);
+    if (_excludeSequenceId) {
+      stringBuilder.append(", excludeSequenceId=true");
     }
-
-    // Get date format
-    SimpleDateFormat dateFormat;
-    if (_tablePushFrequency.equalsIgnoreCase("HOURLY")) {
-      dateFormat = new SimpleDateFormat("yyyy-MM-dd-HH");
-    } else {
-      dateFormat = new SimpleDateFormat("yyyy-MM-dd");
+    if (_outputSDF != null) {
+      stringBuilder.append(", outputSDF=").append(_outputSDF.toPattern());
     }
-    dateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
-
-    return dateFormat.format(sinceEpoch);
+    if (_inputTimeUnit != null) {
+      stringBuilder.append(", inputTimeUnit=").append(_inputTimeUnit);
+    }
+    if (_inputSDF != null) {
+      stringBuilder.append(", inputSDF=").append(_inputSDF.toPattern());
+    }
+    return stringBuilder.toString();
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SegmentNameGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SegmentNameGenerator.java
index 00965e3..eed203c 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SegmentNameGenerator.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SegmentNameGenerator.java
@@ -18,13 +18,23 @@
  */
 package org.apache.pinot.core.segment.name;
 
-import org.apache.pinot.core.segment.creator.ColumnStatistics;
+import com.google.common.base.Joiner;
+import javax.annotation.Nullable;
 
 
 /**
- * An interface that allows generates names for segments depending on the naming scheme.
+ * Interface for segment name generator based on the segment sequence id and time range.
  */
 public interface SegmentNameGenerator {
-  String generateSegmentName(ColumnStatistics timeColStatsCollector)
-      throws Exception;
+  Joiner JOINER = Joiner.on('_').skipNulls();
+
+  /**
+   * Generates the segment name.
+   *
+   * @param sequenceId Segment sequence id (negative value means INVALID)
+   * @param minTimeValue Minimum time value
+   * @param maxTimeValue Maximum time value
+   * @return Segment name generated
+   */
+  String generateSegmentName(int sequenceId, @Nullable Object minTimeValue, @Nullable Object maxTimeValue);
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SimpleSegmentNameGenerator.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SimpleSegmentNameGenerator.java
new file mode 100644
index 0000000..9660f8e
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/name/SimpleSegmentNameGenerator.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.name;
+
+import javax.annotation.Nullable;
+
+
+/**
+ * Simple segment name generator which does not perform time conversion.
+ * <p>
+ * The segment name is simply joining the following fields with '_' but ignoring all the {@code null}s.
+ * <ul>
+ *   <li>Table name</li>
+ *   <li>Minimum time value</li>
+ *   <li>Maximum time value</li>
+ *   <li>Segment name postfix</li>
+ *   <li>Sequence id</li>
+ * </ul>
+ */
+public class SimpleSegmentNameGenerator implements SegmentNameGenerator {
+  private final String _tableName;
+  private final String _segmentNamePostfix;
+
+  public SimpleSegmentNameGenerator(String tableName, String segmentNamePostfix) {
+    _tableName = tableName;
+    _segmentNamePostfix = segmentNamePostfix;
+  }
+
+  @Override
+  public String generateSegmentName(int sequenceId, @Nullable Object minTimeValue, @Nullable Object maxTimeValue) {
+    return JOINER
+        .join(_tableName, minTimeValue, maxTimeValue, _segmentNamePostfix, sequenceId >= 0 ? sequenceId : null);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder stringBuilder = new StringBuilder("SimpleSegmentNameGenerator: tableName=").append(_tableName);
+    if (_segmentNamePostfix != null) {
+      stringBuilder.append(", segmentNamePostfix=").append(_segmentNamePostfix);
+    }
+    return stringBuilder.toString();
+  }
+}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/name/DefaultSegmentNameGeneratorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/name/DefaultSegmentNameGeneratorTest.java
deleted file mode 100644
index f94088e..0000000
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/name/DefaultSegmentNameGeneratorTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pinot.core.segment.name;
-
-import java.io.File;
-import java.util.concurrent.TimeUnit;
-import org.apache.commons.io.FileUtils;
-import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
-import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
-import org.apache.pinot.core.segment.creator.impl.SegmentCreationDriverFactory;
-import org.apache.pinot.core.segment.index.ColumnMetadataTest;
-import org.apache.pinot.segments.v1.creator.SegmentTestUtils;
-import org.apache.pinot.util.TestUtils;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-
-
-public class DefaultSegmentNameGeneratorTest {
-
-  private static final String AVRO_DATA = "data/test_data-mv.avro";
-  private static final File INDEX_DIR = new File(ColumnMetadataTest.class.toString());
-
-  @BeforeMethod
-  public void setUp()
-      throws Exception {
-    FileUtils.deleteQuietly(INDEX_DIR);
-  }
-
-  @AfterMethod
-  public void tearDown() {
-    FileUtils.deleteQuietly(INDEX_DIR);
-  }
-
-  public SegmentGeneratorConfig CreateSegmentConfigWithoutCreator()
-      throws Exception {
-    final String filePath =
-        TestUtils.getFileFromResourceUrl(DefaultSegmentNameGeneratorTest.class.getClassLoader().getResource(AVRO_DATA));
-    // Intentionally changed this to TimeUnit.Hours to make it non-default for testing.
-    SegmentGeneratorConfig config = SegmentTestUtils
-        .getSegmentGenSpecWithSchemAndProjectedColumns(new File(filePath), INDEX_DIR, "daysSinceEpoch", TimeUnit.HOURS,
-            "testTable");
-    config.setSegmentNamePostfix("1");
-    config.setTimeColumnName("daysSinceEpoch");
-    return config;
-  }
-
-  @Test
-  public void testPostfix()
-      throws Exception {
-    ColumnMetadataTest columnMetadataTest = new ColumnMetadataTest();
-    // Build the Segment metadata.
-    SegmentGeneratorConfig config = columnMetadataTest.CreateSegmentConfigWithoutCreator();
-    SegmentNameGenerator segmentNameGenerator = new DefaultSegmentNameGenerator("daysSinceEpoch", "mytable", "1", -1);
-    config.setSegmentNameGenerator(segmentNameGenerator);
-    SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
-    driver.init(config);
-    driver.build();
-    Assert.assertEquals(driver.getSegmentName(), "mytable_1756015683_1756015683_1");
-  }
-
-  @Test
-  public void testAlreadyNamedSegment()
-      throws Exception {
-    ColumnMetadataTest columnMetadataTest = new ColumnMetadataTest();
-    // Build the Segment metadata.
-    SegmentGeneratorConfig config = columnMetadataTest.CreateSegmentConfigWithoutCreator();
-    SegmentNameGenerator segmentNameGenerator = new DefaultSegmentNameGenerator("mytable_1");
-    config.setSegmentNameGenerator(segmentNameGenerator);
-    SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
-    driver.init(config);
-    driver.build();
-    Assert.assertEquals(driver.getSegmentName(), "mytable_1");
-  }
-
-  @Test
-  public void testNullTimeColumn()
-      throws Exception {
-    ColumnMetadataTest columnMetadataTest = new ColumnMetadataTest();
-    // Build the Segment metadata.
-    SegmentGeneratorConfig config = columnMetadataTest.CreateSegmentConfigWithoutCreator();
-    config.setTableName("mytable");
-    config.setSegmentNamePostfix("postfix");
-    config.setTimeColumnName(null);
-    SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
-    driver.init(config);
-    driver.build();
-    Assert.assertEquals(driver.getSegmentName(), "mytable_postfix");
-  }
-
-  @Test
-  public void testNullTimeColumnThroughDefaultSegment()
-      throws Exception {
-    ColumnMetadataTest columnMetadataTest = new ColumnMetadataTest();
-    // Build the Segment metadata.
-    SegmentGeneratorConfig config = columnMetadataTest.CreateSegmentConfigWithoutCreator();
-    SegmentNameGenerator segmentNameGenerator = new DefaultSegmentNameGenerator(null, "mytable", "1", 2);
-    config.setSegmentNameGenerator(segmentNameGenerator);
-    SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
-    driver.init(config);
-    driver.build();
-    Assert.assertEquals(driver.getSegmentName(), "mytable_1_2");
-  }
-
-  @Test
-  public void testNullPostfix()
-      throws Exception {
-    ColumnMetadataTest columnMetadataTest = new ColumnMetadataTest();
-    // Build the Segment metadata.
-    SegmentGeneratorConfig config = columnMetadataTest.CreateSegmentConfigWithoutCreator();
-    SegmentNameGenerator segmentNameGenerator = new DefaultSegmentNameGenerator("daysSinceEpoch", "mytable", null, -1);
-    config.setSegmentNameGenerator(segmentNameGenerator);
-    SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
-    driver.init(config);
-    driver.build();
-    Assert.assertEquals(driver.getSegmentName(), "mytable_1756015683_1756015683");
-  }
-
-  @Test
-  public void testNullPostfixWithNonNegSequenceId()
-      throws Exception {
-    ColumnMetadataTest columnMetadataTest = new ColumnMetadataTest();
-    // Build the Segment metadata.
-    SegmentGeneratorConfig config = columnMetadataTest.CreateSegmentConfigWithoutCreator();
-    SegmentNameGenerator segmentNameGenerator = new DefaultSegmentNameGenerator("daysSinceEpoch", "mytable", null, 2);
-    config.setSegmentNameGenerator(segmentNameGenerator);
-    SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
-    driver.init(config);
-    driver.build();
-    Assert.assertEquals(driver.getSegmentName(), "mytable_1756015683_1756015683_2");
-  }
-
-  @Test
-  public void testOnlyTableName()
-      throws Exception {
-    ColumnMetadataTest columnMetadataTest = new ColumnMetadataTest();
-    // Build the Segment metadata.
-    SegmentGeneratorConfig config = columnMetadataTest.CreateSegmentConfigWithoutCreator();
-    SegmentNameGenerator segmentNameGenerator = new DefaultSegmentNameGenerator(null, "mytable", null, -1);
-    config.setSegmentNameGenerator(segmentNameGenerator);
-    SegmentIndexCreationDriver driver = SegmentCreationDriverFactory.get(null);
-    driver.init(config);
-    driver.build();
-    Assert.assertEquals(driver.getSegmentName(), "mytable");
-  }
-}
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGeneratorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGeneratorTest.java
index bd16532..52bfcab 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGeneratorTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/name/NormalizedDateSegmentNameGeneratorTest.java
@@ -18,135 +18,141 @@
  */
 package org.apache.pinot.core.segment.name;
 
-import org.apache.pinot.common.data.DateTimeFieldSpec;
-import org.apache.pinot.core.segment.creator.ColumnStatistics;
-import org.mockito.Mockito;
-import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
 
 
 public class NormalizedDateSegmentNameGeneratorTest {
   private static final String TABLE_NAME = "myTable";
-  private static final int SEQUENCE_ID = 1;
-  private static final String TIME_COLUMN_TYPE = "DAYS";
-  private static final String TABLE_PUSH_FREQUENCY = "daily";
-  private static final String PREFIX = "myTable_daily";
+  private static final String SEGMENT_NAME_PREFIX = "myTable_daily";
   private static final String APPEND_PUSH_TYPE = "APPEND";
   private static final String REFRESH_PUSH_TYPE = "REFRESH";
+  private static final String DAYS_TIME_TYPE = "DAYS";
+  private static final String HOURS_TIME_TYPE = "HOURS";
+  private static final String EPOCH_TIME_FORMAT = "EPOCH";
+  private static final String LONG_SIMPLE_DATE_FORMAT = "SIMPLE_DATE_FORMAT:yyyyMMdd";
+  private static final String STRING_SIMPLE_DATE_FORMAT = "SIMPLE_DATE_FORMAT:yyyy-MM-dd";
+  private static final String DAILY_PUSH_FREQUENCY = "daily";
+  private static final String HOURLY_PUSH_FREQUENCY = "hourly";
+  private static final int INVALID_SEQUENCE_ID = -1;
+  private static final int VALID_SEQUENCE_ID = 1;
 
   @Test
-  public void testAppend()
-      throws Exception {
-    ColumnStatistics columnStatisticsClass = Mockito.mock(ColumnStatistics.class);
-    when(columnStatisticsClass.getMaxValue()).thenReturn(3L);
-    when(columnStatisticsClass.getMinValue()).thenReturn(1L);
-    NormalizedDateSegmentNameGenerator normalizedDataSegmentNameGenerator =
-        new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEQUENCE_ID, TIME_COLUMN_TYPE, TABLE_PUSH_FREQUENCY,
-            APPEND_PUSH_TYPE, null, null, DateTimeFieldSpec.TimeFormat.EPOCH.toString());
-    Assert.assertEquals(normalizedDataSegmentNameGenerator.generateSegmentName(columnStatisticsClass),
-        "myTable_1970-01-02_1970-01-04_1");
+  public void testRefresh() {
+    SegmentNameGenerator segmentNameGenerator =
+        new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, null, REFRESH_PUSH_TYPE, null, null, null);
+    assertEquals(segmentNameGenerator.toString(),
+        "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable, appendPushType=false");
+    assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, null, null), "myTable");
+    assertEquals(segmentNameGenerator.generateSegmentName(VALID_SEQUENCE_ID, null, null), "myTable_1");
+  }
+
+  @Test
+  public void testWithSegmentNamePrefix() {
+    SegmentNameGenerator segmentNameGenerator =
+        new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEGMENT_NAME_PREFIX, null, REFRESH_PUSH_TYPE, null, null,
+            null);
+    assertEquals(segmentNameGenerator.toString(),
+        "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable_daily, appendPushType=false");
+    assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, null, null), "myTable_daily");
+    assertEquals(segmentNameGenerator.generateSegmentName(VALID_SEQUENCE_ID, null, null), "myTable_daily_1");
   }
 
   @Test
-  public void testAppendWithPrefix()
-      throws Exception {
-    ColumnStatistics columnStatisticsClass = Mockito.mock(ColumnStatistics.class);
-    when(columnStatisticsClass.getMaxValue()).thenReturn(3L);
-    when(columnStatisticsClass.getMinValue()).thenReturn(1L);
-    NormalizedDateSegmentNameGenerator normalizedDataSegmentNameGenerator =
-        new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEQUENCE_ID, TIME_COLUMN_TYPE, TABLE_PUSH_FREQUENCY,
-            APPEND_PUSH_TYPE, PREFIX, null, DateTimeFieldSpec.TimeFormat.EPOCH.toString());
-    Assert.assertEquals(normalizedDataSegmentNameGenerator.generateSegmentName(columnStatisticsClass),
-        "myTable_daily_1970-01-02_1970-01-04_1");
+  public void testWithUntrimmedSegmentNamePrefix() {
+    SegmentNameGenerator segmentNameGenerator =
+        new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEGMENT_NAME_PREFIX + "  ", null, REFRESH_PUSH_TYPE, null,
+            null, null);
+    assertEquals(segmentNameGenerator.toString(),
+        "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable_daily, appendPushType=false");
+    assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, null, null), "myTable_daily");
+    assertEquals(segmentNameGenerator.generateSegmentName(VALID_SEQUENCE_ID, null, null), "myTable_daily_1");
   }
 
   @Test
-  public void testRefresh()
-      throws Exception {
-    ColumnStatistics columnStatisticsClass = Mockito.mock(ColumnStatistics.class);
-    when(columnStatisticsClass.getMaxValue()).thenReturn(3L);
-    when(columnStatisticsClass.getMinValue()).thenReturn(1L);
-    NormalizedDateSegmentNameGenerator normalizedDataSegmentNameGenerator =
-        new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEQUENCE_ID, null, TABLE_PUSH_FREQUENCY, REFRESH_PUSH_TYPE,
-            null, null, null);
-    Assert.assertEquals(normalizedDataSegmentNameGenerator.generateSegmentName(columnStatisticsClass), "myTable_1");
+  public void testExcludeSequenceId() {
+    SegmentNameGenerator segmentNameGenerator =
+        new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, "true", REFRESH_PUSH_TYPE, null, null, null);
+    assertEquals(segmentNameGenerator.toString(),
+        "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable, appendPushType=false, excludeSequenceId=true");
+    assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, null, null), "myTable");
+    assertEquals(segmentNameGenerator.generateSegmentName(VALID_SEQUENCE_ID, null, null), "myTable");
   }
 
   @Test
-  public void testNoSequenceIdWithParameter()
-      throws Exception {
-    ColumnStatistics columnStatisticsClass = Mockito.mock(ColumnStatistics.class);
-    when(columnStatisticsClass.getMaxValue()).thenReturn(3L);
-    when(columnStatisticsClass.getMinValue()).thenReturn(1L);
-    NormalizedDateSegmentNameGenerator normalizedDataSegmentNameGenerator =
-        new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEQUENCE_ID, null, TABLE_PUSH_FREQUENCY, REFRESH_PUSH_TYPE,
-            null, "true", null);
-    Assert.assertEquals(normalizedDataSegmentNameGenerator.generateSegmentName(columnStatisticsClass), "myTable");
+  public void testWithPrefixExcludeSequenceId() {
+    SegmentNameGenerator segmentNameGenerator =
+        new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEGMENT_NAME_PREFIX, "true", REFRESH_PUSH_TYPE, null, null,
+            null);
+    assertEquals(segmentNameGenerator.toString(),
+        "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable_daily, appendPushType=false, excludeSequenceId=true");
+    assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, null, null), "myTable_daily");
+    assertEquals(segmentNameGenerator.generateSegmentName(VALID_SEQUENCE_ID, null, null), "myTable_daily");
   }
 
   @Test
-  public void testRefreshWithPrefixNoSequenceId()
-      throws Exception {
-    ColumnStatistics columnStatisticsClass = Mockito.mock(ColumnStatistics.class);
-    when(columnStatisticsClass.getMaxValue()).thenReturn(3L);
-    when(columnStatisticsClass.getMinValue()).thenReturn(1L);
-    NormalizedDateSegmentNameGenerator normalizedDataSegmentNameGenerator =
-        new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEQUENCE_ID, null, TABLE_PUSH_FREQUENCY, REFRESH_PUSH_TYPE,
-            "prefix", "true", null);
-    Assert.assertEquals(normalizedDataSegmentNameGenerator.generateSegmentName(columnStatisticsClass), "prefix");
+  public void testAppend() {
+    SegmentNameGenerator segmentNameGenerator =
+        new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, null, APPEND_PUSH_TYPE, DAILY_PUSH_FREQUENCY,
+            DAYS_TIME_TYPE, EPOCH_TIME_FORMAT);
+    assertEquals(segmentNameGenerator.toString(),
+        "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable, appendPushType=true, outputSDF=yyyy-MM-dd, inputTimeUnit=DAYS");
+    assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, 1L, 3L),
+        "myTable_1970-01-02_1970-01-04");
+    assertEquals(segmentNameGenerator.generateSegmentName(VALID_SEQUENCE_ID, 1L, 3L),
+        "myTable_1970-01-02_1970-01-04_1");
   }
 
   @Test
-  public void testAppendWithPrefixNoSequenceId()
-      throws Exception {
-    ColumnStatistics columnStatisticsClass = Mockito.mock(ColumnStatistics.class);
-    when(columnStatisticsClass.getMaxValue()).thenReturn(3L);
-    when(columnStatisticsClass.getMinValue()).thenReturn(1L);
-    NormalizedDateSegmentNameGenerator normalizedDataSegmentNameGenerator =
-        new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEQUENCE_ID, TIME_COLUMN_TYPE, TABLE_PUSH_FREQUENCY,
-            APPEND_PUSH_TYPE, "prefix", "true", DateTimeFieldSpec.TimeFormat.EPOCH.toString());
-    Assert.assertEquals(normalizedDataSegmentNameGenerator.generateSegmentName(columnStatisticsClass),
-        "prefix_1970-01-02_1970-01-04");
+  public void testHoursTimeType() {
+    SegmentNameGenerator segmentNameGenerator =
+        new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, null, APPEND_PUSH_TYPE, DAILY_PUSH_FREQUENCY,
+            HOURS_TIME_TYPE, EPOCH_TIME_FORMAT);
+    assertEquals(segmentNameGenerator.toString(),
+        "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable, appendPushType=true, outputSDF=yyyy-MM-dd, inputTimeUnit=HOURS");
+    assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, 24L, 72L),
+        "myTable_1970-01-02_1970-01-04");
+    assertEquals(segmentNameGenerator.generateSegmentName(VALID_SEQUENCE_ID, 24L, 72L),
+        "myTable_1970-01-02_1970-01-04_1");
   }
 
   @Test
-  public void testMirrorShare()
-      throws Exception {
-    ColumnStatistics columnStatisticsClass = Mockito.mock(ColumnStatistics.class);
-    when(columnStatisticsClass.getMaxValue()).thenReturn(3L);
-    when(columnStatisticsClass.getMinValue()).thenReturn(1L);
-    NormalizedDateSegmentNameGenerator normalizedDataSegmentNameGenerator =
-        new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEQUENCE_ID, TIME_COLUMN_TYPE, TABLE_PUSH_FREQUENCY,
-            APPEND_PUSH_TYPE, "mirrorShareEvents_daily", null, DateTimeFieldSpec.TimeFormat.EPOCH.toString());
-    Assert.assertEquals(normalizedDataSegmentNameGenerator.generateSegmentName(columnStatisticsClass),
-        "mirrorShareEvents_daily_1970-01-02_1970-01-04_1");
+  public void testLongSimpleDateFormat() {
+    SegmentNameGenerator segmentNameGenerator =
+        new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, null, APPEND_PUSH_TYPE, DAILY_PUSH_FREQUENCY,
+            DAYS_TIME_TYPE, LONG_SIMPLE_DATE_FORMAT);
+    assertEquals(segmentNameGenerator.toString(),
+        "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable, appendPushType=true, outputSDF=yyyy-MM-dd, inputSDF=yyyyMMdd");
+    assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, 19700102L, 19700104L),
+        "myTable_1970-01-02_1970-01-04");
+    assertEquals(segmentNameGenerator.generateSegmentName(VALID_SEQUENCE_ID, 19700102L, 19700104L),
+        "myTable_1970-01-02_1970-01-04_1");
   }
 
   @Test
-  public void testUntrimmedPrefix()
-      throws Exception {
-    ColumnStatistics columnStatisticsClass = Mockito.mock(ColumnStatistics.class);
-    when(columnStatisticsClass.getMaxValue()).thenReturn(3L);
-    when(columnStatisticsClass.getMinValue()).thenReturn(1L);
-    NormalizedDateSegmentNameGenerator normalizedDataSegmentNameGenerator =
-        new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEQUENCE_ID, TIME_COLUMN_TYPE, TABLE_PUSH_FREQUENCY,
-            APPEND_PUSH_TYPE, "mirrorShareEvents_daily  ", null, DateTimeFieldSpec.TimeFormat.EPOCH.toString());
-    Assert.assertEquals(normalizedDataSegmentNameGenerator.generateSegmentName(columnStatisticsClass),
-        "mirrorShareEvents_daily_1970-01-02_1970-01-04_1");
+  public void testStringSimpleDateFormat() {
+    SegmentNameGenerator segmentNameGenerator =
+        new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, null, APPEND_PUSH_TYPE, DAILY_PUSH_FREQUENCY,
+            DAYS_TIME_TYPE, STRING_SIMPLE_DATE_FORMAT);
+    assertEquals(segmentNameGenerator.toString(),
+        "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable, appendPushType=true, outputSDF=yyyy-MM-dd, inputSDF=yyyy-MM-dd");
+    assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, "1970-01-02", "1970-01-04"),
+        "myTable_1970-01-02_1970-01-04");
+    assertEquals(segmentNameGenerator.generateSegmentName(VALID_SEQUENCE_ID, "1970-01-02", "1970-01-04"),
+        "myTable_1970-01-02_1970-01-04_1");
   }
 
   @Test
-  public void testSimpleDateFormat()
-      throws Exception {
-    ColumnStatistics columnStatisticsClass = Mockito.mock(ColumnStatistics.class);
-    when(columnStatisticsClass.getMaxValue()).thenReturn(19700104);
-    when(columnStatisticsClass.getMinValue()).thenReturn(19700102);
-    NormalizedDateSegmentNameGenerator normalizedDataSegmentNameGenerator =
-        new NormalizedDateSegmentNameGenerator(TABLE_NAME, SEQUENCE_ID, TIME_COLUMN_TYPE, TABLE_PUSH_FREQUENCY,
-            APPEND_PUSH_TYPE, "mirrorShareEvents_daily  ", null, "yyyyMMdd");
-    Assert.assertEquals(normalizedDataSegmentNameGenerator.generateSegmentName(columnStatisticsClass),
-        "mirrorShareEvents_daily_1970-01-02_1970-01-04_1");
+  public void testHourlyPushFrequency() {
+    SegmentNameGenerator segmentNameGenerator =
+        new NormalizedDateSegmentNameGenerator(TABLE_NAME, null, null, APPEND_PUSH_TYPE, HOURLY_PUSH_FREQUENCY,
+            DAYS_TIME_TYPE, EPOCH_TIME_FORMAT);
+    assertEquals(segmentNameGenerator.toString(),
+        "NormalizedDateSegmentNameGenerator: segmentNamePrefix=myTable, appendPushType=true, outputSDF=yyyy-MM-dd-HH, inputTimeUnit=DAYS");
+    assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, 1L, 3L),
+        "myTable_1970-01-02-00_1970-01-04-00");
+    assertEquals(segmentNameGenerator.generateSegmentName(VALID_SEQUENCE_ID, 1L, 3L),
+        "myTable_1970-01-02-00_1970-01-04-00_1");
   }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/name/SimpleSegmentNameGeneratorTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/name/SimpleSegmentNameGeneratorTest.java
new file mode 100644
index 0000000..61b0f6a
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/name/SimpleSegmentNameGeneratorTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.segment.name;
+
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.assertEquals;
+
+
+public class SimpleSegmentNameGeneratorTest {
+  private static final String TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME_POSTFIX = "postfix";
+  private static final int INVALID_SEQUENCE_ID = -1;
+  private static final int VALID_SEQUENCE_ID = 0;
+  private static final long MIN_TIME_VALUE = 1234L;
+  private static final long MAX_TIME_VALUE = 5678L;
+
+  @Test
+  public void testWithoutSegmentNamePostfix() {
+    SegmentNameGenerator segmentNameGenerator = new SimpleSegmentNameGenerator(TABLE_NAME, null);
+    assertEquals(segmentNameGenerator.toString(), "SimpleSegmentNameGenerator: tableName=testTable");
+    assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, null, null), "testTable");
+    assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, MIN_TIME_VALUE, MAX_TIME_VALUE),
+        "testTable_1234_5678");
+    assertEquals(segmentNameGenerator.generateSegmentName(VALID_SEQUENCE_ID, null, null), "testTable_0");
+    assertEquals(segmentNameGenerator.generateSegmentName(VALID_SEQUENCE_ID, MIN_TIME_VALUE, MAX_TIME_VALUE),
+        "testTable_1234_5678_0");
+  }
+
+  @Test
+  public void testWithSegmentNamePostfix() {
+    SegmentNameGenerator segmentNameGenerator = new SimpleSegmentNameGenerator(TABLE_NAME, SEGMENT_NAME_POSTFIX);
+    assertEquals(segmentNameGenerator.toString(),
+        "SimpleSegmentNameGenerator: tableName=testTable, segmentNamePostfix=postfix");
+    assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, null, null), "testTable_postfix");
+    assertEquals(segmentNameGenerator.generateSegmentName(INVALID_SEQUENCE_ID, MIN_TIME_VALUE, MAX_TIME_VALUE),
+        "testTable_1234_5678_postfix");
+    assertEquals(segmentNameGenerator.generateSegmentName(VALID_SEQUENCE_ID, null, null), "testTable_postfix_0");
+    assertEquals(segmentNameGenerator.generateSegmentName(VALID_SEQUENCE_ID, MIN_TIME_VALUE, MAX_TIME_VALUE),
+        "testTable_1234_5678_postfix_0");
+  }
+}
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
index c87378a..1a50c5c 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/JobConfigConstants.java
@@ -29,14 +29,24 @@ public class JobConfigConstants {
   public static final String SEGMENT_TAR_DIR = "segmentTar";
   public static final String TAR_GZ_FILE_EXT = ".tar.gz";
 
-  public static final String SCHEMA = "data.schema";
   public static final String SEGMENT_TABLE_NAME = "segment.table.name";
+  public static final String TABLE_CONFIG = "table.config";
+  public static final String SCHEMA = "data.schema";
+
+  public static final String SEGMENT_NAME_GENERATOR_TYPE = "segment.name.generator.type";
+  public static final String SIMPLE_SEGMENT_NAME_GENERATOR = "simple";
+  public static final String NORMALIZED_DATE_SEGMENT_NAME_GENERATOR = "normalizedDate";
+  public static final String DEFAULT_SEGMENT_NAME_GENERATOR = SIMPLE_SEGMENT_NAME_GENERATOR;
+
+  // For SimpleSegmentNameGenerator
   public static final String SEGMENT_NAME_POSTFIX = "segment.name.postfix";
 
+  // For NormalizedDateSegmentNameGenerator
+  public static final String SEGMENT_NAME_PREFIX = "segment.name.prefix";
+  public static final String EXCLUDE_SEQUENCE_ID = "exclude.sequence.id";
+
   public static final String PUSH_TO_HOSTS = "push.to.hosts";
   public static final String PUSH_TO_PORT = "push.to.port";
 
-  public static final String TABLE_CONFIG = "table.config";
-
   public static final String DEFAULT_PERMISSIONS_MASK = "fs.permissions.umask-mode";
 }
diff --git a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
index a07ecf5..dcfab1a 100644
--- a/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
+++ b/pinot-hadoop/src/main/java/org/apache/pinot/hadoop/job/mapper/SegmentCreationMapper.java
@@ -32,8 +32,10 @@ import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.pinot.common.config.SegmentsValidationAndRetentionConfig;
 import org.apache.pinot.common.config.TableConfig;
 import org.apache.pinot.common.data.Schema;
+import org.apache.pinot.common.data.TimeFieldSpec;
 import org.apache.pinot.common.utils.DataSize;
 import org.apache.pinot.common.utils.JsonUtils;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
@@ -44,6 +46,9 @@ import org.apache.pinot.core.data.readers.ThriftRecordReaderConfig;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
 import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.core.segment.name.NormalizedDateSegmentNameGenerator;
+import org.apache.pinot.core.segment.name.SegmentNameGenerator;
+import org.apache.pinot.core.segment.name.SimpleSegmentNameGenerator;
 import org.apache.pinot.hadoop.job.JobConfigConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,10 +62,10 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
   protected Configuration _jobConf;
   protected String _rawTableName;
   protected Schema _schema;
+  protected SegmentNameGenerator _segmentNameGenerator;
 
   // Optional
   protected TableConfig _tableConfig;
-  protected String _segmentNamePostfix;
   protected Path _readerConfigFile;
 
   // HDFS segment tar directory
@@ -90,12 +95,36 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
     if (tableConfigString != null) {
       _tableConfig = TableConfig.fromJsonString(tableConfigString);
     }
-    _segmentNamePostfix = _jobConf.get(JobConfigConstants.SEGMENT_NAME_POSTFIX);
     String readerConfigFile = _jobConf.get(JobConfigConstants.PATH_TO_READER_CONFIG);
     if (readerConfigFile != null) {
       _readerConfigFile = new Path(readerConfigFile);
     }
 
+    // Set up segment name generator
+    String segmentNameGeneratorType =
+        _jobConf.get(JobConfigConstants.SEGMENT_NAME_GENERATOR_TYPE, JobConfigConstants.DEFAULT_SEGMENT_NAME_GENERATOR);
+    switch (segmentNameGeneratorType) {
+      case JobConfigConstants.SIMPLE_SEGMENT_NAME_GENERATOR:
+        _segmentNameGenerator =
+            new SimpleSegmentNameGenerator(_rawTableName, _jobConf.get(JobConfigConstants.SEGMENT_NAME_POSTFIX));
+        break;
+      case JobConfigConstants.NORMALIZED_DATE_SEGMENT_NAME_GENERATOR:
+        Preconditions.checkState(_tableConfig != null,
+            "In order to use NormalizedDateSegmentNameGenerator, table config must be provided");
+        SegmentsValidationAndRetentionConfig validationConfig = _tableConfig.getValidationConfig();
+        String timeFormat = null;
+        TimeFieldSpec timeFieldSpec = _schema.getTimeFieldSpec();
+        if (timeFieldSpec != null) {
+          timeFormat = timeFieldSpec.getOutgoingGranularitySpec().getTimeFormat();
+        }
+        _segmentNameGenerator =
+            new NormalizedDateSegmentNameGenerator(_rawTableName, _jobConf.get(JobConfigConstants.SEGMENT_NAME_PREFIX),
+                _jobConf.get(JobConfigConstants.EXCLUDE_SEQUENCE_ID), validationConfig.getSegmentPushType(),
+                validationConfig.getSegmentPushFrequency(), validationConfig.getTimeType(), timeFormat);
+      default:
+        throw new UnsupportedOperationException("Unsupported segment name generator type: " + segmentNameGeneratorType);
+    }
+
     // Working directories
     _hdfsSegmentTarDir = new Path(FileOutputFormat.getWorkOutputPath(context), JobConfigConstants.SEGMENT_TAR_DIR);
     _localStagingDir = new File(LOCAL_TEMP_DIR);
@@ -119,8 +148,8 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
     _logger.info("*********************************************************************");
     _logger.info("Raw Table Name: {}", _rawTableName);
     _logger.info("Schema: {}", _schema);
+    _logger.info("Segment Name Generator: {}", _segmentNameGenerator);
     _logger.info("Table Config: {}", _tableConfig);
-    _logger.info("Segment Name Postfix: {}", _segmentNamePostfix);
     _logger.info("Reader Config File: {}", _readerConfigFile);
     _logger.info("*********************************************************************");
     _logger.info("HDFS Segment Tar Directory: {}", _hdfsSegmentTarDir);
@@ -149,10 +178,8 @@ public class SegmentCreationMapper extends Mapper<LongWritable, Text, LongWritab
     segmentGeneratorConfig.setTableName(_rawTableName);
     segmentGeneratorConfig.setInputFilePath(localInputFile.getPath());
     segmentGeneratorConfig.setOutDir(_localSegmentDir.getPath());
+    segmentGeneratorConfig.setSegmentNameGenerator(_segmentNameGenerator);
     segmentGeneratorConfig.setSequenceId(sequenceId);
-    if (_segmentNamePostfix != null) {
-      segmentGeneratorConfig.setSegmentNamePostfix(_segmentNamePostfix);
-    }
     FileFormat fileFormat = getFileFormat(inputFileName);
     segmentGeneratorConfig.setFormat(fileFormat);
     segmentGeneratorConfig.setReaderConfig(getReaderConfig(fileFormat));
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/ColumnarToStarTreeConverter.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/ColumnarToStarTreeConverter.java
index 0f81130..34094b9 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/ColumnarToStarTreeConverter.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/ColumnarToStarTreeConverter.java
@@ -25,13 +25,10 @@ import org.apache.pinot.common.data.StarTreeIndexSpec;
 import org.apache.pinot.common.segment.SegmentMetadata;
 import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.core.data.readers.FileFormat;
-import org.apache.pinot.core.data.readers.PinotSegmentRecordReader;
 import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig;
 import org.apache.pinot.core.segment.creator.SegmentIndexCreationDriver;
 import org.apache.pinot.core.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.core.segment.index.SegmentMetadataImpl;
-import org.apache.pinot.core.segment.name.DefaultSegmentNameGenerator;
-import org.apache.pinot.core.segment.name.SegmentNameGenerator;
 import org.kohsuke.args4j.CmdLineParser;
 import org.kohsuke.args4j.Option;
 
@@ -111,14 +108,15 @@ public class ColumnarToStarTreeConverter {
    */
   private void convertSegment(File columnarSegment)
       throws Exception {
-    PinotSegmentRecordReader pinotSegmentRecordReader = new PinotSegmentRecordReader(columnarSegment);
-    SegmentGeneratorConfig config = new SegmentGeneratorConfig(pinotSegmentRecordReader.getSchema());
-
+    SegmentMetadata segmentMetadata = new SegmentMetadataImpl(columnarSegment);
+    SegmentGeneratorConfig config = new SegmentGeneratorConfig(segmentMetadata.getSchema());
     config.setDataDir(_inputDirName);
     config.setInputFilePath(columnarSegment.getAbsolutePath());
     config.setFormat(FileFormat.PINOT);
     config.setOutDir(_outputDirName);
     config.setOverwrite(_overwrite);
+    config.setTableName(segmentMetadata.getTableName());
+    config.setSegmentName(segmentMetadata.getName());
 
     StarTreeIndexSpec starTreeIndexSpec = null;
     if (_starTreeConfigFileName != null) {
@@ -126,12 +124,6 @@ public class ColumnarToStarTreeConverter {
     }
     config.enableStarTreeIndex(starTreeIndexSpec);
 
-    // Read the segment and table name from the segment's metadata.
-    SegmentMetadata metadata = new SegmentMetadataImpl(columnarSegment);
-    SegmentNameGenerator nameGenerator = new DefaultSegmentNameGenerator(metadata.getName());
-    config.setSegmentNameGenerator(nameGenerator);
-    config.setTableName(metadata.getTableName());
-
     SegmentIndexCreationDriver indexCreator = new SegmentIndexCreationDriverImpl();
     indexCreator.init(config);
     indexCreator.build();
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/SegmentMergeCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/SegmentMergeCommand.java
index 8c9f3f5..8a86a7a 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/SegmentMergeCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/segment/converter/SegmentMergeCommand.java
@@ -179,7 +179,7 @@ public class SegmentMergeCommand extends AbstractBaseAdminCommand implements Com
 
       // Compute segment name if it is not specified
       if (_outputSegmentName == null) {
-        _outputSegmentName = getDefaultSegmentName(tableConfig, schema, inputIndexDirs, minStartTime, maxEndTime);
+        _outputSegmentName = getDefaultSegmentName(tableConfig, schema, minStartTime, maxEndTime);
       }
       LOGGER.info("Output segment name: {}", _outputSegmentName);
 
@@ -234,23 +234,20 @@ public class SegmentMergeCommand extends AbstractBaseAdminCommand implements Com
     return "Create the merged segment using concatenation";
   }
 
-  private String getDefaultSegmentName(TableConfig tableConfig, Schema schema, List<File> inputIndexDirs,
-      long minStartTime, long maxEndTime)
-      throws Exception {
+  private String getDefaultSegmentName(TableConfig tableConfig, Schema schema, long minStartTime, long maxEndTime) {
     String tableName = tableConfig.getTableName();
 
     // Fetch time related configurations from schema and table config.
     String pushFrequency = tableConfig.getValidationConfig().getSegmentPushFrequency();
-    String timeColumnType = tableConfig.getValidationConfig().getTimeType();
+    String timeType = tableConfig.getValidationConfig().getTimeType();
     String pushType = tableConfig.getValidationConfig().getSegmentPushType();
     String timeFormat = schema.getTimeFieldSpec().getOutgoingGranularitySpec().getTimeFormat();
 
     // Generate the final segment name using segment name generator
     NormalizedDateSegmentNameGenerator segmentNameGenerator =
-        new NormalizedDateSegmentNameGenerator(tableName, DEFAULT_SEQUENCE_ID, timeColumnType, pushFrequency, pushType,
-            null, null, timeFormat);
+        new NormalizedDateSegmentNameGenerator(tableName, null, null, pushType, pushFrequency, timeType, timeFormat);
 
-    return segmentNameGenerator.generateSegmentName(minStartTime, maxEndTime);
+    return segmentNameGenerator.generateSegmentName(DEFAULT_SEQUENCE_ID, minStartTime, maxEndTime);
   }
 
   private boolean isPinotSegment(File path) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org