You are viewing a plain-text version of this content. The link to the canonical version was a hyperlink on the original page and is not preserved in this plain-text rendering.
Posted to commits@pinot.apache.org by ja...@apache.org on 2023/06/07 20:04:24 UTC
[pinot] branch master updated: clean up output files upon exceptions more properly for SegmentProcessorFramework (#10847)
This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 4b9af9d83b clean up output files upon exceptions more properly for SegmentProcessorFramework (#10847)
4b9af9d83b is described below
commit 4b9af9d83bf66a27fec99e458b2d898d0c002599
Author: Xiaobing <61...@users.noreply.github.com>
AuthorDate: Wed Jun 7 13:04:18 2023 -0700
clean up output files upon exceptions more properly for SegmentProcessorFramework (#10847)
---
.../framework/SegmentProcessorFramework.java | 50 +++++++++++++++-------
.../segment/processing/mapper/SegmentMapper.java | 13 ++++++
.../segment/processing/reducer/DedupReducer.java | 22 ++++++++--
.../segment/processing/reducer/RollupReducer.java | 23 +++++++---
.../framework/SegmentProcessorFrameworkTest.java | 30 +++++++++++++
5 files changed, 114 insertions(+), 24 deletions(-)
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
index 25f2b82eba..d64f35f515 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFramework.java
@@ -63,6 +63,7 @@ public class SegmentProcessorFramework {
private final File _mapperOutputDir;
private final File _reducerOutputDir;
private final File _segmentsOutputDir;
+ private Map<String, GenericRowFileManager> _partitionToFileManagerMap;
/**
* Initializes the SegmentProcessorFramework with record readers, config and working directory.
@@ -91,33 +92,55 @@ public class SegmentProcessorFramework {
*/
public List<File> process()
throws Exception {
+ try {
+ return doProcess();
+ } catch (Exception e) {
+ // Cleaning up file managers no matter they are from map phase or reduce phase. For those from reduce phase, the
+ // reducers should have cleaned up the corresponding file managers from map phase already.
+ if (_partitionToFileManagerMap != null) {
+ for (GenericRowFileManager fileManager : _partitionToFileManagerMap.values()) {
+ fileManager.cleanUp();
+ }
+ }
+ // Cleaning up output dir as processing has failed.
+ FileUtils.deleteQuietly(_segmentsOutputDir);
+ throw e;
+ } finally {
+ FileUtils.deleteDirectory(_mapperOutputDir);
+ FileUtils.deleteDirectory(_reducerOutputDir);
+ }
+ }
+
+ private List<File> doProcess()
+ throws Exception {
// Map phase
LOGGER.info("Beginning map phase on {} record readers", _recordReaders.size());
SegmentMapper mapper = new SegmentMapper(_recordReaders, _segmentProcessorConfig, _mapperOutputDir);
- Map<String, GenericRowFileManager> partitionToFileManagerMap = mapper.map();
+ _partitionToFileManagerMap = mapper.map();
// Check for mapper output files
- if (partitionToFileManagerMap.isEmpty()) {
+ if (_partitionToFileManagerMap.isEmpty()) {
LOGGER.info("No partition generated from mapper phase, skipping the reducer phase");
return Collections.emptyList();
}
// Reduce phase
- LOGGER.info("Beginning reduce phase on partitions: {}", partitionToFileManagerMap.keySet());
+ LOGGER.info("Beginning reduce phase on partitions: {}", _partitionToFileManagerMap.keySet());
Consumer<Object> observer = _segmentProcessorConfig.getProgressObserver();
- int totalCount = partitionToFileManagerMap.keySet().size();
+ int totalCount = _partitionToFileManagerMap.keySet().size();
int count = 1;
- for (Map.Entry<String, GenericRowFileManager> entry : partitionToFileManagerMap.entrySet()) {
+ for (Map.Entry<String, GenericRowFileManager> entry : _partitionToFileManagerMap.entrySet()) {
String partitionId = entry.getKey();
- observer.accept(String
- .format("Doing reduce phase on data from partition: %s (%d out of %d)", partitionId, count++, totalCount));
+ observer.accept(
+ String.format("Doing reduce phase on data from partition: %s (%d out of %d)", partitionId, count++,
+ totalCount));
GenericRowFileManager fileManager = entry.getValue();
Reducer reducer = ReducerFactory.getReducer(partitionId, fileManager, _segmentProcessorConfig, _reducerOutputDir);
entry.setValue(reducer.reduce());
}
// Segment creation phase
- LOGGER.info("Beginning segment creation phase on partitions: {}", partitionToFileManagerMap.keySet());
+ LOGGER.info("Beginning segment creation phase on partitions: {}", _partitionToFileManagerMap.keySet());
List<File> outputSegmentDirs = new ArrayList<>();
TableConfig tableConfig = _segmentProcessorConfig.getTableConfig();
Schema schema = _segmentProcessorConfig.getSchema();
@@ -128,9 +151,9 @@ public class SegmentProcessorFramework {
generatorConfig.setOutDir(_segmentsOutputDir.getPath());
if (tableConfig.getIndexingConfig().getSegmentNameGeneratorType() != null) {
- generatorConfig.setSegmentNameGenerator(SegmentNameGeneratorFactory
- .createSegmentNameGenerator(tableConfig, schema, segmentNamePrefix, segmentNamePostfix, fixedSegmentName,
- false));
+ generatorConfig.setSegmentNameGenerator(
+ SegmentNameGeneratorFactory.createSegmentNameGenerator(tableConfig, schema, segmentNamePrefix,
+ segmentNamePostfix, fixedSegmentName, false));
} else {
// SegmentNameGenerator will be inferred by the SegmentGeneratorConfig.
generatorConfig.setSegmentNamePrefix(segmentNamePrefix);
@@ -139,7 +162,7 @@ public class SegmentProcessorFramework {
int maxNumRecordsPerSegment = _segmentProcessorConfig.getSegmentConfig().getMaxNumRecordsPerSegment();
int sequenceId = 0;
- for (Map.Entry<String, GenericRowFileManager> entry : partitionToFileManagerMap.entrySet()) {
+ for (Map.Entry<String, GenericRowFileManager> entry : _partitionToFileManagerMap.entrySet()) {
String partitionId = entry.getKey();
GenericRowFileManager fileManager = entry.getValue();
try {
@@ -168,9 +191,6 @@ public class SegmentProcessorFramework {
fileManager.cleanUp();
}
}
- FileUtils.deleteDirectory(_mapperOutputDir);
- FileUtils.deleteDirectory(_reducerOutputDir);
-
LOGGER.info("Successfully created segments: {}", outputSegmentDirs);
return outputSegmentDirs;
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
index 2d8118ff82..302aa2869f 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java
@@ -108,6 +108,19 @@ public class SegmentMapper {
*/
public Map<String, GenericRowFileManager> map()
throws Exception {
+ try {
+ return doMap();
+ } catch (Exception e) {
+ // Cleaning up resources created by the mapper, leaving others to the caller like the input _recordReaders.
+ for (GenericRowFileManager fileManager : _partitionToFileManagerMap.values()) {
+ fileManager.cleanUp();
+ }
+ throw e;
+ }
+ }
+
+ private Map<String, GenericRowFileManager> doMap()
+ throws Exception {
Consumer<Object> observer = _processorConfig.getProgressObserver();
int totalCount = _recordReaders.size();
int count = 1;
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/DedupReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/DedupReducer.java
index 33bdd8b544..a54f00d5eb 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/DedupReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/DedupReducer.java
@@ -38,6 +38,7 @@ public class DedupReducer implements Reducer {
private final String _partitionId;
private final GenericRowFileManager _fileManager;
private final File _reducerOutputDir;
+ private GenericRowFileManager _dedupFileManager;
public DedupReducer(String partitionId, GenericRowFileManager fileManager, File reducerOutputDir) {
_partitionId = partitionId;
@@ -48,6 +49,19 @@ public class DedupReducer implements Reducer {
@Override
public GenericRowFileManager reduce()
throws Exception {
+ try {
+ return doReduce();
+ } catch (Exception e) {
+ // Cleaning up resources created by the reducer, leaving others to the caller like the input _fileManager.
+ if (_dedupFileManager != null) {
+ _dedupFileManager.cleanUp();
+ }
+ throw e;
+ }
+ }
+
+ private GenericRowFileManager doReduce()
+ throws Exception {
LOGGER.info("Start reducing on partition: {}", _partitionId);
long reduceStartTimeMs = System.currentTimeMillis();
@@ -63,10 +77,10 @@ public class DedupReducer implements Reducer {
FileUtils.forceMkdir(partitionOutputDir);
LOGGER.info("Start creating dedup file under dir: {}", partitionOutputDir);
long dedupFileCreationStartTimeMs = System.currentTimeMillis();
- GenericRowFileManager dedupFileManager =
+ _dedupFileManager =
new GenericRowFileManager(partitionOutputDir, _fileManager.getFieldSpecs(), _fileManager.isIncludeNullFields(),
0);
- GenericRowFileWriter dedupFileWriter = dedupFileManager.getFileWriter();
+ GenericRowFileWriter dedupFileWriter = _dedupFileManager.getFileWriter();
GenericRow previousRow = new GenericRow();
recordReader.read(0, previousRow);
int previousRowId = 0;
@@ -79,11 +93,11 @@ public class DedupReducer implements Reducer {
dedupFileWriter.write(previousRow);
}
}
- dedupFileManager.closeFileWriter();
+ _dedupFileManager.closeFileWriter();
LOGGER.info("Finish creating dedup file in {}ms", System.currentTimeMillis() - dedupFileCreationStartTimeMs);
_fileManager.cleanUp();
LOGGER.info("Finish reducing in {}ms", System.currentTimeMillis() - reduceStartTimeMs);
- return dedupFileManager;
+ return _dedupFileManager;
}
}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java
index 5b11c46def..ae88120f20 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/reducer/RollupReducer.java
@@ -48,6 +48,7 @@ public class RollupReducer implements Reducer {
private final GenericRowFileManager _fileManager;
private final Map<String, AggregationFunctionType> _aggregationTypes;
private final File _reducerOutputDir;
+ private GenericRowFileManager _rollupFileManager;
public RollupReducer(String partitionId, GenericRowFileManager fileManager,
Map<String, AggregationFunctionType> aggregationTypes, File reducerOutputDir) {
@@ -60,6 +61,19 @@ public class RollupReducer implements Reducer {
@Override
public GenericRowFileManager reduce()
throws Exception {
+ try {
+ return doReduce();
+ } catch (Exception e) {
+ // Cleaning up resources created by the reducer, leaving others to the caller like the input _fileManager.
+ if (_rollupFileManager != null) {
+ _rollupFileManager.cleanUp();
+ }
+ throw e;
+ }
+ }
+
+ private GenericRowFileManager doReduce()
+ throws Exception {
LOGGER.info("Start reducing on partition: {}", _partitionId);
long reduceStartTimeMs = System.currentTimeMillis();
@@ -85,9 +99,8 @@ public class RollupReducer implements Reducer {
FileUtils.forceMkdir(partitionOutputDir);
LOGGER.info("Start creating rollup file under dir: {}", partitionOutputDir);
long rollupFileCreationStartTimeMs = System.currentTimeMillis();
- GenericRowFileManager rollupFileManager =
- new GenericRowFileManager(partitionOutputDir, fieldSpecs, includeNullFields, 0);
- GenericRowFileWriter rollupFileWriter = rollupFileManager.getFileWriter();
+ _rollupFileManager = new GenericRowFileManager(partitionOutputDir, fieldSpecs, includeNullFields, 0);
+ GenericRowFileWriter rollupFileWriter = _rollupFileManager.getFileWriter();
GenericRow previousRow = new GenericRow();
recordReader.read(0, previousRow);
int previousRowId = 0;
@@ -122,12 +135,12 @@ public class RollupReducer implements Reducer {
}
}
rollupFileWriter.write(previousRow);
- rollupFileManager.closeFileWriter();
+ _rollupFileManager.closeFileWriter();
LOGGER.info("Finish creating rollup file in {}ms", System.currentTimeMillis() - rollupFileCreationStartTimeMs);
_fileManager.cleanUp();
LOGGER.info("Finish reducing in {}ms", System.currentTimeMillis() - reduceStartTimeMs);
- return rollupFileManager;
+ return _rollupFileManager;
}
private static void aggregateWithNullFields(GenericRow aggregatedRow, GenericRow rowToAggregate,
diff --git a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
index 0b73d1f18c..c5868063e7 100644
--- a/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/core/segment/processing/framework/SegmentProcessorFrameworkTest.java
@@ -219,6 +219,8 @@ public class SegmentProcessorFrameworkTest {
framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
outputSegments = framework.process();
assertEquals(outputSegments.size(), 1);
+ String[] outputDirs = workingDir.list();
+ assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs));
segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
segmentMetadata = segment.getSegmentMetadata();
assertEquals(segmentMetadata.getTotalDocs(), 10);
@@ -255,6 +257,8 @@ public class SegmentProcessorFrameworkTest {
framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
outputSegments = framework.process();
assertEquals(outputSegments.size(), 1);
+ outputDirs = workingDir.list();
+ assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs));
segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
segmentMetadata = segment.getSegmentMetadata();
assertEquals(segmentMetadata.getName(), "myTable_segment_0001");
@@ -269,6 +273,8 @@ public class SegmentProcessorFrameworkTest {
framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
outputSegments = framework.process();
assertEquals(outputSegments.size(), 1);
+ outputDirs = workingDir.list();
+ assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs));
segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
assertEquals(segmentMetadata.getTotalDocs(), 5);
timeMetadata = segmentMetadata.getColumnMetadataFor("time");
@@ -286,6 +292,8 @@ public class SegmentProcessorFrameworkTest {
framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
outputSegments = framework.process();
assertEquals(outputSegments.size(), 1);
+ outputDirs = workingDir.list();
+ assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs));
segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
assertEquals(segmentMetadata.getTotalDocs(), 5);
timeMetadata = segmentMetadata.getColumnMetadataFor("time");
@@ -303,6 +311,8 @@ public class SegmentProcessorFrameworkTest {
framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
outputSegments = framework.process();
assertTrue(outputSegments.isEmpty());
+ outputDirs = workingDir.list();
+ assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs));
FileUtils.cleanDirectory(workingDir);
rewindRecordReaders(_singleSegment);
@@ -313,6 +323,8 @@ public class SegmentProcessorFrameworkTest {
framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
outputSegments = framework.process();
assertEquals(outputSegments.size(), 1);
+ outputDirs = workingDir.list();
+ assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs));
segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
assertEquals(segmentMetadata.getTotalDocs(), 10);
timeMetadata = segmentMetadata.getColumnMetadataFor("time");
@@ -329,6 +341,8 @@ public class SegmentProcessorFrameworkTest {
framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
outputSegments = framework.process();
assertEquals(outputSegments.size(), 3);
+ outputDirs = workingDir.list();
+ assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs));
outputSegments.sort(null);
// segment 0
segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
@@ -366,6 +380,8 @@ public class SegmentProcessorFrameworkTest {
framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
outputSegments = framework.process();
assertEquals(outputSegments.size(), 2);
+ outputDirs = workingDir.list();
+ assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs));
outputSegments.sort(null);
// segment 0
segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
@@ -429,6 +445,8 @@ public class SegmentProcessorFrameworkTest {
framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
outputSegments = framework.process();
assertEquals(outputSegments.size(), 1);
+ outputDirs = workingDir.list();
+ assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs));
segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
assertEquals(segmentMetadata.getTotalDocs(), 8);
assertEquals(segmentMetadata.getName(), "myTable_1597708800000_1597881600000_0");
@@ -442,6 +460,8 @@ public class SegmentProcessorFrameworkTest {
framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
outputSegments = framework.process();
assertEquals(outputSegments.size(), 3);
+ outputDirs = workingDir.list();
+ assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs));
outputSegments.sort(null);
segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
assertEquals(segmentMetadata.getTotalDocs(), 4);
@@ -462,6 +482,8 @@ public class SegmentProcessorFrameworkTest {
framework = new SegmentProcessorFramework(_singleSegment, config, workingDir);
outputSegments = framework.process();
assertEquals(outputSegments.size(), 3);
+ outputDirs = workingDir.list();
+ assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs));
outputSegments.sort(null);
segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
assertEquals(segmentMetadata.getTotalDocs(), 4);
@@ -488,6 +510,8 @@ public class SegmentProcessorFrameworkTest {
SegmentProcessorFramework framework = new SegmentProcessorFramework(_multipleSegments, config, workingDir);
List<File> outputSegments = framework.process();
assertEquals(outputSegments.size(), 1);
+ String[] outputDirs = workingDir.list();
+ assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs));
SegmentMetadata segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
assertEquals(segmentMetadata.getTotalDocs(), 10);
assertEquals(segmentMetadata.getName(), "myTable_1597719600000_1597892400000_0");
@@ -501,6 +525,8 @@ public class SegmentProcessorFrameworkTest {
framework = new SegmentProcessorFramework(_multipleSegments, config, workingDir);
outputSegments = framework.process();
assertEquals(outputSegments.size(), 3);
+ outputDirs = workingDir.list();
+ assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs));
outputSegments.sort(null);
segmentMetadata = new SegmentMetadataImpl(outputSegments.get(0));
assertEquals(segmentMetadata.getTotalDocs(), 2);
@@ -528,6 +554,8 @@ public class SegmentProcessorFrameworkTest {
SegmentProcessorFramework framework = new SegmentProcessorFramework(_multiValueSegments, config, workingDir);
List<File> outputSegments = framework.process();
assertEquals(outputSegments.size(), 1);
+ String[] outputDirs = workingDir.list();
+ assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs));
ImmutableSegment segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
SegmentMetadataImpl segmentMetadata = (SegmentMetadataImpl) segment.getSegmentMetadata();
assertEquals(segmentMetadata.getTotalDocs(), 2);
@@ -562,6 +590,8 @@ public class SegmentProcessorFrameworkTest {
framework = new SegmentProcessorFramework(_multiValueSegments, config, workingDir);
outputSegments = framework.process();
assertEquals(outputSegments.size(), 1);
+ outputDirs = workingDir.list();
+ assertTrue(outputDirs != null && outputDirs.length == 1, Arrays.toString(outputDirs));
segment = ImmutableSegmentLoader.load(outputSegments.get(0), ReadMode.mmap);
segmentMetadata = (SegmentMetadataImpl) segment.getSegmentMetadata();
assertEquals(segmentMetadata.getTotalDocs(), 2);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org