Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/03/08 23:28:29 UTC

[GitHub] [pinot] Jackie-Jiang commented on a change in pull request #8312: Refactor Flink Segment Writer to use File-based Segment writer

Jackie-Jiang commented on a change in pull request #8312:
URL: https://github.com/apache/pinot/pull/8312#discussion_r822160634



##########
File path: pinot-plugins/pinot-segment-writer/pinot-segment-writer-file-based/src/main/java/org/apache/pinot/plugin/segmentwriter/filebased/FileBasedSegmentWriter.java
##########
@@ -189,23 +188,19 @@ public URI flush()
       // Build segment
       SegmentGeneratorConfig segmentGeneratorConfig =
           IngestionUtils.generateSegmentGeneratorConfig(_tableConfig, _schema, batchIngestionConfig);
+      if (getSeqId() != null) {
+        segmentGeneratorConfig.setSequenceId(getSeqId());
+      }
+
       String segmentName = IngestionUtils.buildSegment(segmentGeneratorConfig);
       LOGGER.info("Successfully built segment: {} for table: {}", segmentName, _tableNameWithType);
 
       // Tar segment
-      File segmentTarFile = new File(_outputDirURI, segmentName + Constants.TAR_GZ_FILE_EXT);
-      if (segmentTarFile.exists()) {
-        if (!_batchConfig.isOverwriteOutput()) {
-          throw new IllegalArgumentException(String.format("Duplicate segment name generated '%s' in '%s', please "
-              + "adjust segment name generator config to avoid duplicates, or allow batch config overwrite",
-              segmentName, _outputDirURI));
-        } else {
-          LOGGER.warn(String.format("Duplicate segment name detected '%s' in file '%s', deleting old segment",
-              segmentName, segmentDir));
-          if (segmentTarFile.delete()) {
-            LOGGER.warn(String.format("Segment file deleted: '%s/%s'", _outputDirURI, segmentName));
-          }
-        }
+      File segmentTarFile = getSegmentTarFile(_outputDirURI, segmentName);
+      if (!_batchConfig.isOverwriteOutput() && segmentTarFile.exists()) {

Review comment:
       Any reason for this behavior change?
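
       To make the question concrete, here is a minimal sketch of the two checks, assuming only what the hunk above shows. The class and helper names (`TarOverwriteSketch`, `prepareOldStyle`, `checkNewStyle`, `newTempTar`) are hypothetical and not part of Pinot; the code after the new `if` is not visible in this hunk, so the sketch does not claim what happens to an existing file later in `flush()`.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;

// Illustrative only: these helpers are hypothetical and are not Pinot APIs.
final class TarOverwriteSketch {

  // Mirrors the removed branch: reject a duplicate unless overwrite is
  // enabled; with overwrite enabled, delete the stale tar file up front.
  // Returns true only if an existing file was deleted.
  static boolean prepareOldStyle(File tarFile, boolean overwriteOutput) {
    if (!tarFile.exists()) {
      return false;
    }
    if (!overwriteOutput) {
      throw new IllegalArgumentException("Duplicate segment name: " + tarFile.getName());
    }
    return tarFile.delete();
  }

  // Mirrors only the condition visible in the new hunk. With overwrite
  // enabled, an existing file is left untouched by this check.
  static void checkNewStyle(File tarFile, boolean overwriteOutput) {
    if (!overwriteOutput && tarFile.exists()) {
      throw new IllegalArgumentException("Duplicate segment name: " + tarFile.getName());
    }
  }

  // Creates an empty temp file standing in for an existing segment tar.
  static File newTempTar() {
    try {
      File file = File.createTempFile("segment", ".tar.gz");
      file.deleteOnExit();
      return file;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    File oldCase = newTempTar();
    prepareOldStyle(oldCase, true);
    System.out.println("old style, file still exists: " + oldCase.exists());

    File newCase = newTempTar();
    checkNewStyle(newCase, true);
    System.out.println("new style, file still exists: " + newCase.exists());
  }
}
```

       In this sketch the old-style path removes the existing file before tarring, while the new-style check leaves it in place when overwrite is enabled. Whether that difference matters depends on how the tar step that follows treats an existing file.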

##########
File path: pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/segment/writer/SegmentWriter.java
##########
@@ -65,6 +68,27 @@ default void collect(GenericRow[] rowBatch)
     }
   }
 
+  /**
+   * Sets the staging directory that stores the written segment files.
+   * @param tableConfig
+   */
+  default File setStagingDir(TableConfig tableConfig) {

Review comment:
       Let's move these methods into `FileBasedSegmentWriter`. I don't think they belong in the interface.
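
       A rough sketch of the split being suggested, assuming heavily simplified signatures. `SketchSegmentWriter` and `SketchFileBasedWriter` are hypothetical stand-ins, not the real `SegmentWriter` and `FileBasedSegmentWriter`; rows are plain strings and `flush()` just returns a count so the example stays self-contained.

```java
import java.io.File;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for the SPI interface: it declares only
// operations that every writer can support, with no file-system details.
interface SketchSegmentWriter {
  void collect(String row);

  int flush();
}

// Hypothetical stand-in for the file-based implementation.
final class SketchFileBasedWriter implements SketchSegmentWriter {
  private final File _stagingDir;
  private final List<String> _buffer = new ArrayList<>();

  SketchFileBasedWriter(File stagingDir) {
    _stagingDir = stagingDir;
  }

  @Override
  public void collect(String row) {
    _buffer.add(row);
  }

  // Returns the number of buffered rows and clears the buffer. A real
  // writer would build and tar a segment here.
  @Override
  public int flush() {
    int flushed = _buffer.size();
    _buffer.clear();
    return flushed;
  }

  // File-specific helper: lives on the concrete class, not the interface.
  File getStagingDir() {
    return _stagingDir;
  }

  public static void main(String[] args) {
    SketchFileBasedWriter writer = new SketchFileBasedWriter(new File("staging"));
    writer.collect("row-1");
    writer.collect("row-2");
    System.out.println("flushed rows: " + writer.flush());
    System.out.println("staging dir: " + writer.getStagingDir());
  }
}
```

       With this shape, writers that do not stage files locally are not forced to inherit file-oriented default methods from the interface.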




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org
