Posted to commits@pinot.apache.org by ja...@apache.org on 2021/07/24 01:05:42 UTC

[pinot] branch master updated: Reduce the disk usage for segment conversion task (#7193)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 2f3774b  Reduce the disk usage for segment conversion task (#7193)
2f3774b is described below

commit 2f3774b09a1122cb8d0ef00e9322f7d247f07aa7
Author: Xiaotian (Jackie) Jiang <17...@users.noreply.github.com>
AuthorDate: Fri Jul 23 18:05:26 2021 -0700

    Reduce the disk usage for segment conversion task (#7193)
    
    Reduce the disk usage for the segment conversion task by:
    - Eliminating the file copy
    - Deleting each file once it is no longer needed
---
 .../framework/SegmentProcessorFramework.java       | 123 ++-----
 .../framework/SegmentProcessorFrameworkTest.java   | 386 +++++++--------------
 .../BaseMultipleSegmentsConversionExecutor.java    |  34 +-
 .../tasks/BaseSingleSegmentConversionExecutor.java |  22 +-
 .../merge_rollup/MergeRollupTaskExecutor.java      |  38 +-
 .../RealtimeToOfflineSegmentsTaskExecutor.java     |  37 +-
 .../command/SegmentProcessorFrameworkCommand.java  |  53 ++-
 7 files changed, 285 insertions(+), 408 deletions(-)

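For context, a minimal sketch of how the reworked SegmentProcessorFramework is now driven, mirroring the MergeRollupTaskExecutor changes in the diff below: record readers are opened directly on the untarred segment directories (no copy into a mapper input directory), process() returns the generated segment directories, and the framework deletes its own mapper/reducer output. The mergeSegments helper name and the assumption that the TableConfig and Schema are built elsewhere are illustrative only, not part of this commit.

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.RecordReader;

public class SegmentProcessorFrameworkSketch {

  // Hypothetical helper: tableConfig and schema are assumed to be built elsewhere
  // (e.g. via TableConfigBuilder / Schema builder as in the framework tests).
  static List<File> mergeSegments(List<File> segmentDirs, TableConfig tableConfig, Schema schema, File workingDir)
      throws Exception {
    SegmentProcessorConfig config =
        new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema).build();

    // Read the input segments in place instead of copying them into a mapper input directory
    List<RecordReader> recordReaders = new ArrayList<>(segmentDirs.size());
    for (File segmentDir : segmentDirs) {
      PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader();
      // Do not fill null fields with default values, consistent with other record readers
      recordReader.init(segmentDir, null, null, true);
      recordReaders.add(recordReader);
    }

    try {
      // process() builds the segments under workingDir/segments_output and returns their directories;
      // intermediate mapper/reducer output is deleted by the framework once it is no longer needed
      return new SegmentProcessorFramework(recordReaders, config, workingDir).process();
    } finally {
      for (RecordReader recordReader : recordReaders) {
        recordReader.close();
      }
    }
  }
}

Deleting inputs and intermediate files as soon as they are consumed is what lowers the peak disk usage; the executors in BaseMultipleSegmentsConversionExecutor and BaseSingleSegmentConversionExecutor apply the same pattern to the tarred and untarred segments, as the diff below shows.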
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
index c8df450..241ead6 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
@@ -20,12 +20,12 @@ package org.apache.pinot.core.segment.processing.framework;
 
 import com.google.common.base.Preconditions;
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager;
 import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileReader;
 import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileRecordReader;
@@ -35,10 +35,7 @@ import org.apache.pinot.core.segment.processing.reducer.ReducerFactory;
 import org.apache.pinot.segment.local.recordtransformer.CompositeTransformer;
 import org.apache.pinot.segment.local.segment.creator.RecordReaderSegmentCreationDataSource;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
-import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
 import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
-import org.apache.pinot.spi.config.table.TableConfig;
-import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.data.readers.RecordReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,102 +54,48 @@ import org.slf4j.LoggerFactory;
 public class SegmentProcessorFramework {
   private static final Logger LOGGER = LoggerFactory.getLogger(SegmentProcessorFramework.class);
 
-  private final File _inputSegmentsDir;
-  private final File _outputSegmentsDir;
+  private final List<RecordReader> _recordReaders;
   private final SegmentProcessorConfig _segmentProcessorConfig;
-
-  private final TableConfig _tableConfig;
-  private final Schema _schema;
-
-  private final File _baseDir;
-  private final File _mapperInputDir;
   private final File _mapperOutputDir;
   private final File _reducerOutputDir;
+  private final File _segmentsOutputDir;
 
   /**
-   * Initializes the Segment Processor framework with input segments, output path and processing config
-   * @param inputSegmentsDir directory containing the input segments. These can be tarred or untarred.
-   * @param segmentProcessorConfig config for segment processing
-   * @param outputSegmentsDir directory for placing the resulting segments. This should already exist.
+   * Initializes the SegmentProcessorFramework with record readers, config and working directory.
    */
-  public SegmentProcessorFramework(File inputSegmentsDir, SegmentProcessorConfig segmentProcessorConfig,
-      File outputSegmentsDir) {
+  public SegmentProcessorFramework(List<RecordReader> recordReaders, SegmentProcessorConfig segmentProcessorConfig,
+      File workingDir)
+      throws IOException {
+    Preconditions.checkState(!recordReaders.isEmpty(), "No record reader is provided");
 
-    LOGGER.info(
-        "Initializing SegmentProcessorFramework with input segments dir: {}, output segments dir: {} and segment processor config: {}",
-        inputSegmentsDir.getAbsolutePath(), outputSegmentsDir.getAbsolutePath(), segmentProcessorConfig.toString());
-
-    _inputSegmentsDir = inputSegmentsDir;
-    Preconditions.checkState(_inputSegmentsDir.exists() && _inputSegmentsDir.isDirectory(),
-        "Input path: %s must be a directory with Pinot segments", _inputSegmentsDir.getAbsolutePath());
-    _outputSegmentsDir = outputSegmentsDir;
-    Preconditions.checkState(
-        _outputSegmentsDir.exists() && _outputSegmentsDir.isDirectory() && (_outputSegmentsDir.list().length == 0),
-        "Must provide existing empty output directory: %s", _outputSegmentsDir.getAbsolutePath());
+    LOGGER.info("Initializing SegmentProcessorFramework with {} record readers, config: {}, working dir: {}",
+        recordReaders.size(), segmentProcessorConfig, workingDir.getAbsolutePath());
 
+    _recordReaders = recordReaders;
     _segmentProcessorConfig = segmentProcessorConfig;
-    _tableConfig = segmentProcessorConfig.getTableConfig();
-    _schema = segmentProcessorConfig.getSchema();
 
-    _baseDir = new File(FileUtils.getTempDirectory(), "segment_processor_" + System.currentTimeMillis());
-    FileUtils.deleteQuietly(_baseDir);
-    Preconditions.checkState(_baseDir.mkdirs(), "Failed to create base directory: %s for SegmentProcessor", _baseDir);
-    _mapperInputDir = new File(_baseDir, "mapper_input");
-    Preconditions
-        .checkState(_mapperInputDir.mkdirs(), "Failed to create mapper input directory: %s for SegmentProcessor",
-            _mapperInputDir);
-    _mapperOutputDir = new File(_baseDir, "mapper_output");
-    Preconditions
-        .checkState(_mapperOutputDir.mkdirs(), "Failed to create mapper output directory: %s for SegmentProcessor",
-            _mapperOutputDir);
-    _reducerOutputDir = new File(_baseDir, "reducer_output");
-    Preconditions
-        .checkState(_reducerOutputDir.mkdirs(), "Failed to create reducer output directory: %s for SegmentProcessor",
-            _reducerOutputDir);
+    _mapperOutputDir = new File(workingDir, "mapper_output");
+    FileUtils.forceMkdir(_mapperOutputDir);
+    _reducerOutputDir = new File(workingDir, "reducer_output");
+    FileUtils.forceMkdir(_reducerOutputDir);
+    _segmentsOutputDir = new File(workingDir, "segments_output");
+    FileUtils.forceMkdir(_segmentsOutputDir);
   }
 
   /**
-   * Processes segments from the input directory as per the provided configs, then puts resulting segments into the output directory
+   * Processes records from record readers per the provided config, returns the directories for the generated segments.
    */
-  public void processSegments()
+  public List<File> process()
       throws Exception {
-    // Check for input segments
-    File[] segmentFiles = _inputSegmentsDir.listFiles();
-    Preconditions
-        .checkState(segmentFiles != null && segmentFiles.length > 0, "Failed to find segments under input dir: %s",
-            _inputSegmentsDir.getAbsolutePath());
-
     // Map phase
-    LOGGER.info("Beginning map phase on segments: {}", Arrays.toString(_inputSegmentsDir.list()));
-    List<RecordReader> recordReaders = new ArrayList<>(segmentFiles.length);
-    for (File indexDir : segmentFiles) {
-      String fileName = indexDir.getName();
-
-      // Untar the segments if needed
-      if (!indexDir.isDirectory()) {
-        if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
-          indexDir = TarGzCompressionUtils.untar(indexDir, _mapperInputDir).get(0);
-        } else {
-          throw new IllegalStateException("Unsupported segment format: " + indexDir.getAbsolutePath());
-        }
-      }
-
-      PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader();
-      // NOTE: Do not fill null field with default value to be consistent with other record readers
-      recordReader.init(indexDir, null, null, true);
-      recordReaders.add(recordReader);
-    }
-    SegmentMapper mapper = new SegmentMapper(recordReaders, _segmentProcessorConfig, _mapperOutputDir);
+    LOGGER.info("Beginning map phase on {} record readers", _recordReaders.size());
+    SegmentMapper mapper = new SegmentMapper(_recordReaders, _segmentProcessorConfig, _mapperOutputDir);
     Map<String, GenericRowFileManager> partitionToFileManagerMap = mapper.map();
-    for (RecordReader recordReader : recordReaders) {
-      recordReader.close();
-    }
-    FileUtils.deleteDirectory(_mapperInputDir);
 
     // Check for mapper output files
     if (partitionToFileManagerMap.isEmpty()) {
       LOGGER.info("No partition generated from mapper phase, skipping the reducer phase");
-      return;
+      return Collections.emptyList();
     }
 
     // Reduce phase
@@ -166,8 +109,10 @@ public class SegmentProcessorFramework {
 
     // Segment creation phase
     LOGGER.info("Beginning segment creation phase on partitions: {}", partitionToFileManagerMap.keySet());
-    SegmentGeneratorConfig generatorConfig = new SegmentGeneratorConfig(_tableConfig, _schema);
-    generatorConfig.setOutDir(_outputSegmentsDir.getPath());
+    List<File> outputSegmentDirs = new ArrayList<>();
+    SegmentGeneratorConfig generatorConfig =
+        new SegmentGeneratorConfig(_segmentProcessorConfig.getTableConfig(), _segmentProcessorConfig.getSchema());
+    generatorConfig.setOutDir(_segmentsOutputDir.getPath());
     // TODO: Use NormalizedDateSegmentNameGenerator
     generatorConfig.setSegmentNamePrefix(_segmentProcessorConfig.getSegmentConfig().getSegmentNamePrefix());
     int maxNumRecordsPerSegment = _segmentProcessorConfig.getSegmentConfig().getMaxNumRecordsPerSegment();
@@ -192,18 +137,14 @@ public class SegmentProcessorFramework {
         driver.init(generatorConfig, new RecordReaderSegmentCreationDataSource(recordReaderForRange),
             passThroughTransformer, null);
         driver.build();
+        outputSegmentDirs.add(driver.getOutputDirectory());
       }
       fileManager.cleanUp();
     }
+    FileUtils.deleteDirectory(_mapperOutputDir);
+    FileUtils.deleteDirectory(_reducerOutputDir);
 
-    LOGGER.info("Successfully converted segments from: {} to {}", Arrays.toString(_inputSegmentsDir.list()),
-        Arrays.toString(_outputSegmentsDir.list()));
-  }
-
-  /**
-   * Cleans up the Segment Processor Framework state
-   */
-  public void cleanup() {
-    FileUtils.deleteQuietly(_baseDir);
+    LOGGER.info("Successfully created segments: {}", outputSegmentDirs);
+    return outputSegmentDirs;
   }
 }
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
index eba34f2..d1ab033 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
@@ -20,17 +20,17 @@ package org.apache.pinot.core.segment.processing.framework;
 
 import com.google.common.collect.Lists;
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.stream.IntStream;
 import org.apache.commons.io.FileUtils;
-import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.core.segment.processing.timehandler.TimeHandler;
 import org.apache.pinot.core.segment.processing.timehandler.TimeHandlerConfig;
 import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
 import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
 import org.apache.pinot.segment.local.segment.readers.GenericRowRecordReader;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
 import org.apache.pinot.segment.spi.ColumnMetadata;
 import org.apache.pinot.segment.spi.ImmutableSegment;
 import org.apache.pinot.segment.spi.SegmentMetadata;
@@ -50,20 +50,21 @@ import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
-import static org.testng.Assert.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
 
 
 /**
  * End-to-end tests for SegmentProcessorFramework
  */
 public class SegmentProcessorFrameworkTest {
+  private static final File TEMP_DIR = new File(FileUtils.getTempDirectory(), "SegmentProcessorFrameworkTest");
 
-  private File _baseDir;
-  private File _emptyInputDir;
-  private File _singleSegment;
-  private File _multipleSegments;
-  private File _multiValueSegments;
-  private File _tarredSegments;
+  private List<RecordReader> _singleSegment;
+  private List<RecordReader> _multipleSegments;
+  private List<RecordReader> _multiValueSegments;
 
   private TableConfig _tableConfig;
   private TableConfig _tableConfigNullValueEnabled;
@@ -85,6 +86,9 @@ public class SegmentProcessorFrameworkTest {
   @BeforeClass
   public void setup()
       throws Exception {
+    FileUtils.deleteQuietly(TEMP_DIR);
+    FileUtils.forceMkdir(TEMP_DIR);
+
     _tableConfig = new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("time").build();
     _tableConfigNullValueEnabled =
         new TableConfigBuilder(TableType.OFFLINE).setTableName("myTable").setTimeColumnName("time")
@@ -100,31 +104,14 @@ public class SegmentProcessorFrameworkTest {
             .addMetric("clicks", DataType.INT, 1000)
             .addDateTime("time", DataType.LONG, "1:MILLISECONDS:EPOCH", "1:MILLISECONDS").build();
 
-    _baseDir = new File(FileUtils.getTempDirectory(), "segment_processor_framework_test_" + System.currentTimeMillis());
-    FileUtils.deleteQuietly(_baseDir);
-    assertTrue(_baseDir.mkdirs());
-
     // create segments in many folders
-    _emptyInputDir = new File(_baseDir, "empty_input");
-    assertTrue(_emptyInputDir.mkdirs());
-    _singleSegment = new File(_baseDir, "single_segment");
-    createInputSegments(_singleSegment, _rawData, 1, _schema);
-    _multipleSegments = new File(_baseDir, "multiple_segments");
-    createInputSegments(_multipleSegments, _rawData, 3, _schema);
-    _multiValueSegments = new File(_baseDir, "multi_value_segment");
-    createInputSegments(_multiValueSegments, _rawDataMultiValue, 1, _schemaMV);
-    _tarredSegments = new File(_baseDir, "tarred_segment");
-    createInputSegments(_tarredSegments, _rawData, 3, _schema);
-    File[] segmentDirs = _tarredSegments.listFiles();
-    assertNotNull(segmentDirs);
-    for (File segmentDir : segmentDirs) {
-      TarGzCompressionUtils.createTarGzFile(segmentDir,
-          new File(_tarredSegments, segmentDir.getName() + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION));
-      FileUtils.deleteQuietly(segmentDir);
-    }
+    _singleSegment = createInputSegments(new File(TEMP_DIR, "single_segment"), _rawData, 1, _schema);
+    _multipleSegments = createInputSegments(new File(TEMP_DIR, "multiple_segments"), _rawData, 3, _schema);
+    _multiValueSegments =
+        createInputSegments(new File(TEMP_DIR, "multi_value_segment"), _rawDataMultiValue, 1, _schemaMV);
   }
 
-  private void createInputSegments(File inputDir, List<Object[]> rawData, int numSegments, Schema schema)
+  private List<RecordReader> createInputSegments(File inputDir, List<Object[]> rawData, int numSegments, Schema schema)
       throws Exception {
     assertTrue(inputDir.mkdirs());
 
@@ -145,6 +132,7 @@ public class SegmentProcessorFrameworkTest {
       dataLists.add(dataList);
     }
 
+    List<RecordReader> segmentRecordReaders = new ArrayList<>(dataLists.size());
     int idx = 0;
     for (List<GenericRow> inputRows : dataLists) {
       RecordReader recordReader = new GenericRowRecordReader(inputRows);
@@ -155,11 +143,11 @@ public class SegmentProcessorFrameworkTest {
       SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
       driver.init(segmentGeneratorConfig, recordReader);
       driver.build();
+      PinotSegmentRecordReader segmentRecordReader = new PinotSegmentRecordReader();
+      segmentRecordReader.init(driver.getOutputDirectory(), null, null, true);
+      segmentRecordReaders.add(segmentRecordReader);
     }
-
-    File[] files = inputDir.listFiles();
-    assertNotNull(files);
-    assertEquals(files.length, numSegments);
+    return segmentRecordReaders;
   }
 
   private GenericRow getGenericRow(Object[] rawRow) {
@@ -170,100 +158,26 @@ public class SegmentProcessorFrameworkTest {
     return row;
   }
 
-  @Test
-  public void testBadInputFolders()
-      throws Exception {
-    SegmentProcessorConfig config;
-
-    try {
-      new SegmentProcessorConfig.Builder().setSchema(_schema).build();
-      fail("Should fail for missing tableConfig");
-    } catch (IllegalStateException e) {
-      // expected
-    }
-
-    try {
-      new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).build();
-      fail("Should fail for missing schema");
-    } catch (IllegalStateException e) {
-      // expected
-    }
-
-    config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).build();
-
-    File outputSegmentDir = new File(_baseDir, "output_directory_bad_input_folders");
-    FileUtils.deleteQuietly(outputSegmentDir);
-    assertTrue(outputSegmentDir.mkdirs());
-
-    // non-existent input dir
-    File nonExistent = new File(_baseDir, "non_existent");
-    try {
-      new SegmentProcessorFramework(nonExistent, config, outputSegmentDir);
-      fail("Should fail for non existent input dir");
-    } catch (IllegalStateException e) {
-      // expected
-    }
-
-    // file used as input dir
-    File fileInput = new File(_baseDir, "file.txt");
-    assertTrue(fileInput.createNewFile());
-    try {
-      new SegmentProcessorFramework(fileInput, config, outputSegmentDir);
-      fail("Should fail for file used as input dir");
-    } catch (IllegalStateException e) {
-      // expected
-    }
-
-    // non existent output dir
-    try {
-      new SegmentProcessorFramework(_singleSegment, config, nonExistent);
-      fail("Should fail for non existent output dir");
-    } catch (IllegalStateException e) {
-      // expected
-    }
-
-    // file used as output dir
-    try {
-      new SegmentProcessorFramework(_singleSegment, config, fileInput);
-      fail("Should fail for file used as output dir");
-    } catch (IllegalStateException e) {
-      // expected
-    }
-
-    // output dir not empty
-    try {
-      new SegmentProcessorFramework(fileInput, config, _singleSegment);
-      fail("Should fail for output dir not empty");
-    } catch (IllegalStateException e) {
-      // expected
-    }
-
-    // empty input dir
-    SegmentProcessorFramework framework = new SegmentProcessorFramework(_emptyInputDir, config, outputSegmentDir);
-    try {
-      framework.processSegments();
-      fail("Should fail for empty input");
-    } catch (Exception e) {
-      framework.cleanup();
+  private void rewindRecordReaders(List<RecordReader> recordReaders)
+      throws IOException {
+    for (RecordReader recordReader : recordReaders) {
+      recordReader.rewind();
     }
   }
 
   @Test
   public void testSingleSegment()
       throws Exception {
-    File outputSegmentDir = new File(_baseDir, "single_segment_output");
-    FileUtils.forceMkdir(outputSegmentDir);
+    File workingDir = new File(TEMP_DIR, "single_segment_output");
+    FileUtils.forceMkdir(workingDir);
 
     // Default configs
     SegmentProcessorConfig config =
         new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).build();
-    SegmentProcessorFramework framework = new SegmentProcessorFramework(_singleSegment, config, outputSegmentDir);
-    framework.processSegments();
-    framework.cleanup();
-    File[] files = outputSegmentDir.listFiles();
-    assertNotNull(files);
-    assertEquals(files.length, 1);
-    ImmutableSegment segment = ImmutableSegmentLoader.load(files[0], ReadMode.mmap);
+    SegmentProcessorFramework framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
+    List<File> outputSegments = framework.process();
+    assertEquals(outputSegments.size(), 1);
+    ImmutableSegment segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
     SegmentMetadata segmentMetadata = segment.getSegmentMetadata();
     assertEquals(segmentMetadata.getTotalDocs(), 10);
     ColumnMetadata campaignMetadata = segmentMetadata.getColumnMetadataFor("campaign");
@@ -286,18 +200,16 @@ public class SegmentProcessorFrameworkTest {
     assertNull(timeDataSource.getNullValueVector());
     assertEquals(segmentMetadata.getName(), "myTable_1597719600000_1597892400000_0");
     segment.destroy();
-    FileUtils.cleanDirectory(outputSegmentDir);
+    FileUtils.cleanDirectory(workingDir);
+    rewindRecordReaders(_singleSegment);
 
     // Default configs - null value enabled
     config =
         new SegmentProcessorConfig.Builder().setTableConfig(_tableConfigNullValueEnabled).setSchema(_schema).build();
-    framework = new SegmentProcessorFramework(_singleSegment, config, outputSegmentDir);
-    framework.processSegments();
-    framework.cleanup();
-    files = outputSegmentDir.listFiles();
-    assertNotNull(files);
-    assertEquals(files.length, 1);
-    segment = ImmutableSegmentLoader.load(files[0], ReadMode.mmap);
+    framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
+    outputSegments = framework.process();
+    assertEquals(outputSegments.size(), 1);
+    segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
     segmentMetadata = segment.getSegmentMetadata();
     assertEquals(segmentMetadata.getTotalDocs(), 10);
     campaignMetadata = segmentMetadata.getColumnMetadataFor("campaign");
@@ -326,70 +238,62 @@ public class SegmentProcessorFrameworkTest {
     assertTrue(timeNullValueVector.getNullBitmap().isEmpty());
     assertEquals(segmentMetadata.getName(), "myTable_1597719600000_1597892400000_0");
     segment.destroy();
-    FileUtils.cleanDirectory(outputSegmentDir);
+    FileUtils.cleanDirectory(workingDir);
+    rewindRecordReaders(_singleSegment);
 
     // Time filter
     config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setTimeHandlerConfig(
         new TimeHandlerConfig.Builder(TimeHandler.Type.EPOCH).setTimeRange(1597795200000L, 1597881600000L).build())
         .build();
-    framework = new SegmentProcessorFramework(_singleSegment, config, outputSegmentDir);
-    framework.processSegments();
-    framework.cleanup();
-    files = outputSegmentDir.listFiles();
-    assertNotNull(files);
-    assertEquals(files.length, 1);
-    segmentMetadata = new SegmentMetadataImpl(files[0]);
+    framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
+    outputSegments = framework.process();
+    assertEquals(outputSegments.size(), 1);
+    segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
     assertEquals(segmentMetadata.getTotalDocs(), 5);
     timeMetadata = segmentMetadata.getColumnMetadataFor("time");
     assertEquals(timeMetadata.getCardinality(), 5);
     assertEquals(timeMetadata.getMinValue(), 1597795200000L);
     assertEquals(timeMetadata.getMaxValue(), 1597878000000L);
     assertEquals(segmentMetadata.getName(), "myTable_1597795200000_1597878000000_0");
-    FileUtils.cleanDirectory(outputSegmentDir);
+    FileUtils.cleanDirectory(workingDir);
+    rewindRecordReaders(_singleSegment);
 
     // Time filter - filtered everything
     config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setTimeHandlerConfig(
         new TimeHandlerConfig.Builder(TimeHandler.Type.EPOCH).setTimeRange(1597968000000L, 1598054400000L).build())
         .build();
-    framework = new SegmentProcessorFramework(_singleSegment, config, outputSegmentDir);
-    framework.processSegments();
-    framework.cleanup();
-    files = outputSegmentDir.listFiles();
-    assertNotNull(files);
-    assertEquals(files.length, 0);
-    FileUtils.cleanDirectory(outputSegmentDir);
+    framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
+    outputSegments = framework.process();
+    assertTrue(outputSegments.isEmpty());
+    FileUtils.cleanDirectory(workingDir);
+    rewindRecordReaders(_singleSegment);
 
     // Time round
     config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema)
         .setTimeHandlerConfig(new TimeHandlerConfig.Builder(TimeHandler.Type.EPOCH).setRoundBucketMs(86400000).build())
         .build();
-    framework = new SegmentProcessorFramework(_singleSegment, config, outputSegmentDir);
-    framework.processSegments();
-    framework.cleanup();
-    files = outputSegmentDir.listFiles();
-    assertNotNull(files);
-    assertEquals(files.length, 1);
-    segmentMetadata = new SegmentMetadataImpl(files[0]);
+    framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
+    outputSegments = framework.process();
+    assertEquals(outputSegments.size(), 1);
+    segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
     assertEquals(segmentMetadata.getTotalDocs(), 10);
     timeMetadata = segmentMetadata.getColumnMetadataFor("time");
     assertEquals(timeMetadata.getCardinality(), 3);
     assertEquals(timeMetadata.getMinValue(), 1597708800000L);
     assertEquals(timeMetadata.getMaxValue(), 1597881600000L);
     assertEquals(segmentMetadata.getName(), "myTable_1597708800000_1597881600000_0");
-    FileUtils.cleanDirectory(outputSegmentDir);
+    FileUtils.cleanDirectory(workingDir);
+    rewindRecordReaders(_singleSegment);
 
     // Time partition
     config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setTimeHandlerConfig(
         new TimeHandlerConfig.Builder(TimeHandler.Type.EPOCH).setPartitionBucketMs(86400000).build()).build();
-    framework = new SegmentProcessorFramework(_singleSegment, config, outputSegmentDir);
-    framework.processSegments();
-    framework.cleanup();
-    files = outputSegmentDir.listFiles();
-    assertNotNull(files);
-    assertEquals(files.length, 3);
-    Arrays.sort(files);
+    framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
+    outputSegments = framework.process();
+    assertEquals(outputSegments.size(), 3);
+    outputSegments.sort(null);
     // segment 0
-    segmentMetadata = new SegmentMetadataImpl(files[0]);
+    segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
     assertEquals(segmentMetadata.getTotalDocs(), 3);
     timeMetadata = segmentMetadata.getColumnMetadataFor("time");
     assertEquals(timeMetadata.getCardinality(), 3);
@@ -397,7 +301,7 @@ public class SegmentProcessorFrameworkTest {
     assertEquals(timeMetadata.getMaxValue(), 1597777200000L);
     assertEquals(segmentMetadata.getName(), "myTable_1597719600000_1597777200000_0");
     // segment 1
-    segmentMetadata = new SegmentMetadataImpl(files[1]);
+    segmentMetadata = new SegmentMetadataImpl(outputSegments.get(1));
     assertEquals(segmentMetadata.getTotalDocs(), 5);
     timeMetadata = segmentMetadata.getColumnMetadataFor("time");
     assertEquals(timeMetadata.getCardinality(), 5);
@@ -405,14 +309,15 @@ public class SegmentProcessorFrameworkTest {
     assertEquals(timeMetadata.getMaxValue(), 1597878000000L);
     assertEquals(segmentMetadata.getName(), "myTable_1597795200000_1597878000000_1");
     // segment 2
-    segmentMetadata = new SegmentMetadataImpl(files[2]);
+    segmentMetadata = new SegmentMetadataImpl(outputSegments.get(2));
     assertEquals(segmentMetadata.getTotalDocs(), 2);
     timeMetadata = segmentMetadata.getColumnMetadataFor("time");
     assertEquals(timeMetadata.getCardinality(), 2);
     assertEquals(timeMetadata.getMinValue(), 1597881600000L);
     assertEquals(timeMetadata.getMaxValue(), 1597892400000L);
     assertEquals(segmentMetadata.getName(), "myTable_1597881600000_1597892400000_2");
-    FileUtils.cleanDirectory(outputSegmentDir);
+    FileUtils.cleanDirectory(workingDir);
+    rewindRecordReaders(_singleSegment);
 
     // Time filter, round, partition, rollup - null value enabled
     config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfigNullValueEnabled).setSchema(_schema)
@@ -420,15 +325,12 @@ public class SegmentProcessorFrameworkTest {
             new TimeHandlerConfig.Builder(TimeHandler.Type.EPOCH).setTimeRange(1597708800000L, 1597881600000L)
                 .setRoundBucketMs(86400000).setPartitionBucketMs(86400000).build()).setMergeType(MergeType.ROLLUP)
         .build();
-    framework = new SegmentProcessorFramework(_singleSegment, config, outputSegmentDir);
-    framework.processSegments();
-    framework.cleanup();
-    files = outputSegmentDir.listFiles();
-    assertNotNull(files);
-    assertEquals(files.length, 2);
-    Arrays.sort(files);
+    framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
+    outputSegments = framework.process();
+    assertEquals(outputSegments.size(), 2);
+    outputSegments.sort(null);
     // segment 0
-    segment = ImmutableSegmentLoader.load(files[0], ReadMode.mmap);
+    segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
     segmentMetadata = segment.getSegmentMetadata();
     assertEquals(segmentMetadata.getTotalDocs(), 2);
     campaignMetadata = segmentMetadata.getColumnMetadataFor("campaign");
@@ -458,7 +360,7 @@ public class SegmentProcessorFrameworkTest {
     assertEquals(segmentMetadata.getName(), "myTable_1597708800000_1597708800000_0");
     segment.destroy();
     // segment 1
-    segment = ImmutableSegmentLoader.load(files[1], ReadMode.mmap);
+    segment = ImmutableSegmentLoader.load(outputSegments.get(1), ReadMode.mmap);
     segmentMetadata = segment.getSegmentMetadata();
     assertEquals(segmentMetadata.getTotalDocs(), 3);
     campaignMetadata = segmentMetadata.getColumnMetadataFor("campaign");
@@ -487,105 +389,95 @@ public class SegmentProcessorFrameworkTest {
     assertTrue(timeNullValueVector.getNullBitmap().isEmpty());
     assertEquals(segmentMetadata.getName(), "myTable_1597795200000_1597795200000_1");
     segment.destroy();
-    FileUtils.cleanDirectory(outputSegmentDir);
+    FileUtils.cleanDirectory(workingDir);
+    rewindRecordReaders(_singleSegment);
 
     // Time round, dedup
     config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema)
         .setTimeHandlerConfig(new TimeHandlerConfig.Builder(TimeHandler.Type.EPOCH).setRoundBucketMs(86400000).build())
         .setMergeType(MergeType.DEDUP).build();
-    framework = new SegmentProcessorFramework(_singleSegment, config, outputSegmentDir);
-    framework.processSegments();
-    framework.cleanup();
-    files = outputSegmentDir.listFiles();
-    assertNotNull(files);
-    assertEquals(files.length, 1);
-    segmentMetadata = new SegmentMetadataImpl(files[0]);
+    framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
+    outputSegments = framework.process();
+    assertEquals(outputSegments.size(), 1);
+    segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
     assertEquals(segmentMetadata.getTotalDocs(), 8);
     assertEquals(segmentMetadata.getName(), "myTable_1597708800000_1597881600000_0");
-    FileUtils.cleanDirectory(outputSegmentDir);
+    FileUtils.cleanDirectory(workingDir);
+    rewindRecordReaders(_singleSegment);
 
     // Segment config
     config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setSegmentConfig(
         new SegmentConfig.Builder().setMaxNumRecordsPerSegment(4).setSegmentNamePrefix("myPrefix").build()).build();
-    framework = new SegmentProcessorFramework(_singleSegment, config, outputSegmentDir);
-    framework.processSegments();
-    framework.cleanup();
-    files = outputSegmentDir.listFiles();
-    assertNotNull(files);
-    assertEquals(files.length, 3);
-    Arrays.sort(files);
-    segmentMetadata = new SegmentMetadataImpl(files[0]);
+    framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
+    outputSegments = framework.process();
+    assertEquals(outputSegments.size(), 3);
+    outputSegments.sort(null);
+    segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
     assertEquals(segmentMetadata.getTotalDocs(), 4);
     assertEquals(segmentMetadata.getName(), "myPrefix_1597719600000_1597795200000_0");
-    segmentMetadata = new SegmentMetadataImpl(files[1]);
+    segmentMetadata = new SegmentMetadataImpl(outputSegments.get(1));
     assertEquals(segmentMetadata.getTotalDocs(), 4);
     assertEquals(segmentMetadata.getName(), "myPrefix_1597802400000_1597878000000_1");
-    segmentMetadata = new SegmentMetadataImpl(files[2]);
+    segmentMetadata = new SegmentMetadataImpl(outputSegments.get(2));
     assertEquals(segmentMetadata.getTotalDocs(), 2);
     assertEquals(segmentMetadata.getName(), "myPrefix_1597881600000_1597892400000_2");
-    FileUtils.cleanDirectory(outputSegmentDir);
+    FileUtils.cleanDirectory(workingDir);
+    rewindRecordReaders(_singleSegment);
   }
 
   @Test
   public void testMultipleSegments()
       throws Exception {
-    File outputSegmentDir = new File(_baseDir, "multiple_segments_output");
-    FileUtils.forceMkdir(outputSegmentDir);
+    File workingDir = new File(TEMP_DIR, "multiple_segments_output");
+    FileUtils.forceMkdir(workingDir);
 
     // Default configs
     SegmentProcessorConfig config =
         new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).build();
-    SegmentProcessorFramework framework = new SegmentProcessorFramework(_multipleSegments, config, outputSegmentDir);
-    framework.processSegments();
-    framework.cleanup();
-    File[] files = outputSegmentDir.listFiles();
-    assertNotNull(files);
-    assertEquals(files.length, 1);
-    SegmentMetadata segmentMetadata = new SegmentMetadataImpl(files[0]);
+    SegmentProcessorFramework framework = new SegmentProcessorFramework(_multipleSegments, config, workingDir);
+    List<File> outputSegments = framework.process();
+    assertEquals(outputSegments.size(), 1);
+    SegmentMetadata segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
     assertEquals(segmentMetadata.getTotalDocs(), 10);
     assertEquals(segmentMetadata.getName(), "myTable_1597719600000_1597892400000_0");
-    FileUtils.cleanDirectory(outputSegmentDir);
+    FileUtils.cleanDirectory(workingDir);
+    rewindRecordReaders(_multipleSegments);
 
     // Time round, partition, rollup
     config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).setTimeHandlerConfig(
         new TimeHandlerConfig.Builder(TimeHandler.Type.EPOCH).setRoundBucketMs(86400000).setPartitionBucketMs(86400000)
             .build()).setMergeType(MergeType.ROLLUP).build();
-    framework = new SegmentProcessorFramework(_multipleSegments, config, outputSegmentDir);
-    framework.processSegments();
-    framework.cleanup();
-    files = outputSegmentDir.listFiles();
-    assertNotNull(files);
-    assertEquals(files.length, 3);
-    Arrays.sort(files);
-    segmentMetadata = new SegmentMetadataImpl(files[0]);
+    framework = new SegmentProcessorFramework(_multipleSegments, config, workingDir);
+    outputSegments = framework.process();
+    assertEquals(outputSegments.size(), 3);
+    outputSegments.sort(null);
+    segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
     assertEquals(segmentMetadata.getTotalDocs(), 2);
     assertEquals(segmentMetadata.getName(), "myTable_1597708800000_1597708800000_0");
-    segmentMetadata = new SegmentMetadataImpl(files[1]);
+    segmentMetadata = new SegmentMetadataImpl(outputSegments.get(1));
     assertEquals(segmentMetadata.getTotalDocs(), 3);
     assertEquals(segmentMetadata.getName(), "myTable_1597795200000_1597795200000_1");
-    segmentMetadata = new SegmentMetadataImpl(files[2]);
+    segmentMetadata = new SegmentMetadataImpl(outputSegments.get(2));
     assertEquals(segmentMetadata.getTotalDocs(), 2);
     assertEquals(segmentMetadata.getName(), "myTable_1597881600000_1597881600000_2");
-    FileUtils.cleanDirectory(outputSegmentDir);
+    FileUtils.cleanDirectory(workingDir);
+    rewindRecordReaders(_multipleSegments);
   }
 
   @Test
   public void testMultiValue()
       throws Exception {
-    File outputSegmentDir = new File(_baseDir, "output_directory_multi_value");
-    FileUtils.forceMkdir(outputSegmentDir);
+    File workingDir = new File(TEMP_DIR, "output_directory_multi_value");
+    FileUtils.forceMkdir(workingDir);
 
     // Rollup - null value enabled
     SegmentProcessorConfig config =
         new SegmentProcessorConfig.Builder().setTableConfig(_tableConfigNullValueEnabled).setSchema(_schemaMV)
             .setMergeType(MergeType.ROLLUP).build();
-    SegmentProcessorFramework framework = new SegmentProcessorFramework(_multiValueSegments, config, outputSegmentDir);
-    framework.processSegments();
-    framework.cleanup();
-    File[] files = outputSegmentDir.listFiles();
-    assertNotNull(files);
-    assertEquals(files.length, 1);
-    ImmutableSegment segment = ImmutableSegmentLoader.load(files[0], ReadMode.mmap);
+    SegmentProcessorFramework framework = new SegmentProcessorFramework(_multiValueSegments, config, workingDir);
+    List<File> outputSegments = framework.process();
+    assertEquals(outputSegments.size(), 1);
+    ImmutableSegment segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
     SegmentMetadataImpl segmentMetadata = (SegmentMetadataImpl) segment.getSegmentMetadata();
     assertEquals(segmentMetadata.getTotalDocs(), 2);
     ColumnMetadata campaignMetadata = segmentMetadata.getColumnMetadataFor("campaign");
@@ -614,18 +506,16 @@ public class SegmentProcessorFrameworkTest {
     assertTrue(timeNullValueVector.getNullBitmap().isEmpty());
     assertEquals(segmentMetadata.getName(), "myTable_1597795200000_1597795200000_0");
     segment.destroy();
-    FileUtils.cleanDirectory(outputSegmentDir);
+    FileUtils.cleanDirectory(workingDir);
+    rewindRecordReaders(_multiValueSegments);
 
     // Dedup
     config = new SegmentProcessorConfig.Builder().setTableConfig(_tableConfigNullValueEnabled).setSchema(_schemaMV)
         .setMergeType(MergeType.DEDUP).build();
-    framework = new SegmentProcessorFramework(_multiValueSegments, config, outputSegmentDir);
-    framework.processSegments();
-    framework.cleanup();
-    files = outputSegmentDir.listFiles();
-    assertNotNull(files);
-    assertEquals(files.length, 1);
-    segment = ImmutableSegmentLoader.load(files[0], ReadMode.mmap);
+    framework = new SegmentProcessorFramework(_multiValueSegments, config, workingDir);
+    outputSegments = framework.process();
+    assertEquals(outputSegments.size(), 1);
+    segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
     segmentMetadata = (SegmentMetadataImpl) segment.getSegmentMetadata();
     assertEquals(segmentMetadata.getTotalDocs(), 2);
     campaignMetadata = segmentMetadata.getColumnMetadataFor("campaign");
@@ -642,32 +532,22 @@ public class SegmentProcessorFrameworkTest {
     assertEquals(timeMetadata.getMaxValue(), 1597795200000L);
     assertEquals(segmentMetadata.getName(), "myTable_1597795200000_1597795200000_0");
     segment.destroy();
-    FileUtils.cleanDirectory(outputSegmentDir);
-  }
-
-  @Test
-  public void testTarredSegments()
-      throws Exception {
-    File outputSegmentDir = new File(_baseDir, "output_directory_tarred");
-    FileUtils.forceMkdir(outputSegmentDir);
-
-    // Default configs
-    SegmentProcessorConfig config =
-        new SegmentProcessorConfig.Builder().setTableConfig(_tableConfig).setSchema(_schema).build();
-    SegmentProcessorFramework framework = new SegmentProcessorFramework(_tarredSegments, config, outputSegmentDir);
-    framework.processSegments();
-    framework.cleanup();
-    File[] files = outputSegmentDir.listFiles();
-    assertNotNull(files);
-    assertEquals(files.length, 1);
-    SegmentMetadata segmentMetadata = new SegmentMetadataImpl(files[0]);
-    assertEquals(segmentMetadata.getTotalDocs(), 10);
-    assertEquals(segmentMetadata.getName(), "myTable_1597719600000_1597892400000_0");
-    FileUtils.cleanDirectory(outputSegmentDir);
+    FileUtils.cleanDirectory(workingDir);
+    rewindRecordReaders(_multiValueSegments);
   }
 
   @AfterClass
-  public void tearDown() {
-    FileUtils.deleteQuietly(_baseDir);
+  public void tearDown()
+      throws IOException {
+    for (RecordReader recordReader : _singleSegment) {
+      recordReader.close();
+    }
+    for (RecordReader recordReader : _multipleSegments) {
+      recordReader.close();
+    }
+    for (RecordReader recordReader : _multiValueSegments) {
+      recordReader.close();
+    }
+    FileUtils.deleteQuietly(TEMP_DIR);
   }
 }
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
index 4382bd7..2105623 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
@@ -61,12 +61,12 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
    * Converts the segment based on the given {@link PinotTaskConfig}.
    *
    * @param pinotTaskConfig Task config
-   * @param originalIndexDir Index directory for the original segment
+   * @param segmentDirs Index directories for the original segments
    * @param workingDir Working directory for the converted segment
    * @return a list of segment conversion result
    * @throws Exception
    */
-  protected abstract List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> originalIndexDir,
+  protected abstract List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> segmentDirs,
       File workingDir)
       throws Exception;
 
@@ -106,7 +106,7 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
     String crypterName = getTableConfig(tableNameWithType).getValidationConfig().getCrypterClassName();
 
     try {
-      List<File> inputSegmentFiles = new ArrayList<>();
+      List<File> inputSegmentDirs = new ArrayList<>();
       for (int i = 0; i < downloadURLs.length; i++) {
         // Download the segment file
         File tarredSegmentFile = new File(tempDataDir, "tarredSegmentFile_" + i);
@@ -116,13 +116,23 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
         // Un-tar the segment file
         File segmentDir = new File(tempDataDir, "segmentDir_" + i);
         File indexDir = TarGzCompressionUtils.untar(tarredSegmentFile, segmentDir).get(0);
-        inputSegmentFiles.add(indexDir);
+        inputSegmentDirs.add(indexDir);
+        if (!FileUtils.deleteQuietly(tarredSegmentFile)) {
+          LOGGER.warn("Failed to delete tarred input segment: {}", tarredSegmentFile.getAbsolutePath());
+        }
       }
 
       // Convert the segments
       File workingDir = new File(tempDataDir, "workingDir");
       Preconditions.checkState(workingDir.mkdir());
-      List<SegmentConversionResult> segmentConversionResults = convert(pinotTaskConfig, inputSegmentFiles, workingDir);
+      List<SegmentConversionResult> segmentConversionResults = convert(pinotTaskConfig, inputSegmentDirs, workingDir);
+
+      // Delete the input segments
+      for (File inputSegmentDir : inputSegmentDirs) {
+        if (!FileUtils.deleteQuietly(inputSegmentDir)) {
+          LOGGER.warn("Failed to delete input segment: {}", inputSegmentDir.getAbsolutePath());
+        }
+      }
 
       // Create a directory for converted tarred segment files
       File convertedTarredSegmentDir = new File(tempDataDir, "convertedTarredSegmentDir");
@@ -132,11 +142,14 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
       List<File> tarredSegmentFiles = new ArrayList<>(numOutputSegments);
       for (SegmentConversionResult segmentConversionResult : segmentConversionResults) {
         // Tar the converted segment
-        File convertedIndexDir = segmentConversionResult.getFile();
+        File convertedSegmentDir = segmentConversionResult.getFile();
         File convertedSegmentTarFile = new File(convertedTarredSegmentDir,
             segmentConversionResult.getSegmentName() + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
-        TarGzCompressionUtils.createTarGzFile(convertedIndexDir, convertedSegmentTarFile);
+        TarGzCompressionUtils.createTarGzFile(convertedSegmentDir, convertedSegmentTarFile);
         tarredSegmentFiles.add(convertedSegmentTarFile);
+        if (!FileUtils.deleteQuietly(convertedSegmentDir)) {
+          LOGGER.warn("Failed to delete converted segment: {}", convertedSegmentDir.getAbsolutePath());
+        }
       }
 
       // Check whether the task get cancelled before uploading the segment
@@ -153,8 +166,8 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
             Arrays.stream(inputSegmentNames.split(",")).map(String::trim).collect(Collectors.toList());
         List<String> segmentsTo =
             segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName).collect(Collectors.toList());
-        lineageEntryId = SegmentConversionUtils
-            .startSegmentReplace(tableNameWithType, uploadURL, new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
+        lineageEntryId = SegmentConversionUtils.startSegmentReplace(tableNameWithType, uploadURL,
+            new StartReplaceSegmentsRequest(segmentsFrom, segmentsTo));
       }
 
       // Upload the tarred segments
@@ -184,6 +197,9 @@ public abstract class BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
         SegmentConversionUtils
             .uploadSegment(configs, FileUploadDownloadClient.makeAuthHeader(authToken), parameters, tableNameWithType,
                 resultSegmentName, uploadURL, convertedTarredSegmentFile);
+        if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) {
+          LOGGER.warn("Failed to delete tarred converted segment: {}", convertedTarredSegmentFile.getAbsolutePath());
+        }
       }
 
       // Update the segment lineage to indicate that the segment replacement is done.
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
index eae3e6e..381ed58 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseSingleSegmentConversionExecutor.java
@@ -94,6 +94,9 @@ public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecut
       // Un-tar the segment file
       File segmentDir = new File(tempDataDir, "segmentDir");
       File indexDir = TarGzCompressionUtils.untar(tarredSegmentFile, segmentDir).get(0);
+      if (!FileUtils.deleteQuietly(tarredSegmentFile)) {
+        LOGGER.warn("Failed to delete tarred input segment: {}", tarredSegmentFile.getAbsolutePath());
+      }
 
       // Convert the segment
       File workingDir = new File(tempDataDir, "workingDir");
@@ -103,9 +106,19 @@ public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecut
           "Converted segment name: %s does not match original segment name: %s",
           segmentConversionResult.getSegmentName(), segmentName);
 
+      // Delete the input segment
+      if (!FileUtils.deleteQuietly(indexDir)) {
+        LOGGER.warn("Failed to delete input segment: {}", indexDir.getAbsolutePath());
+      }
+
       // Tar the converted segment
-      File convertedSegmentTarFile = new File(tempDataDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
-      TarGzCompressionUtils.createTarGzFile(segmentConversionResult.getFile(), convertedSegmentTarFile);
+      File convertedSegmentDir = segmentConversionResult.getFile();
+      File convertedTarredSegmentFile =
+          new File(tempDataDir, segmentName + TarGzCompressionUtils.TAR_GZ_FILE_EXTENSION);
+      TarGzCompressionUtils.createTarGzFile(convertedSegmentDir, convertedTarredSegmentFile);
+      if (!FileUtils.deleteQuietly(convertedSegmentDir)) {
+        LOGGER.warn("Failed to delete converted segment: {}", convertedSegmentDir.getAbsolutePath());
+      }
 
       // Check whether the task get cancelled before uploading the segment
       if (_cancelled) {
@@ -141,7 +154,10 @@ public abstract class BaseSingleSegmentConversionExecutor extends BaseTaskExecut
 
       // Upload the tarred segment
       SegmentConversionUtils.uploadSegment(configs, httpHeaders, parameters, tableNameWithType, segmentName, uploadURL,
-          convertedSegmentTarFile);
+          convertedTarredSegmentFile);
+      if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) {
+        LOGGER.warn("Failed to delete tarred converted segment: {}", convertedTarredSegmentFile.getAbsolutePath());
+      }
 
       LOGGER.info("Done executing {} on table: {}, segment: {}", taskType, tableNameWithType, segmentName);
       return segmentConversionResult;
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
index c1e7587..86d6af4 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/merge_rollup/MergeRollupTaskExecutor.java
@@ -18,13 +18,11 @@
  */
 package org.apache.pinot.plugin.minion.tasks.merge_rollup;
 
-import com.google.common.base.Preconditions;
 import java.io.File;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
-import org.apache.commons.io.FileUtils;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
 import org.apache.pinot.core.common.MinionConstants;
 import org.apache.pinot.core.common.MinionConstants.MergeRollupTask;
@@ -34,8 +32,10 @@ import org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramew
 import org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor;
 import org.apache.pinot.plugin.minion.tasks.MergeTaskUtils;
 import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.RecordReader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,11 +45,9 @@ import org.slf4j.LoggerFactory;
  */
 public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
   private static final Logger LOGGER = LoggerFactory.getLogger(MergeRollupTaskExecutor.class);
-  private static final String INPUT_SEGMENTS_DIR = "input_segments";
-  private static final String OUTPUT_SEGMENTS_DIR = "output_segments";
 
   @Override
-  protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> originalIndexDirs,
+  protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> segmentDirs,
       File workingDir)
       throws Exception {
     String taskType = pinotTaskConfig.getTaskType();
@@ -83,30 +81,28 @@ public class MergeRollupTaskExecutor extends BaseMultipleSegmentsConversionExecu
 
     SegmentProcessorConfig segmentProcessorConfig = segmentProcessorConfigBuilder.build();
 
-    File inputSegmentsDir = new File(workingDir, INPUT_SEGMENTS_DIR);
-    Preconditions.checkState(inputSegmentsDir.mkdirs(), "Failed to create input directory: %s for task: %s",
-        inputSegmentsDir.getAbsolutePath(), taskType);
-    for (File indexDir : originalIndexDirs) {
-      FileUtils.copyDirectoryToDirectory(indexDir, inputSegmentsDir);
+    List<RecordReader> recordReaders = new ArrayList<>(segmentDirs.size());
+    for (File segmentDir : segmentDirs) {
+      PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader();
+      // NOTE: Do not fill null field with default value to be consistent with other record readers
+      recordReader.init(segmentDir, null, null, true);
+      recordReaders.add(recordReader);
     }
-    File outputSegmentsDir = new File(workingDir, OUTPUT_SEGMENTS_DIR);
-    Preconditions.checkState(outputSegmentsDir.mkdirs(), "Failed to create output directory: %s for task: %s",
-        outputSegmentsDir.getAbsolutePath(), taskType);
-
-    SegmentProcessorFramework segmentProcessorFramework =
-        new SegmentProcessorFramework(inputSegmentsDir, segmentProcessorConfig, outputSegmentsDir);
+    List<File> outputSegmentDirs;
     try {
-      segmentProcessorFramework.processSegments();
+      outputSegmentDirs = new SegmentProcessorFramework(recordReaders, segmentProcessorConfig, workingDir).process();
     } finally {
-      segmentProcessorFramework.cleanup();
+      for (RecordReader recordReader : recordReaders) {
+        recordReader.close();
+      }
     }
 
     long endMillis = System.currentTimeMillis();
     LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms", taskType, configs, (endMillis - startMillis));
     List<SegmentConversionResult> results = new ArrayList<>();
-    for (File file : outputSegmentsDir.listFiles()) {
-      String outputSegmentName = file.getName();
-      results.add(new SegmentConversionResult.Builder().setFile(file).setSegmentName(outputSegmentName)
+    for (File outputSegmentDir : outputSegmentDirs) {
+      String outputSegmentName = outputSegmentDir.getName();
+      results.add(new SegmentConversionResult.Builder().setFile(outputSegmentDir).setSegmentName(outputSegmentName)
           .setTableNameWithType(tableNameWithType).build());
     }
     return results;
diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtime_to_offline_segments/RealtimeToOfflineSegmentsTaskExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtime_to_offline_segments/RealtimeToOfflineSegmentsTaskExecutor.java
index 2a24937..aefd917 100644
--- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtime_to_offline_segments/RealtimeToOfflineSegmentsTaskExecutor.java
+++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/realtime_to_offline_segments/RealtimeToOfflineSegmentsTaskExecutor.java
@@ -24,7 +24,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
-import org.apache.commons.io.FileUtils;
 import org.apache.helix.ZNRecord;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
 import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
@@ -38,8 +37,10 @@ import org.apache.pinot.minion.executor.MinionTaskZkMetadataManager;
 import org.apache.pinot.plugin.minion.tasks.BaseMultipleSegmentsConversionExecutor;
 import org.apache.pinot.plugin.minion.tasks.MergeTaskUtils;
 import org.apache.pinot.plugin.minion.tasks.SegmentConversionResult;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,8 +68,6 @@ import org.slf4j.LoggerFactory;
  */
 public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsConversionExecutor {
   private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskExecutor.class);
-  private static final String INPUT_SEGMENTS_DIR = "input_segments";
-  private static final String OUTPUT_SEGMENTS_DIR = "output_segments";
 
   private final MinionTaskZkMetadataManager _minionTaskZkMetadataManager;
   private int _expectedVersion = Integer.MIN_VALUE;
@@ -105,7 +104,7 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsC
   }
 
   @Override
-  protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> originalIndexDirs,
+  protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig, List<File> segmentDirs,
       File workingDir)
       throws Exception {
     String taskType = pinotTaskConfig.getTaskType();
@@ -149,30 +148,28 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsC
 
     SegmentProcessorConfig segmentProcessorConfig = segmentProcessorConfigBuilder.build();
 
-    File inputSegmentsDir = new File(workingDir, INPUT_SEGMENTS_DIR);
-    Preconditions.checkState(inputSegmentsDir.mkdirs(), "Failed to create input directory: %s for task: %s",
-        inputSegmentsDir.getAbsolutePath(), taskType);
-    for (File indexDir : originalIndexDirs) {
-      FileUtils.copyDirectoryToDirectory(indexDir, inputSegmentsDir);
+    List<RecordReader> recordReaders = new ArrayList<>(segmentDirs.size());
+    for (File segmentDir : segmentDirs) {
+      PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader();
+      // NOTE: Do not fill null field with default value to be consistent with other record readers
+      recordReader.init(segmentDir, null, null, true);
+      recordReaders.add(recordReader);
     }
-    File outputSegmentsDir = new File(workingDir, OUTPUT_SEGMENTS_DIR);
-    Preconditions.checkState(outputSegmentsDir.mkdirs(), "Failed to create output directory: %s for task: %s",
-        outputSegmentsDir.getAbsolutePath(), taskType);
-
-    SegmentProcessorFramework segmentProcessorFramework =
-        new SegmentProcessorFramework(inputSegmentsDir, segmentProcessorConfig, outputSegmentsDir);
+    List<File> outputSegmentDirs;
     try {
-      segmentProcessorFramework.processSegments();
+      outputSegmentDirs = new SegmentProcessorFramework(recordReaders, segmentProcessorConfig, workingDir).process();
     } finally {
-      segmentProcessorFramework.cleanup();
+      for (RecordReader recordReader : recordReaders) {
+        recordReader.close();
+      }
     }
 
     long endMillis = System.currentTimeMillis();
     LOGGER.info("Finished task: {} with configs: {}. Total time: {}ms", taskType, configs, (endMillis - startMillis));
     List<SegmentConversionResult> results = new ArrayList<>();
-    for (File file : outputSegmentsDir.listFiles()) {
-      String outputSegmentName = file.getName();
-      results.add(new SegmentConversionResult.Builder().setFile(file).setSegmentName(outputSegmentName)
+    for (File outputSegmentDir : outputSegmentDirs) {
+      String outputSegmentName = outputSegmentDir.getName();
+      results.add(new SegmentConversionResult.Builder().setFile(outputSegmentDir).setSegmentName(outputSegmentName)
           .setTableNameWithType(offlineTableName).build());
     }
     return results;
diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/SegmentProcessorFrameworkCommand.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/SegmentProcessorFrameworkCommand.java
index 75af4a1..81ef424 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/SegmentProcessorFrameworkCommand.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/SegmentProcessorFrameworkCommand.java
@@ -18,11 +18,19 @@
  */
 package org.apache.pinot.tools.admin.command;
 
+import com.google.common.base.Preconditions;
 import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.utils.TarGzCompressionUtils;
 import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
 import org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework;
+import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
 import org.apache.pinot.spi.config.table.TableConfig;
 import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.data.readers.RecordReader;
 import org.apache.pinot.spi.plugin.PluginManager;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.tools.Command;
@@ -72,25 +80,42 @@ public class SegmentProcessorFrameworkCommand extends AbstractBaseAdminCommand i
   @Override
   public boolean execute()
       throws Exception {
+    PluginManager.get().init();
 
     SegmentProcessorFrameworkSpec segmentProcessorFrameworkSpec =
         JsonUtils.fileToObject(new File(_segmentProcessorFrameworkSpec), SegmentProcessorFrameworkSpec.class);
 
     File inputSegmentsDir = new File(segmentProcessorFrameworkSpec.getInputSegmentsDir());
     File outputSegmentsDir = new File(segmentProcessorFrameworkSpec.getOutputSegmentsDir());
-    if (!outputSegmentsDir.exists()) {
-      if (!outputSegmentsDir.mkdirs()) {
-        throw new RuntimeException(
-            "Did not find output directory, and could not create it either: " + segmentProcessorFrameworkSpec
-                .getOutputSegmentsDir());
+    File workingDir = new File(outputSegmentsDir, "tmp-" + UUID.randomUUID());
+    File untarredSegmentsDir = new File(workingDir, "untarred_segments");
+    FileUtils.forceMkdir(untarredSegmentsDir);
+    File[] segmentDirs = inputSegmentsDir.listFiles();
+    Preconditions
+        .checkState(segmentDirs != null && segmentDirs.length > 0, "Failed to find files under input segments dir: %s",
+            inputSegmentsDir.getAbsolutePath());
+    List<RecordReader> recordReaders = new ArrayList<>(segmentDirs.length);
+    for (File segmentDir : segmentDirs) {
+      String fileName = segmentDir.getName();
+
+      // Untar the segments if needed
+      if (!segmentDir.isDirectory()) {
+        if (fileName.endsWith(".tar.gz") || fileName.endsWith(".tgz")) {
+          segmentDir = TarGzCompressionUtils.untar(segmentDir, untarredSegmentsDir).get(0);
+        } else {
+          throw new IllegalStateException("Unsupported segment format: " + segmentDir.getAbsolutePath());
+        }
       }
-    }
 
-    PluginManager.get().init();
+      PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader();
+      // NOTE: Do not fill null field with default value to be consistent with other record readers
+      recordReader.init(segmentDir, null, null, true);
+      recordReaders.add(recordReader);
+    }
 
-    Schema schema = Schema.fromFile(new File(segmentProcessorFrameworkSpec.getSchemaFile()));
     TableConfig tableConfig =
         JsonUtils.fileToObject(new File(segmentProcessorFrameworkSpec.getTableConfigFile()), TableConfig.class);
+    Schema schema = Schema.fromFile(new File(segmentProcessorFrameworkSpec.getSchemaFile()));
     SegmentProcessorConfig segmentProcessorConfig =
         new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema)
             .setTimeHandlerConfig(segmentProcessorFrameworkSpec.getTimeHandlerConfig())
@@ -100,16 +125,22 @@ public class SegmentProcessorFrameworkCommand extends AbstractBaseAdminCommand i
             .setSegmentConfig(segmentProcessorFrameworkSpec.getSegmentConfig()).build();
 
     SegmentProcessorFramework framework =
-        new SegmentProcessorFramework(inputSegmentsDir, segmentProcessorConfig, outputSegmentsDir);
+        new SegmentProcessorFramework(recordReaders, segmentProcessorConfig, workingDir);
     try {
       LOGGER.info("Starting processing segments via SegmentProcessingFramework");
-      framework.processSegments();
+      List<File> outputSegmentDirs = framework.process();
+      for (File outputSegmentDir : outputSegmentDirs) {
+        FileUtils.moveDirectory(outputSegmentDir, new File(outputSegmentsDir, outputSegmentDir.getName()));
+      }
       LOGGER.info("Finished processing segments via SegmentProcessingFramework");
     } catch (Exception e) {
       LOGGER.error("Caught exception when running SegmentProcessingFramework. Exiting", e);
       return false;
     } finally {
-      framework.cleanup();
+      for (RecordReader recordReader : recordReaders) {
+        recordReader.close();
+      }
+      FileUtils.deleteQuietly(workingDir);
     }
     return true;
   }
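
The patch above changes the SegmentProcessorFramework contract from directory-in/directory-out (processSegments() plus cleanup()) to record-readers-in/segment-directories-out (process()), which is what removes the extra copy of the input segments. The following standalone sketch is not part of the commit; it only illustrates the new caller-side pattern using the same API calls shown in the diff. The class name and command-line arguments (input segments dir, table config file, schema file, working dir) are hypothetical placeholders.

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig;
import org.apache.pinot.core.segment.processing.framework.SegmentProcessorFramework;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentRecordReader;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.data.readers.RecordReader;
import org.apache.pinot.spi.utils.JsonUtils;

// Hypothetical example class, not part of the Pinot code base
public class SegmentProcessorFrameworkUsageSketch {

  public static void main(String[] args)
      throws Exception {
    // Assumed inputs: args[0] = dir of untarred segment directories, args[1] = table config JSON,
    // args[2] = schema JSON, args[3] = working dir for the generated segments
    File[] segmentDirs = new File(args[0]).listFiles(File::isDirectory);
    TableConfig tableConfig = JsonUtils.fileToObject(new File(args[1]), TableConfig.class);
    Schema schema = Schema.fromFile(new File(args[2]));
    File workingDir = new File(args[3]);
    FileUtils.forceMkdir(workingDir);

    // Minimal config; the real callers also set time handler, partitioner, merge and segment configs
    SegmentProcessorConfig segmentProcessorConfig =
        new SegmentProcessorConfig.Builder().setTableConfig(tableConfig).setSchema(schema).build();

    // One record reader per input segment; the framework reads rows directly from the original
    // segment directories, so no copy into a separate input directory is needed
    List<RecordReader> recordReaders = new ArrayList<>(segmentDirs.length);
    try {
      for (File segmentDir : segmentDirs) {
        PinotSegmentRecordReader recordReader = new PinotSegmentRecordReader();
        // Do not fill null fields with default values, matching the executors in this patch
        recordReader.init(segmentDir, null, null, true);
        recordReaders.add(recordReader);
      }

      // process() generates the output segments under workingDir and returns their directories;
      // callers move or tar them before deleting the working dir
      List<File> outputSegmentDirs =
          new SegmentProcessorFramework(recordReaders, segmentProcessorConfig, workingDir).process();
      for (File outputSegmentDir : outputSegmentDirs) {
        System.out.println("Generated segment: " + outputSegmentDir.getName());
      }
    } finally {
      // The caller owns the reader lifecycle and closes them once processing is done
      for (RecordReader recordReader : recordReaders) {
        recordReader.close();
      }
    }
  }
}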
