You are viewing a plain-text version of this content. (The link to the canonical version did not survive the plain-text conversion.)
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/06/07 20:04:24 UTC

[pinot] branch master updated: clean up output files upon exceptions more properly for SegmentProcessorFramework (#10847)

This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b9af9d83b clean up output files upon exceptions more properly for SegmentProcessorFramework (#10847)
4b9af9d83b is described below

commit 4b9af9d83bf66a27fec99e458b2d898d0c002599
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Wed Jun 7 13:04:18 2023 -0700

    clean up output files upon exceptions more properly for SegmentProcessorFramework (#10847)
---
 .../framework/SegmentProcessorFramework.java       | 50 +++++++++++++++-------
 .../segment/processing/mapper/SegmentMapper.java   | 13 ++++++
 .../segment/processing/reducer/DedupReducer.java   | 22 ++++++++--
 .../segment/processing/reducer/RollupReducer.java  | 23 +++++++---
 .../framework/SegmentProcessorFrameworkTest.java   | 30 +++++++++++++
 5 files changed, 114 insertions(+), 24 deletions(-)

diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
index 25f2b82eba..d64f35f515 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
@@ -63,6 +63,7 @@ public class SegmentProcessorFramework {
   private final File _mapperOutputDir;
   private final File _reducerOutputDir;
   private final File _segmentsOutputDir;
+  private Map<String, GenericRowFileManager> _partitionToFileManagerMap;
 
   /**
    * Initializes the SegmentProcessorFramework with record readers, config and working directory.
@@ -91,33 +92,55 @@ public class SegmentProcessorFramework {
    */
   public List<File> process()
       throws Exception {
+    try {
+      return doProcess();
+    } catch (Exception e) {
+      // Clean up the file managers of either phase; the map is still null if the map phase failed. Entries replaced
+      // by reducer output should have had their map-phase input file managers cleaned up by those reducers already.
+      if (_partitionToFileManagerMap != null) {
+        for (GenericRowFileManager fileManager : _partitionToFileManagerMap.values()) {
+          fileManager.cleanUp();
+        }
+      }
+      // Processing failed: delete the segments output dir so no partially generated segments are left behind.
+      FileUtils.deleteQuietly(_segmentsOutputDir);
+      throw e;
+    } finally { // NOTE(review): an IOException thrown here replaces any exception rethrown from the catch above
+      FileUtils.deleteDirectory(_mapperOutputDir);
+      FileUtils.deleteDirectory(_reducerOutputDir);
+    }
+  }
+
+  private List<File> doProcess()
+      throws Exception {
     // Map phase
     LOGGER.info("Beginning map phase on {} record readers", _recordReaders.size());
     SegmentMapper mapper = new SegmentMapper(_recordReaders, _segmentProcessorConfig, _mapperOutputDir);
-    Map<String, GenericRowFileManager> partitionToFileManagerMap = mapper.map();
+    _partitionToFileManagerMap = mapper.map();
 
     // Check for mapper output files
-    if (partitionToFileManagerMap.isEmpty()) {
+    if (_partitionToFileManagerMap.isEmpty()) {
       LOGGER.info("No partition generated from mapper phase, skipping the reducer phase");
       return Collections.emptyList();
     }
 
     // Reduce phase
-    LOGGER.info("Beginning reduce phase on partitions: {}", partitionToFileManagerMap.keySet());
+    LOGGER.info("Beginning reduce phase on partitions: {}", _partitionToFileManagerMap.keySet());
     Consumer<Object> observer = _segmentProcessorConfig.getProgressObserver();
-    int totalCount = partitionToFileManagerMap.keySet().size();
+    int totalCount = _partitionToFileManagerMap.keySet().size();
     int count = 1;
-    for (Map.Entry<String, GenericRowFileManager> entry : partitionToFileManagerMap.entrySet()) {
+    for (Map.Entry<String, GenericRowFileManager> entry : _partitionToFileManagerMap.entrySet()) {
       String partitionId = entry.getKey();
-      observer.accept(String
-          .format("Doing reduce phase on data from partition: %s (%d out of %d)", partitionId, count++, totalCount));
+      observer.accept(
+          String.format("Doing reduce phase on data from partition: %s (%d out of %d)", partitionId, count++,
+              totalCount));
       GenericRowFileManager fileManager = entry.getValue();
       Reducer reducer = ReducerFactory.getReducer(partitionId, fileManager, _segmentProcessorConfig, _reducerOutputDir);
       entry.setValue(reducer.reduce());
     }
 
     // Segment creation phase
-    LOGGER.info("Beginning segment creation phase on partitions: {}", partitionToFileManagerMap.keySet());
+    LOGGER.info("Beginning segment creation phase on partitions: {}", _partitionToFileManagerMap.keySet());
     List<File> outputSegmentDirs = new ArrayList<>();
     TableConfig tableConfig = _segmentProcessorConfig.getTableConfig();
     Schema schema = _segmentProcessorConfig.getSchema();
@@ -128,9 +151,9 @@ public class SegmentProcessorFramework {
     generatorConfig.setOutDir(_segmentsOutputDir.getPath());
 
     if (tableConfig.getIndexingConfig().getSegmentNameGeneratorType() != null) {
-      generatorConfig.setSegmentNameGenerator(SegmentNameGeneratorFactory
-          .createSegmentNameGenerator(tableConfig, schema, segmentNamePrefix, segmentNamePostfix, fixedSegmentName,
-              false));
+      generatorConfig.setSegmentNameGenerator(
+          SegmentNameGeneratorFactory.createSegmentNameGenerator(tableConfig, schema, segmentNamePrefix,
+              segmentNamePostfix, fixedSegmentName, false));
     } else {
       // SegmentNameGenerator will be inferred by the SegmentGeneratorConfig.
       generatorConfig.setSegmentNamePrefix(segmentNamePrefix);
@@ -139,7 +162,7 @@ public class SegmentProcessorFramework {
 
     int maxNumRecordsPerSegment = _segmentProcessorConfig.getSegmentConfig().getMaxNumRecordsPerSegment();
     int sequenceId = 0;
-    for (Map.Entry<String, GenericRowFileManager> entry : partitionToFileManagerMap.entrySet()) {
+    for (Map.Entry<String, GenericRowFileManager> entry : _partitionToFileManagerMap.entrySet()) {
       String partitionId = entry.getKey();
       GenericRowFileManager fileManager = entry.getValue();
       try {
@@ -168,9 +191,6 @@ public class SegmentProcessorFramework {
         fileManager.cleanUp();
       }
     }
-    FileUtils.deleteDirectory(_mapperOutputDir);
-    FileUtils.deleteDirectory(_reducerOutputDir);
-
     LOGGER.info("Successfully created segments: {}", outputSegmentDirs);
     return outputSegmentDirs;
   }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
index 2d8118ff82..302aa2869f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
@@ -108,6 +108,19 @@ public class SegmentMapper {
    */
   public Map<String, GenericRowFileManager> map()
       throws Exception {
+    try {
+      return doMap();
+    } catch (Exception e) {
+      // Remove only the files this mapper created; inputs such as _recordReaders are left to the caller.
+      for (GenericRowFileManager fileManager : _partitionToFileManagerMap.values()) {
+        fileManager.cleanUp();
+      }
+      throw e;
+    }
+  }
+
+  private Map<String, GenericRowFileManager> doMap()
+      throws Exception {
     Consumer<Object> observer = _processorConfig.getProgressObserver();
     int totalCount = _recordReaders.size();
     int count = 1;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/DedupReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/DedupReducer.java
index 33bdd8b544..a54f00d5eb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/DedupReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/DedupReducer.java
@@ -38,6 +38,7 @@ public class DedupReducer implements Reducer {
   private final String _partitionId;
   private final GenericRowFileManager _fileManager;
   private final File _reducerOutputDir;
+  private GenericRowFileManager _dedupFileManager;
 
   public DedupReducer(String partitionId, GenericRowFileManager fileManager, File reducerOutputDir) {
     _partitionId = partitionId;
@@ -48,6 +49,19 @@ public class DedupReducer implements Reducer {
   @Override
   public GenericRowFileManager reduce()
       throws Exception {
+    try {
+      return doReduce();
+    } catch (Exception e) {
+      // Remove only this reducer's dedup output (null if not created yet); the caller owns the input _fileManager.
+      if (_dedupFileManager != null) {
+        _dedupFileManager.cleanUp();
+      }
+      throw e;
+    }
+  }
+
+  private GenericRowFileManager doReduce()
+      throws Exception {
     LOGGER.info("Start reducing on partition: {}", _partitionId);
     long reduceStartTimeMs = System.currentTimeMillis();
 
@@ -63,10 +77,10 @@ public class DedupReducer implements Reducer {
     FileUtils.forceMkdir(partitionOutputDir);
     LOGGER.info("Start creating dedup file under dir: {}", partitionOutputDir);
     long dedupFileCreationStartTimeMs = System.currentTimeMillis();
-    GenericRowFileManager dedupFileManager =
+    _dedupFileManager =
         new GenericRowFileManager(partitionOutputDir, _fileManager.getFieldSpecs(), _fileManager.isIncludeNullFields(),
             0);
-    GenericRowFileWriter dedupFileWriter = dedupFileManager.getFileWriter();
+    GenericRowFileWriter dedupFileWriter = _dedupFileManager.getFileWriter();
     GenericRow previousRow = new GenericRow();
     recordReader.read(0, previousRow);
     int previousRowId = 0;
@@ -79,11 +93,11 @@ public class DedupReducer implements Reducer {
         dedupFileWriter.write(previousRow);
       }
     }
-    dedupFileManager.closeFileWriter();
+    _dedupFileManager.closeFileWriter();
     LOGGER.info("Finish creating dedup file in {}ms", System.currentTimeMillis() - dedupFileCreationStartTimeMs);
 
     _fileManager.cleanUp();
     LOGGER.info("Finish reducing in {}ms", System.currentTimeMillis() - reduceStartTimeMs);
-    return dedupFileManager;
+    return _dedupFileManager;
   }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java
index 5b11c46def..ae88120f20 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java
@@ -48,6 +48,7 @@ public class RollupReducer implements Reducer {
   private final GenericRowFileManager _fileManager;
   private final Map<String, AggregationFunctionType> _aggregationTypes;
   private final File _reducerOutputDir;
+  private GenericRowFileManager _rollupFileManager;
 
   public RollupReducer(String partitionId, GenericRowFileManager fileManager,
       Map<String, AggregationFunctionType> aggregationTypes, File reducerOutputDir) {
@@ -60,6 +61,19 @@ public class RollupReducer implements Reducer {
   @Override
   public GenericRowFileManager reduce()
       throws Exception {
+    try {
+      return doReduce();
+    } catch (Exception e) {
+      // Remove only this reducer's rollup output (null if not created yet); the caller owns the input _fileManager.
+      if (_rollupFileManager != null) {
+        _rollupFileManager.cleanUp();
+      }
+      throw e;
+    }
+  }
+
+  private GenericRowFileManager doReduce()
+      throws Exception {
     LOGGER.info("Start reducing on partition: {}", _partitionId);
     long reduceStartTimeMs = System.currentTimeMillis();
 
@@ -85,9 +99,8 @@ public class RollupReducer implements Reducer {
     FileUtils.forceMkdir(partitionOutputDir);
     LOGGER.info("Start creating rollup file under dir: {}", partitionOutputDir);
     long rollupFileCreationStartTimeMs = System.currentTimeMillis();
-    GenericRowFileManager rollupFileManager =
-        new GenericRowFileManager(partitionOutputDir, fieldSpecs, includeNullFields, 0);
-    GenericRowFileWriter rollupFileWriter = rollupFileManager.getFileWriter();
+    _rollupFileManager = new GenericRowFileManager(partitionOutputDir, fieldSpecs, includeNullFields, 0);
+    GenericRowFileWriter rollupFileWriter = _rollupFileManager.getFileWriter();
     GenericRow previousRow = new GenericRow();
     recordReader.read(0, previousRow);
     int previousRowId = 0;
@@ -122,12 +135,12 @@ public class RollupReducer implements Reducer {
       }
     }
     rollupFileWriter.write(previousRow);
-    rollupFileManager.closeFileWriter();
+    _rollupFileManager.closeFileWriter();
     LOGGER.info("Finish creating rollup file in {}ms", System.currentTimeMillis() - rollupFileCreationStartTimeMs);
 
     _fileManager.cleanUp();
     LOGGER.info("Finish reducing in {}ms", System.currentTimeMillis() - reduceStartTimeMs);
-    return rollupFileManager;
+    return _rollupFileManager;
   }
 
   private static void aggregateWithNullFields(GenericRow aggregatedRow, GenericRow rowToAggregate,
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
index 0b73d1f18c..c5868063e7 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
@@ -219,6 +219,8 @@ public class SegmentProcessorFrameworkTest {
     framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
     outputSegments = framework.process();
     assertEquals(outputSegments.size(), 1);
+    String[] outputDirs = workingDir.list();
+    assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs));
     segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
     segmentMetadata = segment.getSegmentMetadata();
     assertEquals(segmentMetadata.getTotalDocs(), 10);
@@ -255,6 +257,8 @@ public class SegmentProcessorFrameworkTest {
     framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
     outputSegments = framework.process();
     assertEquals(outputSegments.size(), 1);
+    outputDirs = workingDir.list();
+    assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs));
     segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
     segmentMetadata = segment.getSegmentMetadata();
     assertEquals(segmentMetadata.getName(), "myTable_segment_0001");
@@ -269,6 +273,8 @@ public class SegmentProcessorFrameworkTest {
     framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
     outputSegments = framework.process();
     assertEquals(outputSegments.size(), 1);
+    outputDirs = workingDir.list();
+    assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs));
     segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
     assertEquals(segmentMetadata.getTotalDocs(), 5);
     timeMetadata = segmentMetadata.getColumnMetadataFor("time");
@@ -286,6 +292,8 @@ public class SegmentProcessorFrameworkTest {
     framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
     outputSegments = framework.process();
     assertEquals(outputSegments.size(), 1);
+    outputDirs = workingDir.list();
+    assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs));
     segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
     assertEquals(segmentMetadata.getTotalDocs(), 5);
     timeMetadata = segmentMetadata.getColumnMetadataFor("time");
@@ -303,6 +311,8 @@ public class SegmentProcessorFrameworkTest {
     framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
     outputSegments = framework.process();
     assertTrue(outputSegments.isEmpty());
+    outputDirs = workingDir.list();
+    assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs));
     FileUtils.cleanDirectory(workingDir);
     rewindRecordReaders(_singleSegment);
 
@@ -313,6 +323,8 @@ public class SegmentProcessorFrameworkTest {
     framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
     outputSegments = framework.process();
     assertEquals(outputSegments.size(), 1);
+    outputDirs = workingDir.list();
+    assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs));
     segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
     assertEquals(segmentMetadata.getTotalDocs(), 10);
     timeMetadata = segmentMetadata.getColumnMetadataFor("time");
@@ -329,6 +341,8 @@ public class SegmentProcessorFrameworkTest {
     framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
     outputSegments = framework.process();
     assertEquals(outputSegments.size(), 3);
+    outputDirs = workingDir.list();
+    assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs));
     outputSegments.sort(null);
     // segment 0
     segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
@@ -366,6 +380,8 @@ public class SegmentProcessorFrameworkTest {
     framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
     outputSegments = framework.process();
     assertEquals(outputSegments.size(), 2);
+    outputDirs = workingDir.list();
+    assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs));
     outputSegments.sort(null);
     // segment 0
     segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
@@ -429,6 +445,8 @@ public class SegmentProcessorFrameworkTest {
     framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
     outputSegments = framework.process();
     assertEquals(outputSegments.size(), 1);
+    outputDirs = workingDir.list();
+    assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs));
     segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
     assertEquals(segmentMetadata.getTotalDocs(), 8);
     assertEquals(segmentMetadata.getName(), "myTable_1597708800000_1597881600000_0");
@@ -442,6 +460,8 @@ public class SegmentProcessorFrameworkTest {
     framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
     outputSegments = framework.process();
     assertEquals(outputSegments.size(), 3);
+    outputDirs = workingDir.list();
+    assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs));
     outputSegments.sort(null);
     segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
     assertEquals(segmentMetadata.getTotalDocs(), 4);
@@ -462,6 +482,8 @@ public class SegmentProcessorFrameworkTest {
     framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
     outputSegments = framework.process();
     assertEquals(outputSegments.size(), 3);
+    outputDirs = workingDir.list();
+    assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs));
     outputSegments.sort(null);
     segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
     assertEquals(segmentMetadata.getTotalDocs(), 4);
@@ -488,6 +510,8 @@ public class SegmentProcessorFrameworkTest {
     SegmentProcessorFramework framework = new SegmentProcessorFramework(_multipleSegments, config, workingDir);
     List<File> outputSegments = framework.process();
     assertEquals(outputSegments.size(), 1);
+    String[] outputDirs = workingDir.list();
+    assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs));
     SegmentMetadata segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
     assertEquals(segmentMetadata.getTotalDocs(), 10);
     assertEquals(segmentMetadata.getName(), "myTable_1597719600000_1597892400000_0");
@@ -501,6 +525,8 @@ public class SegmentProcessorFrameworkTest {
     framework = new SegmentProcessorFramework(_multipleSegments, config, workingDir);
     outputSegments = framework.process();
     assertEquals(outputSegments.size(), 3);
+    outputDirs = workingDir.list();
+    assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs));
     outputSegments.sort(null);
     segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
     assertEquals(segmentMetadata.getTotalDocs(), 2);
@@ -528,6 +554,8 @@ public class SegmentProcessorFrameworkTest {
     SegmentProcessorFramework framework = new SegmentProcessorFramework(_multiValueSegments, config, workingDir);
     List<File> outputSegments = framework.process();
     assertEquals(outputSegments.size(), 1);
+    String[] outputDirs = workingDir.list();
+    assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs));
     ImmutableSegment segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
     SegmentMetadataImpl segmentMetadata = (SegmentMetadataImpl) segment.getSegmentMetadata();
     assertEquals(segmentMetadata.getTotalDocs(), 2);
@@ -562,6 +590,8 @@ public class SegmentProcessorFrameworkTest {
     framework = new SegmentProcessorFramework(_multiValueSegments, config, workingDir);
     outputSegments = framework.process();
     assertEquals(outputSegments.size(), 1);
+    outputDirs = workingDir.list();
+    assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs));
     segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
     segmentMetadata = (SegmentMetadataImpl) segment.getSegmentMetadata();
     assertEquals(segmentMetadata.getTotalDocs(), 2);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org